From 8bf201b3e5102369f8832e752de9d1a57fff9474 Mon Sep 17 00:00:00 2001 From: William Desportes Date: Thu, 3 Oct 2024 13:56:06 +0200 Subject: [PATCH] Better websocket processing --- snow-scanner/src/main.rs | 13 ++++++------- snow-scanner/src/server.rs | 14 ++++++++++++-- snow-scanner/src/worker/worker.rs | 30 ++++++++++++++++++++++++------ 3 files changed, 42 insertions(+), 15 deletions(-) diff --git a/snow-scanner/src/main.rs b/snow-scanner/src/main.rs index 13ff24e..a2d32b3 100644 --- a/snow-scanner/src/main.rs +++ b/snow-scanner/src/main.rs @@ -3,7 +3,7 @@ use chrono::{NaiveDateTime, Utc}; #[macro_use] extern crate rocket; -use rocket::{fairing::AdHoc, futures::SinkExt, trace::error, Build, Rocket, State}; +use rocket::{fairing::AdHoc, trace::error, Build, Rocket, State}; use rocket_db_pools::{ rocket::{ figment::{ @@ -479,24 +479,23 @@ async fn pong() -> PlainText { #[get("/ws")] pub async fn ws(ws: WebSocket) -> rocket_ws::Channel<'static> { - use rocket::futures::StreamExt; use rocket::tokio; use rocket_ws as ws; use std::time::Duration; ws.channel(move |mut stream: ws::stream::DuplexStream| { Box::pin(async move { - let mut interval = tokio::time::interval(Duration::from_secs(10)); + let mut interval = tokio::time::interval(Duration::from_secs(60)); tokio::spawn(async move { let mut worker = Worker::initial(&mut stream); loop { tokio::select! { _ = interval.tick() => { - // Send message every 10 seconds - //let reading = get_latest_readings().await.unwrap(); - //let _ = stream.send(ws::Message::Text(json!(reading).to_string())).await; - // info!("Sent message"); + // Send message every X seconds + if let Ok(true) = worker.tick().await { + break; + } } Ok(false) = worker.poll() => { // Continue the loop diff --git a/snow-scanner/src/server.rs b/snow-scanner/src/server.rs index d05b4ac..a1e37b8 100644 --- a/snow-scanner/src/server.rs +++ b/snow-scanner/src/server.rs @@ -75,6 +75,16 @@ impl<'a> Worker<'a> { self.stream.next() } + pub async fn tick(&mut self) -> Result { + match self.send(rocket_ws::Message::Ping(vec![])).await { + Ok(_) => Ok(false), + Err(err) => { + error!("Processing error: {err}"); + Ok(true) // Break processing loop + } + } + } + pub async fn poll(&mut self) -> Result { let message = self.next(); @@ -97,7 +107,7 @@ impl<'a> Worker<'a> { reason: "Client disconected".to_string().into(), }; let _ = self.stream.close(Some(close_frame)).await; - return Ok(true); + return Ok(true); // Break processing loop } rocket_ws::Message::Ping(ping_data) => { match self.send(rocket_ws::Message::Pong(ping_data)).await { @@ -123,7 +133,7 @@ impl<'a> Worker<'a> { }; let _ = self.stream.close(Some(close_frame)).await; // The connection is closed by the client - Ok(true) + Ok(true) // Break processing loop } None => Ok(false), } diff --git a/snow-scanner/src/worker/worker.rs b/snow-scanner/src/worker/worker.rs index 0ac4589..7d83a89 100644 --- a/snow-scanner/src/worker/worker.rs +++ b/snow-scanner/src/worker/worker.rs @@ -1,3 +1,4 @@ +use std::any::Any; use std::{env, net::IpAddr}; use chrono::{Duration, NaiveDateTime, Utc}; @@ -52,7 +53,7 @@ impl Into for WebSocket> { } impl Worker { - pub fn wait_for_messages(&mut self) -> Result<(), Error> { + pub fn wait_for_messages(&mut self) -> Result { self.tick(); match self.ws.read() { Ok(server_request) => { @@ -66,15 +67,18 @@ impl Worker { Message::Pong(_) => {} Message::Frame(_) => {} Message::Binary(_) => {} - Message::Close(_) => {} + Message::Close(_) => { + return Ok(true); // Break the processing loop + } }; - Ok(()) + Ok(false) } Err(err) => { match err { // Silently drop the error: Processing error: IO error: Resource temporarily unavailable (os error 11) // That occurs when no messages are to be read - Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()), + Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false), + Error::Io(ref e) if e.kind() == std::io::ErrorKind::NotConnected => Ok(true), // Break the processing loop _ => Err(err), } } @@ -216,8 +220,22 @@ fn main() -> () { let mut worker: Worker = socket.into(); loop { match worker.wait_for_messages() { - Ok(_) => {} - Err(err) => error!("Processing error: {err}"), + Ok(true) => { + error!("Stopping processing"); + break; + } + Ok(false) => { + // Keep processing + } + Err(tungstenite::Error::ConnectionClosed) => { + error!("Stopping processing: connection closed"); + break; + } + Err(tungstenite::Error::AlreadyClosed) => { + error!("Stopping processing: connection already closed"); + break; + } + Err(err) => error!("Processing error: {err} -> {:?}", err), } } }