Implement saving the received results

This commit is contained in:
2024-10-07 23:31:33 +02:00
parent f589d4c11e
commit 32d1abdcee
5 changed files with 206 additions and 112 deletions

View File

@ -3,7 +3,7 @@ name = "snow-scanner"
version = "0.1.0" version = "0.1.0"
authors = ["William Desportes <williamdes@wdes.fr>"] authors = ["William Desportes <williamdes@wdes.fr>"]
edition = "2021" edition = "2021"
rust-version = "1.78.0" # MSRV rust-version = "1.81.0" # MSRV
description = "A program to scan internet and find scanners" description = "A program to scan internet and find scanners"
homepage = "https://github.com/wdes/snow-scanner/tree/v1.2.0-dev#readme" homepage = "https://github.com/wdes/snow-scanner/tree/v1.2.0-dev#readme"
repository = "https://github.com/wdes/snow-scanner" repository = "https://github.com/wdes/snow-scanner"

View File

@ -1,11 +1,17 @@
use std::{net::IpAddr, str::FromStr};
use crate::{worker::detection::detect_scanner_from_name, DbConnection, SnowDb};
use hickory_resolver::Name;
use rocket::futures::channel::mpsc as rocket_mpsc; use rocket::futures::channel::mpsc as rocket_mpsc;
use rocket::futures::StreamExt; use rocket::futures::StreamExt;
use rocket::tokio; use rocket::tokio;
use crate::Scanner;
/// Handles all the raw events being streamed from balancers and parses and filters them into only the events we care about. /// Handles all the raw events being streamed from balancers and parses and filters them into only the events we care about.
pub struct EventBus { pub struct EventBus {
events_rx: rocket_mpsc::Receiver<rocket_ws::Message>, events_rx: rocket_mpsc::Receiver<EventBusWriterEvent>,
events_tx: rocket_mpsc::Sender<rocket_ws::Message>, events_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>, bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>,
} }
@ -20,13 +26,13 @@ impl EventBus {
} }
} }
pub async fn run(&mut self) { // db: &Connection<SnowDb>
pub async fn run(&mut self, mut conn: DbConnection<SnowDb>) {
info!("EventBus started"); info!("EventBus started");
loop { loop {
tokio::select! { tokio::select! {
Some(event) = self.events_rx.next() => { Some(event) = self.events_rx.next() => {
info!("EventBus received: {event}"); self.handle_event(event, &mut conn).await;
self.handle_event(event);
} }
else => { else => {
warn!("EventBus stopped"); warn!("EventBus stopped");
@ -36,18 +42,44 @@ impl EventBus {
} }
} }
fn handle_event(&self, event: rocket_ws::Message) { async fn handle_event(&self, event: EventBusWriterEvent, db: &mut DbConnection<SnowDb>) {
info!("Received event: {}", event); info!("Received event");
if self.bus_tx.receiver_count() == 0 { if self.bus_tx.receiver_count() == 0 {
return; return;
} }
match self.bus_tx.send(event) { match event {
EventBusWriterEvent::ScannerFoundResponse { name, address } => {
let name = Name::from_str(name.as_str()).unwrap();
match detect_scanner_from_name(&name) {
Ok(Some(scanner_type)) => {
match Scanner::find_or_new(address.into(), scanner_type, Some(name), db)
.await
{
Ok(scanner) => {
let _ = scanner.save(db).await;
}
Err(err) => {
error!("Error find or save: {:?}", err);
}
}
}
Ok(None) => {
error!("No name detected for: {:?}", name);
}
Err(err) => {
error!("No name detected error: {:?}", err);
}
};
}
EventBusWriterEvent::BroadcastMessage(msg) => match self.bus_tx.send(msg) {
Ok(count) => { Ok(count) => {
info!("Event sent to {count} subscribers"); info!("Event sent to {count} subscribers");
} }
Err(err) => { Err(err) => {
error!("Error sending event to subscribers: {}", err); error!("Error sending event to subscribers: {}", err);
} }
},
} }
} }
@ -69,15 +101,20 @@ pub struct EventBusSubscriber {
/// Enables subscriptions to the event bus /// Enables subscriptions to the event bus
pub struct EventBusWriter { pub struct EventBusWriter {
bus_tx: rocket_mpsc::Sender<EventBusEvent>, bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
}
pub enum EventBusWriterEvent {
BroadcastMessage(rocket_ws::Message),
ScannerFoundResponse { name: String, address: IpAddr },
} }
impl EventBusWriter { impl EventBusWriter {
pub fn new(bus_tx: rocket_mpsc::Sender<EventBusEvent>) -> Self { pub fn new(bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>) -> Self {
Self { bus_tx } Self { bus_tx }
} }
pub fn write(&self) -> rocket_mpsc::Sender<EventBusEvent> { pub fn write(&self) -> rocket_mpsc::Sender<EventBusWriterEvent> {
self.bus_tx.clone() self.bus_tx.clone()
} }
} }

View File

@ -4,8 +4,15 @@ use chrono::{NaiveDateTime, Utc};
extern crate rocket; extern crate rocket;
use cidr::IpCidr; use cidr::IpCidr;
use event_bus::{EventBusSubscriber, EventBusWriter}; use event_bus::{EventBusSubscriber, EventBusWriter, EventBusWriterEvent};
use rocket::{fairing::AdHoc, futures::SinkExt, trace::error, Rocket, State}; use rocket::{
fairing::AdHoc,
futures::SinkExt,
http::Status,
request::{FromRequest, Outcome, Request},
trace::error,
Rocket, State,
};
use rocket_db_pools::{ use rocket_db_pools::{
rocket::{ rocket::{
figment::{ figment::{
@ -16,7 +23,7 @@ use rocket_db_pools::{
fs::NamedFile, fs::NamedFile,
Responder, Responder,
}, },
Connection, Connection, Pool,
}; };
use crate::worker::modules::{Network, WorkerMessages}; use crate::worker::modules::{Network, WorkerMessages};
@ -32,7 +39,10 @@ use rocket_ws::WebSocket;
use server::Server; use server::Server;
use worker::detection::{detect_scanner, get_dns_client, Scanners}; use worker::detection::{detect_scanner, get_dns_client, Scanners};
use std::{env, fmt}; use std::{
env, fmt,
ops::{Deref, DerefMut},
};
use std::{io::Write, net::SocketAddr}; use std::{io::Write, net::SocketAddr};
use std::{path::PathBuf, str::FromStr}; use std::{path::PathBuf, str::FromStr};
use uuid::Uuid; use uuid::Uuid;
@ -49,11 +59,43 @@ pub mod worker;
use crate::models::*; use crate::models::*;
#[derive(Database)] #[derive(Database, Clone)]
#[database("snow_scanner_db")] #[database("snow_scanner_db")]
pub struct SnowDb(MysqlPool); pub struct SnowDb(MysqlPool);
type DbConn = Connection<SnowDb>; pub type ReqDbConn = Connection<SnowDb>;
pub type DbConn = DbConnection<SnowDb>;
#[rocket::async_trait]
impl<'r, D: Database> FromRequest<'r> for DbConnection<D> {
type Error = Option<<D::Pool as Pool>::Error>;
async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
match D::fetch(req.rocket()) {
Some(db) => match db.get().await {
Ok(conn) => Outcome::Success(DbConnection(conn)),
Err(e) => Outcome::Error((Status::ServiceUnavailable, Some(e))),
},
None => Outcome::Error((Status::InternalServerError, None)),
}
}
}
pub struct DbConnection<D: Database>(pub <D::Pool as Pool>::Connection);
impl<D: Database> Deref for DbConnection<D> {
type Target = <D::Pool as Pool>::Connection;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<D: Database> DerefMut for DbConnection<D> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
trait IsStatic { trait IsStatic {
fn is_static(self: &Self) -> bool; fn is_static(self: &Self) -> bool;
@ -251,10 +293,12 @@ async fn handle_scan(
info!("Added {}", cidr.to_string()); info!("Added {}", cidr.to_string());
let net = IpCidr::from_str(cidr).unwrap(); let net = IpCidr::from_str(cidr).unwrap();
let msg = WorkerMessages::DoWorkRequest { let msg = EventBusWriterEvent::BroadcastMessage(
WorkerMessages::DoWorkRequest {
neworks: vec![Network(net)], neworks: vec![Network(net)],
} }
.into(); .into(),
);
let _ = bus_tx.send(msg).await; let _ = bus_tx.send(msg).await;
} }
@ -425,7 +469,7 @@ static SCAN_TASKS_FOOT: &str = r#"
"#; "#;
#[get("/scan/tasks")] #[get("/scan/tasks")]
async fn handle_list_scan_tasks(mut db: Connection<SnowDb>) -> MultiReply { async fn handle_list_scan_tasks(mut db: DbConn) -> MultiReply {
let mut html_data: Vec<String> = vec![SCAN_TASKS_HEAD.to_string()]; let mut html_data: Vec<String> = vec![SCAN_TASKS_HEAD.to_string()];
let scan_tasks_list = match ScanTask::list(&mut db).await { let scan_tasks_list = match ScanTask::list(&mut db).await {
@ -504,19 +548,19 @@ struct AppConfigs {
} }
async fn report_counts<'a>(rocket: Rocket<rocket::Build>) -> Rocket<rocket::Build> { async fn report_counts<'a>(rocket: Rocket<rocket::Build>) -> Rocket<rocket::Build> {
use rocket_db_pools::diesel::AsyncConnectionWrapper;
let conn = SnowDb::fetch(&rocket) let conn = SnowDb::fetch(&rocket)
.expect("database is attached") .expect("Failed to get DB connection")
.clone()
.get() .get()
.await .await
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
span_error!("failed to connect to MySQL database" => error!("{e}")); span_error!("failed to connect to MySQL database" => error!("{e}"));
panic!("aborting launch"); panic!("aborting launch");
}); });
match Scanner::list_names(Scanners::Stretchoid, &mut DbConnection(conn)).await {
let _: AsyncConnectionWrapper<_> = conn.into(); Ok(d) => info!("Found {} Stretchoid scanners", d.len()),
info!("Connected to the DB"); Err(err) => error!("Unable to fetch Stretchoid scanners: {err}"),
}
rocket rocket
} }
@ -569,13 +613,25 @@ async fn main() -> Result<(), rocket::Error> {
} }
}) })
})) }))
.attach(AdHoc::on_liftoff("Run websocket client manager", |_| { .attach(AdHoc::on_liftoff(
"Run websocket client manager",
move |r| {
Box::pin(async move { Box::pin(async move {
let conn = SnowDb::fetch(r)
.expect("Failed to get DB connection")
.clone()
.get()
.await
.unwrap_or_else(|e| {
span_error!("failed to connect to MySQL database" => error!("{e}"));
panic!("aborting launch");
});
rocket::tokio::spawn(async move { rocket::tokio::spawn(async move {
event_bus.run().await; event_bus.run(DbConnection(conn)).await;
}); });
}) })
})) },
))
.manage(AppConfigs { static_data_dir }) .manage(AppConfigs { static_data_dir })
.manage(event_subscriber) .manage(event_subscriber)
.manage(event_writer) .manage(event_writer)

View File

@ -3,63 +3,11 @@ use rocket_ws::{frame::CloseFrame, Message};
use std::pin::Pin; use std::pin::Pin;
use crate::{ use crate::{
event_bus::{EventBusEvent, EventBusWriter}, event_bus::{EventBusEvent, EventBusWriter, EventBusWriterEvent},
worker::modules::WorkerMessages, worker::modules::WorkerMessages,
}; };
use rocket::futures::channel::mpsc as rocket_mpsc; use rocket::futures::channel::mpsc as rocket_mpsc;
pub struct WsChat {}
impl WsChat {
pub async fn work(
mut stream: rocket_ws::stream::DuplexStream,
mut bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>,
mut bus_tx: rocket_mpsc::Sender<EventBusEvent>,
mut ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>,
) {
use crate::rocket::futures::StreamExt;
use rocket::tokio;
let _ = bus_tx.send(rocket_ws::Message::Ping(vec![])).await;
let mut worker = Worker::initial(&mut stream);
let mut interval = rocket::tokio::time::interval(std::time::Duration::from_secs(60));
loop {
tokio::select! {
_ = interval.tick() => {
// Send message every X seconds
if let Ok(true) = worker.tick().await {
break;
}
}
result = bus_rx.recv() => {
let message = match result {
Ok(message) => message,
Err(err) => {
error!("Bus error: {err}");
continue;
}
};
if let Err(err) = worker.send(message).await {
error!("Error sending event to Event bus WebSocket: {}", err);
break;
}
}
Some(message) = ws_receiver.next() => {
info!("Received message from other client: {:?}", message);
let _ = worker.send(message).await;
},
Ok(false) = worker.poll() => {
// Continue the loop
}
else => {
break;
}
}
}
}
}
pub struct Server {} pub struct Server {}
type HandleBox = Pin< type HandleBox = Pin<
@ -70,21 +18,13 @@ impl Server {
pub fn handle( pub fn handle(
stream: rocket_ws::stream::DuplexStream, stream: rocket_ws::stream::DuplexStream,
bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>, bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>,
bus_tx: rocket_mpsc::Sender<EventBusEvent>, bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>, ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>,
) -> HandleBox { ) -> HandleBox {
use rocket::tokio; use rocket::tokio;
//SharedData::add_worker(tx.clone(), &shared.workers);
//move |mut stream: ws::stream::DuplexStream| {
Box::pin(async move { Box::pin(async move {
let work_fn = WsChat::work( let work_fn = Worker::work(stream, bus_rx, bus_tx, ws_receiver);
stream,
bus_rx,
bus_tx,
ws_receiver,
//workers
);
tokio::spawn(work_fn); tokio::spawn(work_fn);
tokio::signal::ctrl_c().await.unwrap(); tokio::signal::ctrl_c().await.unwrap();
@ -108,10 +48,14 @@ impl Server {
}*/ }*/
pub fn shutdown_to_all(server: &EventBusWriter) -> () { pub fn shutdown_to_all(server: &EventBusWriter) -> () {
let res = server.write().try_send(Message::Close(Some(CloseFrame { let res = server
.write()
.try_send(EventBusWriterEvent::BroadcastMessage(Message::Close(Some(
CloseFrame {
code: rocket_ws::frame::CloseCode::Away, code: rocket_ws::frame::CloseCode::Away,
reason: "Server stop".into(), reason: "Server stop".into(),
}))); },
))));
match res { match res {
Ok(_) => { Ok(_) => {
info!("Worker did receive stop signal."); info!("Worker did receive stop signal.");
@ -148,15 +92,66 @@ pub struct Worker<'a> {
authenticated: bool, authenticated: bool,
login: Option<String>, login: Option<String>,
stream: &'a mut rocket_ws::stream::DuplexStream, stream: &'a mut rocket_ws::stream::DuplexStream,
bus_tx: &'a mut rocket_mpsc::Sender<EventBusWriterEvent>,
} }
impl<'a> Worker<'a> { impl<'a> Worker<'a> {
pub fn initial(stream: &mut rocket_ws::stream::DuplexStream) -> Worker { pub fn initial(
stream: &'a mut rocket_ws::stream::DuplexStream,
bus_tx: &'a mut rocket_mpsc::Sender<EventBusWriterEvent>,
) -> Worker<'a> {
info!("New worker"); info!("New worker");
Worker { Worker {
authenticated: false, authenticated: false,
login: None, login: None,
stream, stream,
bus_tx,
}
}
pub async fn work(
mut stream: rocket_ws::stream::DuplexStream,
mut bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>,
mut bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
mut ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>,
) {
use crate::rocket::futures::StreamExt;
use rocket::tokio;
let mut worker = Worker::initial(&mut stream, &mut bus_tx);
let mut interval = rocket::tokio::time::interval(std::time::Duration::from_secs(60));
loop {
tokio::select! {
_ = interval.tick() => {
// Send message every X seconds
if let Ok(true) = worker.tick().await {
break;
}
}
result = bus_rx.recv() => {
let message = match result {
Ok(message) => message,
Err(err) => {
error!("Bus error: {err}");
continue;
}
};
if let Err(err) = worker.send(message).await {
error!("Error sending event to Event bus WebSocket: {}", err);
break;
}
}
Some(message) = ws_receiver.next() => {
info!("Received message from other client: {:?}", message);
let _ = worker.send(message).await;
},
Ok(false) = worker.poll() => {
// Continue the loop
}
else => {
break;
}
}
} }
} }
@ -272,7 +267,9 @@ impl<'a> Worker<'a> {
} }
WorkerMessages::ScannerFoundResponse { name, address } => { WorkerMessages::ScannerFoundResponse { name, address } => {
info!("Detected {name} for {address}"); info!("Detected {name} for {address}");
//self.new_scanners.insert(name, address); let _ = self
.bus_tx
.try_send(EventBusWriterEvent::ScannerFoundResponse { name, address });
Ok(()) Ok(())
} }
WorkerMessages::GetWorkRequest {} => { WorkerMessages::GetWorkRequest {} => {
@ -282,7 +279,11 @@ impl<'a> Worker<'a> {
WorkerMessages::DoWorkRequest { .. } | WorkerMessages::Invalid { .. } => { WorkerMessages::DoWorkRequest { .. } | WorkerMessages::Invalid { .. } => {
error!("Unable to understand: {msg}"); error!("Unable to understand: {msg}");
// Unable to understand, close the connection // Unable to understand, close the connection
//return ws.close(); let close_frame = rocket_ws::frame::CloseFrame {
code: rocket_ws::frame::CloseCode::Unsupported,
reason: "Invalid data received".to_string().into(),
};
let _ = self.stream.close(Some(close_frame)).await;
Err("Unable to understand: {msg}}") Err("Unable to understand: {msg}}")
} /*msg => { } /*msg => {
error!("No implemented: {:#?}", msg); error!("No implemented: {:#?}", msg);

View File

@ -3,7 +3,7 @@ name = "snow-scanner-worker"
version = "0.1.0" version = "0.1.0"
authors = ["William Desportes <williamdes@wdes.fr>"] authors = ["William Desportes <williamdes@wdes.fr>"]
edition = "2021" edition = "2021"
rust-version = "1.78.0" # MSRV rust-version = "1.81.0" # MSRV
description = "The CLI to run a snow-scanner worker" description = "The CLI to run a snow-scanner worker"
[[bin]] [[bin]]