Move worker to tungstenite and add WORKER_NAME ENV

This commit is contained in:
2024-09-28 01:06:40 +02:00
parent cad1073448
commit 04aea8558f
2 changed files with 72 additions and 27 deletions

View File

@ -13,7 +13,7 @@ path = "worker.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
ws2 = "0.2.5" tungstenite = { version = "0.24.0", default-features = true, features = ["native-tls"] }
log2 = "0.1.11" log2 = "0.1.11"
diesel = { version = "2.2.0", default-features = false, features = [] } diesel = { version = "2.2.0", default-features = false, features = [] }
dns-ptr-resolver = {git = "https://github.com/wdes/dns-ptr-resolver.git"} dns-ptr-resolver = {git = "https://github.com/wdes/dns-ptr-resolver.git"}

View File

@ -2,9 +2,10 @@ use std::{env, net::IpAddr};
use chrono::{Duration, NaiveDateTime, Utc}; use chrono::{Duration, NaiveDateTime, Utc};
use detection::detect_scanner; use detection::detect_scanner;
use dns_ptr_resolver::{get_ptr, ResolvedResult}; use dns_ptr_resolver::get_ptr;
use log2::*; use log2::*;
use ws2::{Client, Pod, WebSocket}; use tungstenite::stream::MaybeTlsStream;
use tungstenite::{connect, Error, Message, WebSocket};
pub mod detection; pub mod detection;
pub mod modules; pub mod modules;
@ -17,20 +18,66 @@ pub struct IpToResolve {
pub address: IpAddr, pub address: IpAddr,
} }
#[derive(Debug, Clone)] #[derive(Debug)]
pub struct Worker { pub struct Worker {
pub authenticated: bool, pub authenticated: bool,
pub tasks: Vec<IpToResolve>, pub tasks: Vec<IpToResolve>,
pub last_request_for_work: Option<NaiveDateTime>, pub last_request_for_work: Option<NaiveDateTime>,
ws: WebSocket<MaybeTlsStream<std::net::TcpStream>>,
} }
impl Worker { impl Into<Worker> for WebSocket<MaybeTlsStream<std::net::TcpStream>> {
pub fn initial() -> Worker { fn into(self) -> Worker {
info!("New worker"); let wait_time = std::time::Duration::from_secs(1);
match self.get_ref() {
tungstenite::stream::MaybeTlsStream::Plain(stream) => stream
.set_read_timeout(Some(wait_time))
.expect("set_nonblocking to work"),
tungstenite::stream::MaybeTlsStream::NativeTls(stream) => {
stream
.get_ref()
.set_read_timeout(Some(wait_time))
.expect("set_nonblocking to work");
()
}
_ => unimplemented!(),
};
Worker { Worker {
authenticated: false, authenticated: false,
tasks: vec![], tasks: vec![],
last_request_for_work: None, last_request_for_work: None,
ws: self,
}
}
}
impl Worker {
pub fn wait_for_messages(&mut self) -> Result<(), Error> {
self.tick();
match self.ws.read() {
Ok(server_request) => {
match server_request {
Message::Text(msg_string) => {
self.receive_request(msg_string.into());
}
Message::Ping(data) => {
let _ = self.ws.write(Message::Pong(data));
}
Message::Pong(_) => {}
Message::Frame(_) => {}
Message::Binary(_) => {}
Message::Close(_) => {}
};
Ok(())
}
Err(err) => {
match err {
// Silently drop the error: Processing error: IO error: Resource temporarily unavailable (os error 11)
// That occurs when no messages are to be read
Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
_ => Err(err),
}
}
} }
} }
@ -49,11 +96,11 @@ impl Worker {
self self
} }
pub fn tick(&mut self, ws_client: &Client) -> &Worker { pub fn tick(&mut self) -> () {
let mut request: Option<WorkerMessages> = None; let mut request: Option<WorkerMessages> = None;
if !self.is_authenticated() { if !self.is_authenticated() {
request = Some(WorkerMessages::AuthenticateRequest { request = Some(WorkerMessages::AuthenticateRequest {
login: "williamdes".to_string(), login: env::var("WORKER_NAME").expect("The ENV WORKER_NAME should be set"),
}); });
} else { } else {
if self.last_request_for_work.is_none() if self.last_request_for_work.is_none()
@ -67,14 +114,13 @@ impl Worker {
// it has a request to send // it has a request to send
if let Some(request) = request { if let Some(request) = request {
self.send_request(ws_client, request); self.send_request(request);
} }
self
} }
pub fn send_request(&mut self, ws_client: &Client, request: WorkerMessages) -> &Worker { pub fn send_request(&mut self, request: WorkerMessages) -> &Worker {
let msg_string: String = request.to_string(); let msg_string: String = request.to_string();
match ws_client.send(msg_string) { match self.ws.send(Message::Text(msg_string)) {
Ok(_) => { Ok(_) => {
match request { match request {
WorkerMessages::AuthenticateRequest { login } => { WorkerMessages::AuthenticateRequest { login } => {
@ -93,7 +139,7 @@ impl Worker {
self self
} }
pub fn receive_request(&mut self, ws: &WebSocket, server_request: WorkerMessages) -> &Worker { pub fn receive_request(&mut self, server_request: WorkerMessages) -> &Worker {
match server_request { match server_request {
WorkerMessages::DoWorkRequest { neworks } => { WorkerMessages::DoWorkRequest { neworks } => {
info!("Should work on: {:?}", neworks); info!("Should work on: {:?}", neworks);
@ -115,7 +161,7 @@ impl Worker {
address: addr, address: addr,
}; };
let msg_string: String = request.to_string(); let msg_string: String = request.to_string();
match ws.send(msg_string) { match self.ws.send(Message::Text(msg_string)) {
Ok(_) => {} Ok(_) => {}
Err(err) => error!("Unable to send scanner result: {err}"), Err(err) => error!("Unable to send scanner result: {err}"),
} }
@ -124,7 +170,7 @@ impl Worker {
Err(err) => error!("Error detecting for {addr}: {:?}", err), Err(err) => error!("Error detecting for {addr}: {:?}", err),
}, },
Err(err) => { Err(_) => {
//debug!("Error processing {addr}: {err}") //debug!("Error processing {addr}: {err}")
} }
}; };
@ -144,7 +190,7 @@ impl Worker {
} }
} }
impl ws2::Handler for Worker { /*impl ws2::Handler for Worker {
fn on_open(&mut self, ws: &WebSocket) -> Pod { fn on_open(&mut self, ws: &WebSocket) -> Pod {
info!("Connected to: {ws}, starting to work"); info!("Connected to: {ws}, starting to work");
Ok(()) Ok(())
@ -160,7 +206,7 @@ impl ws2::Handler for Worker {
self.receive_request(ws, server_request); self.receive_request(ws, server_request);
Ok(()) Ok(())
} }
} }*/
fn main() -> () { fn main() -> () {
let _log2 = log2::stdout() let _log2 = log2::stdout()
@ -175,21 +221,20 @@ fn main() -> () {
Ok(worker_url) => worker_url, Ok(worker_url) => worker_url,
Err(_) => "ws://127.0.0.1:8800".to_string(), Err(_) => "ws://127.0.0.1:8800".to_string(),
}; };
let mut worker = Worker::initial();
match ws2::connect(&url) { match connect(&url) {
Ok(mut ws_client) => { Ok((socket, response)) => {
let connected = ws_client.is_open(); let connected = response.status() == 101;
if connected { if connected {
info!("Connected to: {url}"); info!("Connected to: {url}");
} else { } else {
info!("Connecting to: {url}"); info!("Connecting replied {}: {url}", response.status());
} }
let mut worker: Worker = socket.into();
loop { loop {
match ws_client.process(&mut worker, 0.5) { match worker.wait_for_messages() {
Ok(_) => { Ok(_) => {}
worker.tick(&ws_client);
}
Err(err) => error!("Processing error: {err}"), Err(err) => error!("Processing error: {err}"),
} }
} }