From 32d1abdcee6dd179181fe0c5789a582014ff3206 Mon Sep 17 00:00:00 2001 From: William Desportes Date: Mon, 7 Oct 2024 23:31:33 +0200 Subject: [PATCH] Implement saving the received results --- snow-scanner/Cargo.toml | 2 +- snow-scanner/src/event_bus.rs | 69 ++++++++++---- snow-scanner/src/main.rs | 104 ++++++++++++++++----- snow-scanner/src/server.rs | 141 +++++++++++++++-------------- snow-scanner/src/worker/Cargo.toml | 2 +- 5 files changed, 206 insertions(+), 112 deletions(-) diff --git a/snow-scanner/Cargo.toml b/snow-scanner/Cargo.toml index fca07a6..fc3611e 100644 --- a/snow-scanner/Cargo.toml +++ b/snow-scanner/Cargo.toml @@ -3,7 +3,7 @@ name = "snow-scanner" version = "0.1.0" authors = ["William Desportes "] edition = "2021" -rust-version = "1.78.0" # MSRV +rust-version = "1.81.0" # MSRV description = "A program to scan internet and find scanners" homepage = "https://github.com/wdes/snow-scanner/tree/v1.2.0-dev#readme" repository = "https://github.com/wdes/snow-scanner" diff --git a/snow-scanner/src/event_bus.rs b/snow-scanner/src/event_bus.rs index e68c399..d67e15d 100644 --- a/snow-scanner/src/event_bus.rs +++ b/snow-scanner/src/event_bus.rs @@ -1,11 +1,17 @@ +use std::{net::IpAddr, str::FromStr}; + +use crate::{worker::detection::detect_scanner_from_name, DbConnection, SnowDb}; +use hickory_resolver::Name; use rocket::futures::channel::mpsc as rocket_mpsc; use rocket::futures::StreamExt; use rocket::tokio; +use crate::Scanner; + /// Handles all the raw events being streamed from balancers and parses and filters them into only the events we care about. pub struct EventBus { - events_rx: rocket_mpsc::Receiver, - events_tx: rocket_mpsc::Sender, + events_rx: rocket_mpsc::Receiver, + events_tx: rocket_mpsc::Sender, bus_tx: tokio::sync::broadcast::Sender, } @@ -20,13 +26,13 @@ impl EventBus { } } - pub async fn run(&mut self) { + // db: &Connection + pub async fn run(&mut self, mut conn: DbConnection) { info!("EventBus started"); loop { tokio::select! { Some(event) = self.events_rx.next() => { - info!("EventBus received: {event}"); - self.handle_event(event); + self.handle_event(event, &mut conn).await; } else => { warn!("EventBus stopped"); @@ -36,18 +42,44 @@ impl EventBus { } } - fn handle_event(&self, event: rocket_ws::Message) { - info!("Received event: {}", event); + async fn handle_event(&self, event: EventBusWriterEvent, db: &mut DbConnection) { + info!("Received event"); if self.bus_tx.receiver_count() == 0 { return; } - match self.bus_tx.send(event) { - Ok(count) => { - info!("Event sent to {count} subscribers"); - } - Err(err) => { - error!("Error sending event to subscribers: {}", err); + match event { + EventBusWriterEvent::ScannerFoundResponse { name, address } => { + let name = Name::from_str(name.as_str()).unwrap(); + match detect_scanner_from_name(&name) { + Ok(Some(scanner_type)) => { + match Scanner::find_or_new(address.into(), scanner_type, Some(name), db) + .await + { + Ok(scanner) => { + let _ = scanner.save(db).await; + } + Err(err) => { + error!("Error find or save: {:?}", err); + } + } + } + Ok(None) => { + error!("No name detected for: {:?}", name); + } + + Err(err) => { + error!("No name detected error: {:?}", err); + } + }; } + EventBusWriterEvent::BroadcastMessage(msg) => match self.bus_tx.send(msg) { + Ok(count) => { + info!("Event sent to {count} subscribers"); + } + Err(err) => { + error!("Error sending event to subscribers: {}", err); + } + }, } } @@ -69,15 +101,20 @@ pub struct EventBusSubscriber { /// Enables subscriptions to the event bus pub struct EventBusWriter { - bus_tx: rocket_mpsc::Sender, + bus_tx: rocket_mpsc::Sender, +} + +pub enum EventBusWriterEvent { + BroadcastMessage(rocket_ws::Message), + ScannerFoundResponse { name: String, address: IpAddr }, } impl EventBusWriter { - pub fn new(bus_tx: rocket_mpsc::Sender) -> Self { + pub fn new(bus_tx: rocket_mpsc::Sender) -> Self { Self { bus_tx } } - pub fn write(&self) -> rocket_mpsc::Sender { + pub fn write(&self) -> rocket_mpsc::Sender { self.bus_tx.clone() } } diff --git a/snow-scanner/src/main.rs b/snow-scanner/src/main.rs index 24fb8ac..9539d0f 100644 --- a/snow-scanner/src/main.rs +++ b/snow-scanner/src/main.rs @@ -4,8 +4,15 @@ use chrono::{NaiveDateTime, Utc}; extern crate rocket; use cidr::IpCidr; -use event_bus::{EventBusSubscriber, EventBusWriter}; -use rocket::{fairing::AdHoc, futures::SinkExt, trace::error, Rocket, State}; +use event_bus::{EventBusSubscriber, EventBusWriter, EventBusWriterEvent}; +use rocket::{ + fairing::AdHoc, + futures::SinkExt, + http::Status, + request::{FromRequest, Outcome, Request}, + trace::error, + Rocket, State, +}; use rocket_db_pools::{ rocket::{ figment::{ @@ -16,7 +23,7 @@ use rocket_db_pools::{ fs::NamedFile, Responder, }, - Connection, + Connection, Pool, }; use crate::worker::modules::{Network, WorkerMessages}; @@ -32,7 +39,10 @@ use rocket_ws::WebSocket; use server::Server; use worker::detection::{detect_scanner, get_dns_client, Scanners}; -use std::{env, fmt}; +use std::{ + env, fmt, + ops::{Deref, DerefMut}, +}; use std::{io::Write, net::SocketAddr}; use std::{path::PathBuf, str::FromStr}; use uuid::Uuid; @@ -49,11 +59,43 @@ pub mod worker; use crate::models::*; -#[derive(Database)] +#[derive(Database, Clone)] #[database("snow_scanner_db")] pub struct SnowDb(MysqlPool); -type DbConn = Connection; +pub type ReqDbConn = Connection; +pub type DbConn = DbConnection; + +#[rocket::async_trait] +impl<'r, D: Database> FromRequest<'r> for DbConnection { + type Error = Option<::Error>; + + async fn from_request(req: &'r Request<'_>) -> Outcome { + match D::fetch(req.rocket()) { + Some(db) => match db.get().await { + Ok(conn) => Outcome::Success(DbConnection(conn)), + Err(e) => Outcome::Error((Status::ServiceUnavailable, Some(e))), + }, + None => Outcome::Error((Status::InternalServerError, None)), + } + } +} + +pub struct DbConnection(pub ::Connection); + +impl Deref for DbConnection { + type Target = ::Connection; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DbConnection { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} trait IsStatic { fn is_static(self: &Self) -> bool; @@ -251,10 +293,12 @@ async fn handle_scan( info!("Added {}", cidr.to_string()); let net = IpCidr::from_str(cidr).unwrap(); - let msg = WorkerMessages::DoWorkRequest { - neworks: vec![Network(net)], - } - .into(); + let msg = EventBusWriterEvent::BroadcastMessage( + WorkerMessages::DoWorkRequest { + neworks: vec![Network(net)], + } + .into(), + ); let _ = bus_tx.send(msg).await; } @@ -425,7 +469,7 @@ static SCAN_TASKS_FOOT: &str = r#" "#; #[get("/scan/tasks")] -async fn handle_list_scan_tasks(mut db: Connection) -> MultiReply { +async fn handle_list_scan_tasks(mut db: DbConn) -> MultiReply { let mut html_data: Vec = vec![SCAN_TASKS_HEAD.to_string()]; let scan_tasks_list = match ScanTask::list(&mut db).await { @@ -504,19 +548,19 @@ struct AppConfigs { } async fn report_counts<'a>(rocket: Rocket) -> Rocket { - use rocket_db_pools::diesel::AsyncConnectionWrapper; - let conn = SnowDb::fetch(&rocket) - .expect("database is attached") + .expect("Failed to get DB connection") + .clone() .get() .await .unwrap_or_else(|e| { span_error!("failed to connect to MySQL database" => error!("{e}")); panic!("aborting launch"); }); - - let _: AsyncConnectionWrapper<_> = conn.into(); - info!("Connected to the DB"); + match Scanner::list_names(Scanners::Stretchoid, &mut DbConnection(conn)).await { + Ok(d) => info!("Found {} Stretchoid scanners", d.len()), + Err(err) => error!("Unable to fetch Stretchoid scanners: {err}"), + } rocket } @@ -569,13 +613,25 @@ async fn main() -> Result<(), rocket::Error> { } }) })) - .attach(AdHoc::on_liftoff("Run websocket client manager", |_| { - Box::pin(async move { - rocket::tokio::spawn(async move { - event_bus.run().await; - }); - }) - })) + .attach(AdHoc::on_liftoff( + "Run websocket client manager", + move |r| { + Box::pin(async move { + let conn = SnowDb::fetch(r) + .expect("Failed to get DB connection") + .clone() + .get() + .await + .unwrap_or_else(|e| { + span_error!("failed to connect to MySQL database" => error!("{e}")); + panic!("aborting launch"); + }); + rocket::tokio::spawn(async move { + event_bus.run(DbConnection(conn)).await; + }); + }) + }, + )) .manage(AppConfigs { static_data_dir }) .manage(event_subscriber) .manage(event_writer) diff --git a/snow-scanner/src/server.rs b/snow-scanner/src/server.rs index b5cf28b..216f48b 100644 --- a/snow-scanner/src/server.rs +++ b/snow-scanner/src/server.rs @@ -3,63 +3,11 @@ use rocket_ws::{frame::CloseFrame, Message}; use std::pin::Pin; use crate::{ - event_bus::{EventBusEvent, EventBusWriter}, + event_bus::{EventBusEvent, EventBusWriter, EventBusWriterEvent}, worker::modules::WorkerMessages, }; use rocket::futures::channel::mpsc as rocket_mpsc; -pub struct WsChat {} - -impl WsChat { - pub async fn work( - mut stream: rocket_ws::stream::DuplexStream, - mut bus_rx: rocket::tokio::sync::broadcast::Receiver, - mut bus_tx: rocket_mpsc::Sender, - mut ws_receiver: rocket_mpsc::Receiver, - ) { - use crate::rocket::futures::StreamExt; - use rocket::tokio; - - let _ = bus_tx.send(rocket_ws::Message::Ping(vec![])).await; - - let mut worker = Worker::initial(&mut stream); - let mut interval = rocket::tokio::time::interval(std::time::Duration::from_secs(60)); - loop { - tokio::select! { - _ = interval.tick() => { - // Send message every X seconds - if let Ok(true) = worker.tick().await { - break; - } - } - result = bus_rx.recv() => { - let message = match result { - Ok(message) => message, - Err(err) => { - error!("Bus error: {err}"); - continue; - } - }; - if let Err(err) = worker.send(message).await { - error!("Error sending event to Event bus WebSocket: {}", err); - break; - } - } - Some(message) = ws_receiver.next() => { - info!("Received message from other client: {:?}", message); - let _ = worker.send(message).await; - }, - Ok(false) = worker.poll() => { - // Continue the loop - } - else => { - break; - } - } - } - } -} - pub struct Server {} type HandleBox = Pin< @@ -70,21 +18,13 @@ impl Server { pub fn handle( stream: rocket_ws::stream::DuplexStream, bus_rx: rocket::tokio::sync::broadcast::Receiver, - bus_tx: rocket_mpsc::Sender, + bus_tx: rocket_mpsc::Sender, ws_receiver: rocket_mpsc::Receiver, ) -> HandleBox { use rocket::tokio; - //SharedData::add_worker(tx.clone(), &shared.workers); - //move |mut stream: ws::stream::DuplexStream| { Box::pin(async move { - let work_fn = WsChat::work( - stream, - bus_rx, - bus_tx, - ws_receiver, - //workers - ); + let work_fn = Worker::work(stream, bus_rx, bus_tx, ws_receiver); tokio::spawn(work_fn); tokio::signal::ctrl_c().await.unwrap(); @@ -108,10 +48,14 @@ impl Server { }*/ pub fn shutdown_to_all(server: &EventBusWriter) -> () { - let res = server.write().try_send(Message::Close(Some(CloseFrame { - code: rocket_ws::frame::CloseCode::Away, - reason: "Server stop".into(), - }))); + let res = server + .write() + .try_send(EventBusWriterEvent::BroadcastMessage(Message::Close(Some( + CloseFrame { + code: rocket_ws::frame::CloseCode::Away, + reason: "Server stop".into(), + }, + )))); match res { Ok(_) => { info!("Worker did receive stop signal."); @@ -148,15 +92,66 @@ pub struct Worker<'a> { authenticated: bool, login: Option, stream: &'a mut rocket_ws::stream::DuplexStream, + bus_tx: &'a mut rocket_mpsc::Sender, } impl<'a> Worker<'a> { - pub fn initial(stream: &mut rocket_ws::stream::DuplexStream) -> Worker { + pub fn initial( + stream: &'a mut rocket_ws::stream::DuplexStream, + bus_tx: &'a mut rocket_mpsc::Sender, + ) -> Worker<'a> { info!("New worker"); Worker { authenticated: false, login: None, stream, + bus_tx, + } + } + + pub async fn work( + mut stream: rocket_ws::stream::DuplexStream, + mut bus_rx: rocket::tokio::sync::broadcast::Receiver, + mut bus_tx: rocket_mpsc::Sender, + mut ws_receiver: rocket_mpsc::Receiver, + ) { + use crate::rocket::futures::StreamExt; + use rocket::tokio; + + let mut worker = Worker::initial(&mut stream, &mut bus_tx); + let mut interval = rocket::tokio::time::interval(std::time::Duration::from_secs(60)); + loop { + tokio::select! { + _ = interval.tick() => { + // Send message every X seconds + if let Ok(true) = worker.tick().await { + break; + } + } + result = bus_rx.recv() => { + let message = match result { + Ok(message) => message, + Err(err) => { + error!("Bus error: {err}"); + continue; + } + }; + if let Err(err) = worker.send(message).await { + error!("Error sending event to Event bus WebSocket: {}", err); + break; + } + } + Some(message) = ws_receiver.next() => { + info!("Received message from other client: {:?}", message); + let _ = worker.send(message).await; + }, + Ok(false) = worker.poll() => { + // Continue the loop + } + else => { + break; + } + } } } @@ -272,7 +267,9 @@ impl<'a> Worker<'a> { } WorkerMessages::ScannerFoundResponse { name, address } => { info!("Detected {name} for {address}"); - //self.new_scanners.insert(name, address); + let _ = self + .bus_tx + .try_send(EventBusWriterEvent::ScannerFoundResponse { name, address }); Ok(()) } WorkerMessages::GetWorkRequest {} => { @@ -282,7 +279,11 @@ impl<'a> Worker<'a> { WorkerMessages::DoWorkRequest { .. } | WorkerMessages::Invalid { .. } => { error!("Unable to understand: {msg}"); // Unable to understand, close the connection - //return ws.close(); + let close_frame = rocket_ws::frame::CloseFrame { + code: rocket_ws::frame::CloseCode::Unsupported, + reason: "Invalid data received".to_string().into(), + }; + let _ = self.stream.close(Some(close_frame)).await; Err("Unable to understand: {msg}}") } /*msg => { error!("No implemented: {:#?}", msg); diff --git a/snow-scanner/src/worker/Cargo.toml b/snow-scanner/src/worker/Cargo.toml index bb69f2d..3a274aa 100644 --- a/snow-scanner/src/worker/Cargo.toml +++ b/snow-scanner/src/worker/Cargo.toml @@ -3,7 +3,7 @@ name = "snow-scanner-worker" version = "0.1.0" authors = ["William Desportes "] edition = "2021" -rust-version = "1.78.0" # MSRV +rust-version = "1.81.0" # MSRV description = "The CLI to run a snow-scanner worker" [[bin]]