diff --git a/snow-scanner/src/worker/Cargo.toml b/snow-scanner/src/worker/Cargo.toml index 16d4be6..9e346d8 100644 --- a/snow-scanner/src/worker/Cargo.toml +++ b/snow-scanner/src/worker/Cargo.toml @@ -13,7 +13,7 @@ path = "worker.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -ws2 = "0.2.5" +tungstenite = { version = "0.24.0", default-features = true, features = ["native-tls"] } log2 = "0.1.11" diesel = { version = "2.2.0", default-features = false, features = [] } dns-ptr-resolver = {git = "https://github.com/wdes/dns-ptr-resolver.git"} diff --git a/snow-scanner/src/worker/worker.rs b/snow-scanner/src/worker/worker.rs index 7a1ffb8..7f99685 100644 --- a/snow-scanner/src/worker/worker.rs +++ b/snow-scanner/src/worker/worker.rs @@ -2,9 +2,10 @@ use std::{env, net::IpAddr}; use chrono::{Duration, NaiveDateTime, Utc}; use detection::detect_scanner; -use dns_ptr_resolver::{get_ptr, ResolvedResult}; +use dns_ptr_resolver::get_ptr; use log2::*; -use ws2::{Client, Pod, WebSocket}; +use tungstenite::stream::MaybeTlsStream; +use tungstenite::{connect, Error, Message, WebSocket}; pub mod detection; pub mod modules; @@ -17,20 +18,66 @@ pub struct IpToResolve { pub address: IpAddr, } -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct Worker { pub authenticated: bool, pub tasks: Vec, pub last_request_for_work: Option, + ws: WebSocket>, } -impl Worker { - pub fn initial() -> Worker { - info!("New worker"); +impl Into for WebSocket> { + fn into(self) -> Worker { + let wait_time = std::time::Duration::from_secs(1); + match self.get_ref() { + tungstenite::stream::MaybeTlsStream::Plain(stream) => stream + .set_read_timeout(Some(wait_time)) + .expect("set_nonblocking to work"), + tungstenite::stream::MaybeTlsStream::NativeTls(stream) => { + stream + .get_ref() + .set_read_timeout(Some(wait_time)) + .expect("set_nonblocking to work"); + () + } + _ => unimplemented!(), + }; Worker { authenticated: false, tasks: vec![], last_request_for_work: None, + ws: self, + } + } +} + +impl Worker { + pub fn wait_for_messages(&mut self) -> Result<(), Error> { + self.tick(); + match self.ws.read() { + Ok(server_request) => { + match server_request { + Message::Text(msg_string) => { + self.receive_request(msg_string.into()); + } + Message::Ping(data) => { + let _ = self.ws.write(Message::Pong(data)); + } + Message::Pong(_) => {} + Message::Frame(_) => {} + Message::Binary(_) => {} + Message::Close(_) => {} + }; + Ok(()) + } + Err(err) => { + match err { + // Silently drop the error: Processing error: IO error: Resource temporarily unavailable (os error 11) + // That occurs when no messages are to be read + Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()), + _ => Err(err), + } + } } } @@ -49,11 +96,11 @@ impl Worker { self } - pub fn tick(&mut self, ws_client: &Client) -> &Worker { + pub fn tick(&mut self) -> () { let mut request: Option = None; if !self.is_authenticated() { request = Some(WorkerMessages::AuthenticateRequest { - login: "williamdes".to_string(), + login: env::var("WORKER_NAME").expect("The ENV WORKER_NAME should be set"), }); } else { if self.last_request_for_work.is_none() @@ -67,14 +114,13 @@ impl Worker { // it has a request to send if let Some(request) = request { - self.send_request(ws_client, request); + self.send_request(request); } - self } - pub fn send_request(&mut self, ws_client: &Client, request: WorkerMessages) -> &Worker { + pub fn send_request(&mut self, request: WorkerMessages) -> &Worker { let msg_string: String = request.to_string(); - match ws_client.send(msg_string) { + match self.ws.send(Message::Text(msg_string)) { Ok(_) => { match request { WorkerMessages::AuthenticateRequest { login } => { @@ -93,7 +139,7 @@ impl Worker { self } - pub fn receive_request(&mut self, ws: &WebSocket, server_request: WorkerMessages) -> &Worker { + pub fn receive_request(&mut self, server_request: WorkerMessages) -> &Worker { match server_request { WorkerMessages::DoWorkRequest { neworks } => { info!("Should work on: {:?}", neworks); @@ -115,7 +161,7 @@ impl Worker { address: addr, }; let msg_string: String = request.to_string(); - match ws.send(msg_string) { + match self.ws.send(Message::Text(msg_string)) { Ok(_) => {} Err(err) => error!("Unable to send scanner result: {err}"), } @@ -124,7 +170,7 @@ impl Worker { Err(err) => error!("Error detecting for {addr}: {:?}", err), }, - Err(err) => { + Err(_) => { //debug!("Error processing {addr}: {err}") } }; @@ -144,7 +190,7 @@ impl Worker { } } -impl ws2::Handler for Worker { +/*impl ws2::Handler for Worker { fn on_open(&mut self, ws: &WebSocket) -> Pod { info!("Connected to: {ws}, starting to work"); Ok(()) @@ -160,7 +206,7 @@ impl ws2::Handler for Worker { self.receive_request(ws, server_request); Ok(()) } -} +}*/ fn main() -> () { let _log2 = log2::stdout() @@ -175,21 +221,20 @@ fn main() -> () { Ok(worker_url) => worker_url, Err(_) => "ws://127.0.0.1:8800".to_string(), }; - let mut worker = Worker::initial(); - match ws2::connect(&url) { - Ok(mut ws_client) => { - let connected = ws_client.is_open(); + + match connect(&url) { + Ok((socket, response)) => { + let connected = response.status() == 101; if connected { info!("Connected to: {url}"); } else { - info!("Connecting to: {url}"); + info!("Connecting replied {}: {url}", response.status()); } + let mut worker: Worker = socket.into(); loop { - match ws_client.process(&mut worker, 0.5) { - Ok(_) => { - worker.tick(&ws_client); - } + match worker.wait_for_messages() { + Ok(_) => {} Err(err) => error!("Processing error: {err}"), } }