From de3b21e210e7adf86e2b808e26275b65433b7061 Mon Sep 17 00:00:00 2001 From: William Desportes Date: Tue, 24 Sep 2024 04:16:08 +0200 Subject: [PATCH] Make a working scan worker/server --- snow-scanner/src/main.rs | 94 ++------------ snow-scanner/src/models.rs | 44 ++++++- snow-scanner/src/server.rs | 52 +++++++- snow-scanner/src/worker/detection.rs | 61 ++++++++++ snow-scanner/src/worker/mod.rs | 1 + snow-scanner/src/worker/modules.rs | 4 +- snow-scanner/src/worker/worker.rs | 175 ++++++++++++++++----------- 7 files changed, 274 insertions(+), 157 deletions(-) create mode 100644 snow-scanner/src/worker/detection.rs diff --git a/snow-scanner/src/main.rs b/snow-scanner/src/main.rs index 571d116..747e535 100644 --- a/snow-scanner/src/main.rs +++ b/snow-scanner/src/main.rs @@ -5,27 +5,22 @@ use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; use log2::*; use chrono::{NaiveDateTime, Utc}; -use diesel::deserialize::{self, FromSqlRow}; +use diesel::deserialize::{self}; use diesel::mysql::{Mysql, MysqlValue}; use diesel::sql_types::Text; use diesel::r2d2::ConnectionManager; use diesel::r2d2::Pool; +use worker::detection::{detect_scanner, get_dns_client, Scanners}; use std::collections::HashMap; use std::io::Write; use std::path::PathBuf; -use std::str::FromStr; use std::{env, fmt}; use uuid::Uuid; use serde::{Deserialize, Deserializer, Serialize}; -use hickory_resolver::config::{NameServerConfigGroup, ResolverConfig, ResolverOpts}; -use hickory_resolver::{Name, Resolver}; -use std::net::IpAddr; -use std::time::Duration; - use diesel::serialize::IsNull; use diesel::{serialize, MysqlConnection}; use dns_ptr_resolver::{get_ptr, ResolvedResult}; @@ -41,17 +36,6 @@ use crate::server::Server; /// Short-hand for the database pool type to use throughout the app. type DbPool = Pool>; -// Create alias for HMAC-SHA256 -// type HmacSha256 = Hmac; - -#[derive(Debug, Clone, Copy, FromSqlRow)] -pub enum Scanners { - Stretchoid, - Binaryedge, - Censys, - InternetMeasurement, -} - trait IsStatic { fn is_static(self: &Self) -> bool; } @@ -128,24 +112,6 @@ impl deserialize::FromSql for Scanners { } } -fn detect_scanner(ptr_result: &ResolvedResult) -> Result { - match ptr_result.result { - Some(ref x) - if x.trim_to(2) - .eq_case(&Name::from_str("binaryedge.ninja.").expect("Should parse")) => - { - Ok(Scanners::Binaryedge) - } - Some(ref x) - if x.trim_to(2) - .eq_case(&Name::from_str("stretchoid.com.").expect("Should parse")) => - { - Ok(Scanners::Stretchoid) - } - _ => Err(()), - } -} - async fn handle_ip(pool: web::Data, ip: String) -> Result> { let query_address = ip.parse().expect("To parse"); @@ -168,41 +134,12 @@ async fn handle_ip(pool: web::Data, ip: String) -> Result { - let ip_type = if ip.contains(':') { 6 } else { 4 }; - + Ok(Some(scanner_type)) => { // use web::block to offload blocking Diesel queries without blocking server thread web::block(move || { // note that obtaining a connection from the pool is also potentially blocking let conn = &mut pool.get().unwrap(); - let scanner_row_result = Scanner::find(ip.clone(), ip_type, conn); - let scanner_row = match scanner_row_result { - Ok(scanner_row) => scanner_row, - Err(_) => return Err(None), - }; - - let scanner = if let Some(mut scanners) = scanner_row { - scanners.last_seen_at = Some(Utc::now().naive_utc()); - scanners.last_checked_at = Some(Utc::now().naive_utc()); - scanners.updated_at = Some(Utc::now().naive_utc()); - scanners - } else { - Scanner { - ip: ip, - ip_type: ip_type, - scanner_name: scanner_name.clone(), - ip_ptr: match result.result { - Some(ptr) => Some(ptr.to_string()), - None => None, - }, - created_at: Utc::now().naive_utc(), - updated_at: None, - last_seen_at: None, - last_checked_at: None, - } - }; - - match scanner.save(conn) { + match Scanner::find_or_new(query_address, scanner_type, result.result, conn) { Ok(scanner) => Ok(scanner), Err(_) => Err(None), } @@ -210,6 +147,7 @@ async fn handle_ip(pool: web::Data, ip: String) -> Result Err(None), Err(_) => Err(Some(result)), } @@ -493,23 +431,6 @@ fn get_connection(database_url: &str) -> DbPool { .expect("Could not build connection pool") } -fn get_dns_client() -> Resolver { - let server_ip = "1.1.1.1"; - - let server = NameServerConfigGroup::from_ips_clear( - &[IpAddr::from_str(server_ip).unwrap()], - 53, // Port 53 - true, - ); - - let config = ResolverConfig::from_parts(None, vec![], server); - let mut options = ResolverOpts::default(); - options.timeout = Duration::from_secs(5); - options.attempts = 1; // One try - - Resolver::new(config, options).unwrap() -} - fn plain_contents(data: String) -> HttpResponse { HttpResponse::Ok() .content_type(ContentType::plaintext()) @@ -604,8 +525,12 @@ async fn main() -> std::io::Result<()> { match ws2::listen(worker_server_address.as_str()) { Ok(mut ws_server) => { std::thread::spawn(move || { + let pool = get_connection(db_url.as_str()); + // note that obtaining a connection from the pool is also potentially blocking + let conn = &mut pool.get().unwrap(); let mut ws_server_handles = Server { clients: HashMap::new(), + new_scanners: HashMap::new(), }; info!("Worker server is listening on: {worker_server_address}"); loop { @@ -614,6 +539,7 @@ async fn main() -> std::io::Result<()> { Err(err) => error!("Processing error: {err}"), } ws_server_handles.cleanup(&ws_server); + ws_server_handles.commit(conn); } }); } diff --git a/snow-scanner/src/models.rs b/snow-scanner/src/models.rs index 34ef2c1..4a877e3 100644 --- a/snow-scanner/src/models.rs +++ b/snow-scanner/src/models.rs @@ -1,8 +1,11 @@ +use std::net::IpAddr; + use crate::Scanners; -use chrono::NaiveDateTime; +use chrono::{NaiveDateTime, Utc}; use diesel::dsl::insert_into; use diesel::prelude::*; use diesel::result::Error as DieselError; +use hickory_resolver::Name; use crate::schema::scan_tasks::dsl::scan_tasks; use crate::schema::scanners::dsl::scanners; @@ -22,6 +25,45 @@ pub struct Scanner { } impl Scanner { + pub fn find_or_new( + query_address: IpAddr, + scanner_name: Scanners, + ptr: Option, + conn: &mut MysqlConnection, + ) -> Result { + let ip_type = if query_address.is_ipv6() { 6 } else { 4 }; + let scanner_row_result = Scanner::find(query_address.to_string(), ip_type, conn); + let scanner_row = match scanner_row_result { + Ok(scanner_row) => scanner_row, + Err(_) => return Err(()), + }; + + let scanner = if let Some(mut scanner) = scanner_row { + scanner.last_seen_at = Some(Utc::now().naive_utc()); + scanner.last_checked_at = Some(Utc::now().naive_utc()); + scanner.updated_at = Some(Utc::now().naive_utc()); + scanner + } else { + Scanner { + ip: query_address.to_string(), + ip_type: ip_type, + scanner_name: scanner_name.clone(), + ip_ptr: match ptr { + Some(ptr) => Some(ptr.to_string()), + None => None, + }, + created_at: Utc::now().naive_utc(), + updated_at: None, + last_seen_at: None, + last_checked_at: None, + } + }; + match scanner.save(conn) { + Ok(scanner) => Ok(scanner), + Err(_) => Err(()), + } + } + pub fn find( ip_address: String, ip_type: u8, diff --git a/snow-scanner/src/server.rs b/snow-scanner/src/server.rs index 5678095..8560760 100644 --- a/snow-scanner/src/server.rs +++ b/snow-scanner/src/server.rs @@ -1,12 +1,21 @@ use cidr::IpCidr; +use diesel::MysqlConnection; +use hickory_resolver::Name; use log2::*; -use std::{collections::HashMap, str::FromStr}; +use std::{collections::HashMap, net::IpAddr, str::FromStr}; use ws2::{Pod, WebSocket}; -use crate::worker::modules::{Network, WorkerMessages}; +use crate::{ + worker::{ + detection::detect_scanner_from_name, + modules::{Network, WorkerMessages}, + }, + DbPool, Scanner, +}; pub struct Server { pub clients: HashMap, + pub new_scanners: HashMap, } impl Server { @@ -14,6 +23,38 @@ impl Server { // TODO: implement check not logged in &self } + + pub fn commit(&mut self, conn: &mut MysqlConnection) -> &Server { + for (name, query_address) in self.new_scanners.clone() { + let scanner_name = Name::from_str(name.as_str()).unwrap(); + + match detect_scanner_from_name(&scanner_name) { + Ok(Some(scanner_type)) => { + match Scanner::find_or_new( + query_address, + scanner_type, + Some(scanner_name), + conn, + ) { + Ok(scanner) => { + // Got saved + self.new_scanners.remove(&name); + info!( + "Saved {scanner_type}: {name} for {query_address}: {:?}", + scanner.ip_ptr + ); + } + Err(err) => { + error!("Unable to find or new {:?}", err); + } + }; + } + Ok(None) => {} + Err(_) => {} + } + } + self + } } #[derive(Debug, Clone)] @@ -90,9 +131,14 @@ impl ws2::Handler for Server { return Ok(()); } } + WorkerMessages::ScannerFoundResponse { name, address } => { + info!("Detected {name} for {address}"); + self.new_scanners.insert(name, address); + Ok(()) + } WorkerMessages::GetWorkRequest {} => { worker_reply = Some(WorkerMessages::DoWorkRequest { - neworks: vec![Network(IpCidr::from_str("127.0.0.0/31")?)], + neworks: vec![Network(IpCidr::from_str("52.189.78.0/24")?)], }); Ok(()) } diff --git a/snow-scanner/src/worker/detection.rs b/snow-scanner/src/worker/detection.rs new file mode 100644 index 0000000..8570a20 --- /dev/null +++ b/snow-scanner/src/worker/detection.rs @@ -0,0 +1,61 @@ +use std::net::IpAddr; +use std::str::FromStr; +use std::time::Duration; + +use diesel::deserialize::FromSqlRow; +use dns_ptr_resolver::ResolvedResult; + +use hickory_resolver::config::{NameServerConfigGroup, ResolverConfig, ResolverOpts}; +use hickory_resolver::{Name, Resolver}; + +#[derive(Debug, Clone, Copy, FromSqlRow)] +pub enum Scanners { + Stretchoid, + Binaryedge, + Censys, + InternetMeasurement, +} + +pub fn get_dns_client() -> Resolver { + let server_ip = "1.1.1.1"; + + let server = NameServerConfigGroup::from_ips_clear( + &[IpAddr::from_str(server_ip).unwrap()], + 53, // Port 53 + true, + ); + + let config = ResolverConfig::from_parts(None, vec![], server); + let mut options = ResolverOpts::default(); + options.timeout = Duration::from_secs(5); + options.attempts = 1; // One try + + Resolver::new(config, options).unwrap() +} + +pub fn detect_scanner(ptr_result: &ResolvedResult) -> Result, ()> { + match &ptr_result.result { + Some(name) => detect_scanner_from_name(&name), + None => Ok(None), + } +} + +pub fn detect_scanner_from_name(name: &Name) -> Result, ()> { + match name { + ref name + if name + .trim_to(2) + .eq_case(&Name::from_str("binaryedge.ninja.").expect("Should parse")) => + { + Ok(Some(Scanners::Binaryedge)) + } + ref name + if name + .trim_to(2) + .eq_case(&Name::from_str("stretchoid.com.").expect("Should parse")) => + { + Ok(Some(Scanners::Stretchoid)) + } + &_ => Ok(None), + } +} diff --git a/snow-scanner/src/worker/mod.rs b/snow-scanner/src/worker/mod.rs index 55e5f1d..5ff9510 100644 --- a/snow-scanner/src/worker/mod.rs +++ b/snow-scanner/src/worker/mod.rs @@ -1 +1,2 @@ +pub mod detection; pub mod modules; diff --git a/snow-scanner/src/worker/modules.rs b/snow-scanner/src/worker/modules.rs index 9ae42be..1cd0d26 100644 --- a/snow-scanner/src/worker/modules.rs +++ b/snow-scanner/src/worker/modules.rs @@ -1,4 +1,4 @@ -use std::str::FromStr; +use std::{net::IpAddr, str::FromStr}; use cidr::IpCidr; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -15,6 +15,8 @@ pub enum WorkerMessages { GetWorkRequest {}, #[serde(rename = "do_work")] DoWorkRequest { neworks: Vec }, + #[serde(rename = "scanner_found")] + ScannerFoundResponse { name: String, address: IpAddr }, #[serde(rename = "")] Invalid { err: String }, } diff --git a/snow-scanner/src/worker/worker.rs b/snow-scanner/src/worker/worker.rs index 43aed0d..0dae5b1 100644 --- a/snow-scanner/src/worker/worker.rs +++ b/snow-scanner/src/worker/worker.rs @@ -1,11 +1,15 @@ use std::{env, net::IpAddr}; use chrono::{Duration, NaiveDateTime, Utc}; +use detection::detect_scanner; +use dns_ptr_resolver::{get_ptr, ResolvedResult}; use log2::*; -use ws2::{Pod, WebSocket}; +use ws2::{Client, Pod, WebSocket}; +pub mod detection; pub mod modules; +use crate::detection::get_dns_client; use crate::modules::WorkerMessages; #[derive(Debug, Clone)] @@ -44,6 +48,100 @@ impl Worker { self.authenticated = true; self } + + pub fn tick(&mut self, ws_client: &Client) -> &Worker { + let mut request: Option = None; + if !self.is_authenticated() { + request = Some(WorkerMessages::AuthenticateRequest { + login: "williamdes".to_string(), + }); + } else { + if self.last_request_for_work.is_none() + || (self.last_request_for_work.is_some() + && Utc::now().naive_utc() + > (self.last_request_for_work.unwrap() + Duration::minutes(10))) + { + request = Some(WorkerMessages::GetWorkRequest {}); + } + } + + // it has a request to send + if let Some(request) = request { + self.send_request(ws_client, request); + } + self + } + + pub fn send_request(&mut self, ws_client: &Client, request: WorkerMessages) -> &Worker { + let msg_string: String = request.to_string(); + match ws_client.send(msg_string) { + Ok(_) => { + match request { + WorkerMessages::AuthenticateRequest { login } => { + self.authenticated = true; // Anyway, it will kick us if this is not success + info!("Logged in as: {login}") + } + WorkerMessages::GetWorkRequest {} => { + self.last_request_for_work = Some(Utc::now().naive_utc()); + info!("Asked for work: {:?}", self.last_request_for_work) + } + msg => error!("No implemented: {:#?}", msg), + } + } + Err(err) => error!("Unable to send: {err}"), + } + self + } + + pub fn receive_request(&mut self, ws: &WebSocket, server_request: WorkerMessages) -> &Worker { + match server_request { + WorkerMessages::DoWorkRequest { neworks } => { + info!("Should work on: {:?}", neworks); + for cidr in neworks { + let cidr = cidr.0; + info!("Picking up: {cidr}"); + info!("Range, from {} to {}", cidr.first(), cidr.last()); + let addresses = cidr.iter().addresses(); + let count = addresses.count(); + let mut current = 0; + for addr in addresses { + let client = get_dns_client(); + match get_ptr(addr, client) { + Ok(result) => match detect_scanner(&result) { + Ok(Some(scanner_name)) => { + info!("Detected {:?} for {addr}", scanner_name); + let request = WorkerMessages::ScannerFoundResponse { + name: result.result.unwrap().to_string(), + address: addr, + }; + let msg_string: String = request.to_string(); + match ws.send(msg_string) { + Ok(_) => {} + Err(err) => error!("Unable to send scanner result: {err}"), + } + } + Ok(None) => {} + + Err(err) => error!("Error detecting for {addr}: {:?}", err), + }, + Err(err) => { + //debug!("Error processing {addr}: {err}") + } + }; + + current += 1; + } + } + } + WorkerMessages::AuthenticateRequest { .. } + | WorkerMessages::ScannerFoundResponse { .. } + | WorkerMessages::GetWorkRequest {} + | WorkerMessages::Invalid { .. } => { + error!("Unable to understand message: {:?}", server_request); + } + } + self + } } impl ws2::Handler for Worker { @@ -58,20 +156,9 @@ impl ws2::Handler for Worker { } fn on_message(&mut self, ws: &WebSocket, msg: String) -> Pod { - let worker_request: WorkerMessages = msg.clone().into(); - - match worker_request { - WorkerMessages::DoWorkRequest { neworks } => { - info!("Should work on: {:?}", neworks); - Ok(()) - } - WorkerMessages::AuthenticateRequest { .. } - | WorkerMessages::GetWorkRequest {} - | WorkerMessages::Invalid { .. } => { - error!("Unable to understand message: {msg}, {:?}", worker_request); - Ok(()) - } - } + let server_request: WorkerMessages = msg.clone().into(); + self.receive_request(ws, server_request); + Ok(()) } } @@ -97,44 +184,11 @@ fn main() -> () { loop { match ws_client.process(&mut worker, 0.5) { - Ok(_) => {} + Ok(_) => { + worker.tick(&ws_client); + } Err(err) => error!("Processing error: {err}"), } - let mut request: Option = None; - if !worker.is_authenticated() { - request = Some(WorkerMessages::AuthenticateRequest { - login: "williamdes".to_string(), - }); - } else { - if worker.last_request_for_work.is_none() - || (worker.last_request_for_work.is_some() - && Utc::now().naive_utc() - > (worker.last_request_for_work.unwrap() + Duration::minutes(10))) - { - request = Some(WorkerMessages::GetWorkRequest {}); - } - } - - // it has a request to send - if let Some(request) = request { - let msg_string: String = request.to_string(); - match ws_client.send(msg_string) { - Ok(_) => { - match request { - WorkerMessages::AuthenticateRequest { login } => { - worker.authenticated = true; // Anyway, it will kick us if this is not success - info!("Logged in as: {login}") - } - WorkerMessages::GetWorkRequest {} => { - worker.last_request_for_work = Some(Utc::now().naive_utc()); - info!("Asked for work: {:?}", worker.last_request_for_work) - } - msg => error!("No implemented: {:#?}", msg), - } - } - Err(err) => error!("Unable to connect to {url}: {err}"), - } - } } } Err(err) => error!("Unable to connect to {url}: {err}"), @@ -154,11 +208,7 @@ thread::spawn(move || { let mut rows = stmt.query(named_params! {}).unwrap(); println!("Waiting for jobs"); while let Some(row) = rows.next().unwrap() { - let task_group_id: String = row.get(0).unwrap(); - let cidr_str: String = row.get(1).unwrap(); - let cidr: IpCidr = cidr_str.parse().expect("Should parse CIDR"); - println!("Picking up: {} -> {}", task_group_id, cidr); - println!("Range, from {} to {}", cidr.first(), cidr.last()); + let _ = conn.execute("UPDATE scan_tasks SET updated_at = :updated_at, started_at = :started_at WHERE cidr = :cidr AND task_group_id = :task_group_id", named_params! { ":updated_at": Utc::now().naive_utc().to_string(), @@ -166,15 +216,7 @@ thread::spawn(move || { ":cidr": cidr_str, ":task_group_id": task_group_id, }).unwrap(); - let addresses = cidr.iter().addresses(); - let count = addresses.count(); - let mut current = 0; - for addr in addresses { - match handle_ip(conn, addr.to_string()) { - Ok(scanner) => println!("Processed {}", scanner.ip), - Err(_) => println!("Processed {}", addr), - } - current += 1; + if (current / count) % 10 == 0 { let _ = conn.execute("UPDATE scan_tasks SET updated_at = :updated_at, still_processing_at = :still_processing_at WHERE cidr = :cidr AND task_group_id = :task_group_id", named_params! { @@ -193,8 +235,5 @@ thread::spawn(move || { ":task_group_id": task_group_id, }).unwrap(); } - - let two_hundred_millis = Duration::from_millis(500); - thread::sleep(two_hundred_millis); } });*/