From f589d4c11e9a70c5721f50eda58cbbc7a8a36564 Mon Sep 17 00:00:00 2001 From: William Desportes Date: Mon, 7 Oct 2024 00:15:29 +0200 Subject: [PATCH] Broadcast work requests to each node --- snow-scanner/src/main.rs | 34 +++++++++++++++++++++--------- snow-scanner/src/server.rs | 16 +++++--------- snow-scanner/src/worker/modules.rs | 6 ++++++ 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/snow-scanner/src/main.rs b/snow-scanner/src/main.rs index 38507e1..24fb8ac 100644 --- a/snow-scanner/src/main.rs +++ b/snow-scanner/src/main.rs @@ -3,6 +3,7 @@ use chrono::{NaiveDateTime, Utc}; #[macro_use] extern crate rocket; +use cidr::IpCidr; use event_bus::{EventBusSubscriber, EventBusWriter}; use rocket::{fairing::AdHoc, futures::SinkExt, trace::error, Rocket, State}; use rocket_db_pools::{ @@ -18,6 +19,8 @@ use rocket_db_pools::{ Connection, }; +use crate::worker::modules::{Network, WorkerMessages}; + use rocket_db_pools::diesel::mysql::{Mysql, MysqlValue}; use rocket_db_pools::diesel::serialize::IsNull; use rocket_db_pools::diesel::sql_types::Text; @@ -29,9 +32,9 @@ use rocket_ws::WebSocket; use server::Server; use worker::detection::{detect_scanner, get_dns_client, Scanners}; -use std::path::PathBuf; use std::{env, fmt}; use std::{io::Write, net::SocketAddr}; +use std::{path::PathBuf, str::FromStr}; use uuid::Uuid; use serde::{Deserialize, Deserializer, Serialize}; @@ -220,17 +223,21 @@ enum MultiReply { } #[post("/scan", data = "
")] -async fn handle_scan(mut db: DbConn, form: Form>) -> MultiReply { +async fn handle_scan( + mut db: DbConn, + form: Form>, + event_bus_writer: &State, +) -> MultiReply { if form.username.len() < 4 { return MultiReply::FormError(PlainText("Invalid username".to_string())); } let task_group_id: Uuid = Uuid::now_v7(); - for ip in form.ips.lines() { + for cidr in form.ips.lines() { let scan_task = ScanTask { task_group_id: task_group_id.to_string(), - cidr: ip.to_string(), + cidr: cidr.to_string(), created_by_username: form.username.to_string(), created_at: Utc::now().naive_utc(), updated_at: None, @@ -238,8 +245,19 @@ async fn handle_scan(mut db: DbConn, form: Form>) -> MultiReply { still_processing_at: None, ended_at: None, }; + let mut bus_tx = event_bus_writer.write(); match scan_task.save(&mut db).await { - Ok(_) => error!("Added {}", ip.to_string()), + Ok(_) => { + info!("Added {}", cidr.to_string()); + let net = IpCidr::from_str(cidr).unwrap(); + + let msg = WorkerMessages::DoWorkRequest { + neworks: vec![Network(net)], + } + .into(); + + let _ = bus_tx.send(msg).await; + } Err(err) => error!("Not added: {:?}", err), } } @@ -460,11 +478,7 @@ async fn index() -> HtmlContents { } #[get("/ping")] -async fn pong(event_bus_writer: &State) -> PlainText { - let mut bus_tx = event_bus_writer.write(); - let _ = bus_tx - .send(rocket_ws::Message::Text("Groumpf!".to_string())) - .await; +async fn pong() -> PlainText { PlainText("pong".to_string()) } diff --git a/snow-scanner/src/server.rs b/snow-scanner/src/server.rs index edad8c8..b5cf28b 100644 --- a/snow-scanner/src/server.rs +++ b/snow-scanner/src/server.rs @@ -1,11 +1,10 @@ -use cidr::IpCidr; use rocket::futures::{stream::Next, SinkExt, StreamExt}; use rocket_ws::{frame::CloseFrame, Message}; -use std::{pin::Pin, str::FromStr}; +use std::pin::Pin; use crate::{ event_bus::{EventBusEvent, EventBusWriter}, - worker::modules::{Network, WorkerMessages}, + worker::modules::WorkerMessages, }; use rocket::futures::channel::mpsc as rocket_mpsc; @@ -21,10 +20,8 @@ impl WsChat { use crate::rocket::futures::StreamExt; use rocket::tokio; - let _ = bus_tx - .send(rocket_ws::Message::Text("I am new !".to_string())) - .await; - //SharedData::send_to_all(&workers, "I am new !"); + let _ = bus_tx.send(rocket_ws::Message::Ping(vec![])).await; + let mut worker = Worker::initial(&mut stream); let mut interval = rocket::tokio::time::interval(std::time::Duration::from_secs(60)); loop { @@ -279,10 +276,7 @@ impl<'a> Worker<'a> { Ok(()) } WorkerMessages::GetWorkRequest {} => { - let net = IpCidr::from_str("52.189.78.0/24").unwrap(); - worker_reply = Some(WorkerMessages::DoWorkRequest { - neworks: vec![Network(net)], - }); + worker_reply = Some(WorkerMessages::DoWorkRequest { neworks: vec![] }); Ok(()) } WorkerMessages::DoWorkRequest { .. } | WorkerMessages::Invalid { .. } => { diff --git a/snow-scanner/src/worker/modules.rs b/snow-scanner/src/worker/modules.rs index 3a257af..f51d995 100644 --- a/snow-scanner/src/worker/modules.rs +++ b/snow-scanner/src/worker/modules.rs @@ -67,6 +67,12 @@ impl Into for String { } } +impl Into for WorkerMessages { + fn into(self) -> RocketMessage { + RocketMessage::Text(self.to_string()) + } +} + impl TryInto for RocketMessage { type Error = String;