Better websocket processing

This commit is contained in:
2024-10-03 13:56:06 +02:00
parent fd4d43596f
commit 8bf201b3e5
3 changed files with 42 additions and 15 deletions

View File

@ -3,7 +3,7 @@ use chrono::{NaiveDateTime, Utc};
#[macro_use]
extern crate rocket;
use rocket::{fairing::AdHoc, futures::SinkExt, trace::error, Build, Rocket, State};
use rocket::{fairing::AdHoc, trace::error, Build, Rocket, State};
use rocket_db_pools::{
rocket::{
figment::{
@ -479,24 +479,23 @@ async fn pong() -> PlainText {
#[get("/ws")]
pub async fn ws(ws: WebSocket) -> rocket_ws::Channel<'static> {
use rocket::futures::StreamExt;
use rocket::tokio;
use rocket_ws as ws;
use std::time::Duration;
ws.channel(move |mut stream: ws::stream::DuplexStream| {
Box::pin(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
let mut interval = tokio::time::interval(Duration::from_secs(60));
tokio::spawn(async move {
let mut worker = Worker::initial(&mut stream);
loop {
tokio::select! {
_ = interval.tick() => {
// Send message every 10 seconds
//let reading = get_latest_readings().await.unwrap();
//let _ = stream.send(ws::Message::Text(json!(reading).to_string())).await;
// info!("Sent message");
// Send message every X seconds
if let Ok(true) = worker.tick().await {
break;
}
}
Ok(false) = worker.poll() => {
// Continue the loop

View File

@ -75,6 +75,16 @@ impl<'a> Worker<'a> {
self.stream.next()
}
pub async fn tick(&mut self) -> Result<bool, ()> {
match self.send(rocket_ws::Message::Ping(vec![])).await {
Ok(_) => Ok(false),
Err(err) => {
error!("Processing error: {err}");
Ok(true) // Break processing loop
}
}
}
pub async fn poll(&mut self) -> Result<bool, ()> {
let message = self.next();
@ -97,7 +107,7 @@ impl<'a> Worker<'a> {
reason: "Client disconected".to_string().into(),
};
let _ = self.stream.close(Some(close_frame)).await;
return Ok(true);
return Ok(true); // Break processing loop
}
rocket_ws::Message::Ping(ping_data) => {
match self.send(rocket_ws::Message::Pong(ping_data)).await {
@ -123,7 +133,7 @@ impl<'a> Worker<'a> {
};
let _ = self.stream.close(Some(close_frame)).await;
// The connection is closed by the client
Ok(true)
Ok(true) // Break processing loop
}
None => Ok(false),
}

View File

@ -1,3 +1,4 @@
use std::any::Any;
use std::{env, net::IpAddr};
use chrono::{Duration, NaiveDateTime, Utc};
@ -52,7 +53,7 @@ impl Into<Worker> for WebSocket<MaybeTlsStream<std::net::TcpStream>> {
}
impl Worker {
pub fn wait_for_messages(&mut self) -> Result<(), Error> {
pub fn wait_for_messages(&mut self) -> Result<bool, Error> {
self.tick();
match self.ws.read() {
Ok(server_request) => {
@ -66,15 +67,18 @@ impl Worker {
Message::Pong(_) => {}
Message::Frame(_) => {}
Message::Binary(_) => {}
Message::Close(_) => {}
Message::Close(_) => {
return Ok(true); // Break the processing loop
}
};
Ok(())
Ok(false)
}
Err(err) => {
match err {
// Silently drop the error: Processing error: IO error: Resource temporarily unavailable (os error 11)
// That occurs when no messages are to be read
Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
Error::Io(ref e) if e.kind() == std::io::ErrorKind::NotConnected => Ok(true), // Break the processing loop
_ => Err(err),
}
}
@ -216,8 +220,22 @@ fn main() -> () {
let mut worker: Worker = socket.into();
loop {
match worker.wait_for_messages() {
Ok(_) => {}
Err(err) => error!("Processing error: {err}"),
Ok(true) => {
error!("Stopping processing");
break;
}
Ok(false) => {
// Keep processing
}
Err(tungstenite::Error::ConnectionClosed) => {
error!("Stopping processing: connection closed");
break;
}
Err(tungstenite::Error::AlreadyClosed) => {
error!("Stopping processing: connection already closed");
break;
}
Err(err) => error!("Processing error: {err} -> {:?}", err),
}
}
}