Compare commits
15 Commits
b731f6dc21
...
dbbbdc4818
Author | SHA1 | Date | |
---|---|---|---|
dbbbdc4818
|
|||
75dc88bcc1
|
|||
bc3f3fe34c
|
|||
32d1abdcee
|
|||
f589d4c11e
|
|||
e5c3b38121
|
|||
2a7ea4c969
|
|||
8bf201b3e5
|
|||
fd4d43596f
|
|||
25df2642e9
|
|||
04aea8558f
|
|||
cad1073448
|
|||
c01177e4c8
|
|||
d84918851b
|
|||
36468e4baf
|
@ -3,7 +3,7 @@ name = "snow-scanner"
|
||||
version = "0.1.0"
|
||||
authors = ["William Desportes <williamdes@wdes.fr>"]
|
||||
edition = "2021"
|
||||
rust-version = "1.78.0" # MSRV
|
||||
rust-version = "1.81.0" # MSRV
|
||||
description = "A program to scan internet and find scanners"
|
||||
homepage = "https://github.com/wdes/snow-scanner/tree/v1.2.0-dev#readme"
|
||||
repository = "https://github.com/wdes/snow-scanner"
|
||||
@ -29,24 +29,35 @@ maintenance = { status = "passively-maintained" }
|
||||
name = "snow-scanner"
|
||||
path = "src/main.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "snow-scanner-worker"
|
||||
path = "src/worker/worker.rs"
|
||||
[workspace]
|
||||
|
||||
members = [
|
||||
"src/worker"
|
||||
]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[features]
|
||||
|
||||
# Enable unstable features, requires nightly
|
||||
# Currently only used to enable rusts official ip support
|
||||
unstable = []
|
||||
|
||||
[dependencies]
|
||||
log2 = "0.1.11"
|
||||
ws2 = "0.2.5"
|
||||
actix-web = "4"
|
||||
actix-files = "0.6.6"
|
||||
rocket = { git = "https://github.com/rwf2/Rocket/", rev = "3bf9ef02d6e803fe9f753777f5a829dda6d2453d"}
|
||||
rocket_ws = { git = "https://github.com/rwf2/Rocket/", rev = "3bf9ef02d6e803fe9f753777f5a829dda6d2453d"}
|
||||
rocket_db_pools = { git = "https://github.com/rwf2/Rocket/", rev = "3bf9ef02d6e803fe9f753777f5a829dda6d2453d", default-features = false, features = ["diesel_mysql"] }
|
||||
|
||||
# mariadb-dev on Alpine
|
||||
# "mysqlclient-src" "mysql_backend"
|
||||
diesel = { version = "2.2.0", default-features = false, features = ["mysql", "chrono", "uuid", "r2d2"] }
|
||||
diesel = { version = "^2", default-features = false, features = ["mysql", "chrono", "uuid"] }
|
||||
|
||||
ws = { package = "rocket_ws", version = "0.1.1" }
|
||||
|
||||
dns-ptr-resolver = {git = "https://github.com/wdes/dns-ptr-resolver.git"}
|
||||
hickory-resolver = { version = "0.24.1", default-features = false, features = ["tokio-runtime", "dns-over-h3", "dns-over-https", "dns-over-quic"]}
|
||||
chrono = "0.4.38"
|
||||
uuid = { version = "1.10.0", default-features = false, features = ["v7", "serde", "std"] }
|
||||
cidr = "0.2.2"
|
||||
serde = "1.0.210"
|
||||
cidr = "0.3.0"
|
||||
serde = { version = "1.0.210", features = ["derive"] }
|
||||
serde_json = "1.0.128"
|
||||
|
136
snow-scanner/src/event_bus.rs
Normal file
136
snow-scanner/src/event_bus.rs
Normal file
@ -0,0 +1,136 @@
|
||||
use std::{net::IpAddr, str::FromStr};
|
||||
|
||||
use crate::{
|
||||
worker::detection::{detect_scanner_from_name, validate_ip},
|
||||
DbConnection, SnowDb,
|
||||
};
|
||||
use hickory_resolver::Name;
|
||||
use rocket::futures::channel::mpsc as rocket_mpsc;
|
||||
use rocket::futures::StreamExt;
|
||||
use rocket::tokio;
|
||||
|
||||
use crate::Scanner;
|
||||
|
||||
/// Handles all the raw events being streamed from balancers and parses and filters them into only the events we care about.
|
||||
pub struct EventBus {
|
||||
events_rx: rocket_mpsc::Receiver<EventBusWriterEvent>,
|
||||
events_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
|
||||
bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>,
|
||||
}
|
||||
|
||||
impl EventBus {
|
||||
pub fn new() -> Self {
|
||||
let (events_tx, events_rx) = rocket_mpsc::channel(100);
|
||||
let (bus_tx, _) = tokio::sync::broadcast::channel(100);
|
||||
Self {
|
||||
events_rx,
|
||||
events_tx,
|
||||
bus_tx,
|
||||
}
|
||||
}
|
||||
|
||||
// db: &Connection<SnowDb>
|
||||
pub async fn run(&mut self, mut conn: DbConnection<SnowDb>) {
|
||||
info!("EventBus started");
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(event) = self.events_rx.next() => {
|
||||
self.handle_event(event, &mut conn).await;
|
||||
}
|
||||
else => {
|
||||
warn!("EventBus stopped");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_event(&self, event: EventBusWriterEvent, db: &mut DbConnection<SnowDb>) {
|
||||
info!("Received event");
|
||||
if self.bus_tx.receiver_count() == 0 {
|
||||
return;
|
||||
}
|
||||
match event {
|
||||
EventBusWriterEvent::ScannerFoundResponse { name, address } => {
|
||||
let ip: IpAddr = address.into();
|
||||
if !validate_ip(ip) {
|
||||
error!("Invalid IP address: {ip}");
|
||||
return;
|
||||
}
|
||||
let name = Name::from_str(name.as_str()).unwrap();
|
||||
match detect_scanner_from_name(&name) {
|
||||
Ok(Some(scanner_type)) => {
|
||||
match Scanner::find_or_new(ip, scanner_type, Some(name), db).await {
|
||||
Ok(scanner) => {
|
||||
let _ = scanner.save(db).await;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Error find or save: {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("No name detected for: {:?}", name);
|
||||
}
|
||||
|
||||
Err(err) => {
|
||||
error!("No name detected error: {:?}", err);
|
||||
}
|
||||
};
|
||||
}
|
||||
EventBusWriterEvent::BroadcastMessage(msg) => match self.bus_tx.send(msg) {
|
||||
Ok(count) => {
|
||||
info!("Event sent to {count} subscribers");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Error sending event to subscribers: {}", err);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscriber(&self) -> EventBusSubscriber {
|
||||
EventBusSubscriber::new(self.bus_tx.clone())
|
||||
}
|
||||
|
||||
pub fn writer(&self) -> EventBusWriter {
|
||||
EventBusWriter::new(self.events_tx.clone())
|
||||
}
|
||||
}
|
||||
|
||||
pub type EventBusEvent = rocket_ws::Message;
|
||||
|
||||
/// Enables subscriptions to the event bus
|
||||
pub struct EventBusSubscriber {
|
||||
bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>,
|
||||
}
|
||||
|
||||
/// Enables subscriptions to the event bus
|
||||
pub struct EventBusWriter {
|
||||
bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
|
||||
}
|
||||
|
||||
pub enum EventBusWriterEvent {
|
||||
BroadcastMessage(rocket_ws::Message),
|
||||
ScannerFoundResponse { name: String, address: IpAddr },
|
||||
}
|
||||
|
||||
impl EventBusWriter {
|
||||
pub fn new(bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>) -> Self {
|
||||
Self { bus_tx }
|
||||
}
|
||||
|
||||
pub fn write(&self) -> rocket_mpsc::Sender<EventBusWriterEvent> {
|
||||
self.bus_tx.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventBusSubscriber {
|
||||
pub fn new(bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>) -> Self {
|
||||
Self { bus_tx }
|
||||
}
|
||||
|
||||
pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<EventBusEvent> {
|
||||
self.bus_tx.subscribe()
|
||||
}
|
||||
}
|
@ -1,40 +1,101 @@
|
||||
use actix_files::NamedFile;
|
||||
use actix_web::error::ErrorInternalServerError;
|
||||
use actix_web::http::header::ContentType;
|
||||
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
|
||||
use log2::*;
|
||||
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use diesel::deserialize::{self};
|
||||
use diesel::mysql::{Mysql, MysqlValue};
|
||||
use diesel::sql_types::Text;
|
||||
|
||||
use diesel::r2d2::ConnectionManager;
|
||||
use diesel::r2d2::Pool;
|
||||
use worker::detection::{detect_scanner, get_dns_client, Scanners};
|
||||
#[macro_use]
|
||||
extern crate rocket;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::{env, fmt};
|
||||
use cidr::IpCidr;
|
||||
use event_bus::{EventBusSubscriber, EventBusWriter, EventBusWriterEvent};
|
||||
use rocket::{
|
||||
fairing::AdHoc,
|
||||
futures::SinkExt,
|
||||
http::Status,
|
||||
request::{FromParam, FromRequest, Outcome, Request},
|
||||
trace::error,
|
||||
Rocket, State,
|
||||
};
|
||||
use rocket_db_pools::{
|
||||
rocket::{
|
||||
figment::{
|
||||
util::map,
|
||||
value::{Map, Value},
|
||||
},
|
||||
form::Form,
|
||||
fs::NamedFile,
|
||||
Responder,
|
||||
},
|
||||
Connection, Pool,
|
||||
};
|
||||
|
||||
use crate::worker::modules::{Network, WorkerMessages};
|
||||
|
||||
use rocket_db_pools::diesel::mysql::{Mysql, MysqlValue};
|
||||
use rocket_db_pools::diesel::serialize::IsNull;
|
||||
use rocket_db_pools::diesel::sql_types::Text;
|
||||
use rocket_db_pools::diesel::MysqlPool;
|
||||
use rocket_db_pools::diesel::{deserialize, serialize};
|
||||
use rocket_db_pools::Database;
|
||||
|
||||
use rocket_ws::WebSocket;
|
||||
use server::Server;
|
||||
use worker::detection::{detect_scanner, get_dns_client, validate_ip, Scanners};
|
||||
|
||||
use std::{
|
||||
env, fmt,
|
||||
ops::{Deref, DerefMut},
|
||||
};
|
||||
use std::{io::Write, net::SocketAddr};
|
||||
use std::{path::PathBuf, str::FromStr};
|
||||
use uuid::Uuid;
|
||||
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
|
||||
use diesel::serialize::IsNull;
|
||||
use diesel::{serialize, MysqlConnection};
|
||||
use dns_ptr_resolver::{get_ptr, ResolvedResult};
|
||||
|
||||
pub mod event_bus;
|
||||
pub mod models;
|
||||
pub mod schema;
|
||||
pub mod server;
|
||||
pub mod worker;
|
||||
|
||||
use crate::models::*;
|
||||
use crate::server::Server;
|
||||
|
||||
/// Short-hand for the database pool type to use throughout the app.
|
||||
type DbPool = Pool<ConnectionManager<MysqlConnection>>;
|
||||
#[derive(Database, Clone)]
|
||||
#[database("snow_scanner_db")]
|
||||
pub struct SnowDb(MysqlPool);
|
||||
|
||||
pub type ReqDbConn = Connection<SnowDb>;
|
||||
pub type DbConn = DbConnection<SnowDb>;
|
||||
|
||||
#[rocket::async_trait]
|
||||
impl<'r, D: Database> FromRequest<'r> for DbConnection<D> {
|
||||
type Error = Option<<D::Pool as Pool>::Error>;
|
||||
|
||||
async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
|
||||
match D::fetch(req.rocket()) {
|
||||
Some(db) => match db.get().await {
|
||||
Ok(conn) => Outcome::Success(DbConnection(conn)),
|
||||
Err(e) => Outcome::Error((Status::ServiceUnavailable, Some(e))),
|
||||
},
|
||||
None => Outcome::Error((Status::InternalServerError, None)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DbConnection<D: Database>(pub <D::Pool as Pool>::Connection);
|
||||
|
||||
impl<D: Database> Deref for DbConnection<D> {
|
||||
type Target = <D::Pool as Pool>::Connection;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: Database> DerefMut for DbConnection<D> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
trait IsStatic {
|
||||
fn is_static(self: &Self) -> bool;
|
||||
@ -50,6 +111,22 @@ impl IsStatic for Scanners {
|
||||
}
|
||||
}
|
||||
|
||||
impl FromParam<'_> for Scanners {
|
||||
type Error = String;
|
||||
|
||||
fn from_param(param: &'_ str) -> Result<Self, Self::Error> {
|
||||
match param {
|
||||
"stretchoid" => Ok(Scanners::Stretchoid),
|
||||
"binaryedge" => Ok(Scanners::Binaryedge),
|
||||
"stretchoid.txt" => Ok(Scanners::Stretchoid),
|
||||
"binaryedge.txt" => Ok(Scanners::Binaryedge),
|
||||
"censys.txt" => Ok(Scanners::Censys),
|
||||
"internet-measurement.com.txt" => Ok(Scanners::InternetMeasurement),
|
||||
v => Err(format!("Unknown value: {v}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for Scanners {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
@ -103,16 +180,29 @@ impl serialize::ToSql<Text, Mysql> for Scanners {
|
||||
impl deserialize::FromSql<Text, Mysql> for Scanners {
|
||||
fn from_sql(bytes: MysqlValue) -> deserialize::Result<Self> {
|
||||
let value = <String as deserialize::FromSql<Text, Mysql>>::from_sql(bytes)?;
|
||||
match &value as &str {
|
||||
"stretchoid" => Ok(Scanners::Stretchoid),
|
||||
"binaryedge" => Ok(Scanners::Binaryedge),
|
||||
"internet-measurement.com" => Ok(Scanners::InternetMeasurement),
|
||||
_ => Err("Unrecognized enum variant".into()),
|
||||
let value = &value as &str;
|
||||
let value: Result<Scanners, String> = value.try_into();
|
||||
match value {
|
||||
Ok(d) => Ok(d),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_ip(pool: web::Data<DbPool>, ip: String) -> Result<Scanner, Option<ResolvedResult>> {
|
||||
impl TryInto<Scanners> for &str {
|
||||
type Error = String;
|
||||
|
||||
fn try_into(self) -> Result<Scanners, Self::Error> {
|
||||
match self {
|
||||
"stretchoid" => Ok(Scanners::Stretchoid),
|
||||
"binaryedge" => Ok(Scanners::Binaryedge),
|
||||
"internet-measurement.com" => Ok(Scanners::InternetMeasurement),
|
||||
value => Err(format!("Invalid value: {value}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_ip(mut conn: DbConn, ip: String) -> Result<Scanner, Option<ResolvedResult>> {
|
||||
let query_address = ip.parse().expect("To parse");
|
||||
|
||||
let ptr_result: Result<ResolvedResult, ()> = std::thread::spawn(move || {
|
||||
@ -135,17 +225,15 @@ async fn handle_ip(pool: web::Data<DbPool>, ip: String) -> Result<Scanner, Optio
|
||||
|
||||
match detect_scanner(&result) {
|
||||
Ok(Some(scanner_type)) => {
|
||||
// use web::block to offload blocking Diesel queries without blocking server thread
|
||||
web::block(move || {
|
||||
// note that obtaining a connection from the pool is also potentially blocking
|
||||
let conn = &mut pool.get().unwrap();
|
||||
match Scanner::find_or_new(query_address, scanner_type, result.result, conn) {
|
||||
if !validate_ip(query_address) {
|
||||
error!("Invalid IP address: {ip}");
|
||||
return Err(None);
|
||||
}
|
||||
match Scanner::find_or_new(query_address, scanner_type, result.result, &mut conn).await
|
||||
{
|
||||
Ok(scanner) => Ok(scanner),
|
||||
Err(_) => Err(None),
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
Ok(None) => Err(None),
|
||||
|
||||
@ -176,55 +264,80 @@ static FORM: &str = r#"
|
||||
</html>
|
||||
"#;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct ScanParams {
|
||||
username: String,
|
||||
ips: String,
|
||||
#[derive(FromForm, Serialize, Deserialize)]
|
||||
pub struct ScanParams<'r> {
|
||||
username: &'r str,
|
||||
ips: &'r str,
|
||||
}
|
||||
|
||||
async fn handle_scan(pool: web::Data<DbPool>, params: web::Form<ScanParams>) -> HttpResponse {
|
||||
if params.username.len() < 4 {
|
||||
return plain_contents("Invalid username".to_string());
|
||||
#[derive(Responder)]
|
||||
enum MultiReply {
|
||||
#[response(status = 500, content_type = "text")]
|
||||
Error(ServerError),
|
||||
#[response(status = 422)]
|
||||
FormError(PlainText),
|
||||
#[response(status = 404)]
|
||||
NotFound(String),
|
||||
#[response(status = 200)]
|
||||
Content(HtmlContents),
|
||||
#[response(status = 200)]
|
||||
FileContents(NamedFile),
|
||||
}
|
||||
|
||||
#[post("/scan", data = "<form>")]
|
||||
async fn handle_scan(
|
||||
mut db: DbConn,
|
||||
form: Form<ScanParams<'_>>,
|
||||
event_bus_writer: &State<EventBusWriter>,
|
||||
) -> MultiReply {
|
||||
if form.username.len() < 4 {
|
||||
return MultiReply::FormError(PlainText("Invalid username".to_string()));
|
||||
}
|
||||
|
||||
let task_group_id: Uuid = Uuid::now_v7();
|
||||
|
||||
// use web::block to offload blocking Diesel queries without blocking server thread
|
||||
let _ = web::block(move || {
|
||||
// note that obtaining a connection from the pool is also potentially blocking
|
||||
let conn = &mut pool.get().unwrap();
|
||||
for ip in params.ips.lines() {
|
||||
for cidr in form.ips.lines() {
|
||||
let scan_task = ScanTask {
|
||||
task_group_id: task_group_id.to_string(),
|
||||
cidr: ip.to_string(),
|
||||
created_by_username: params.username.clone(),
|
||||
cidr: cidr.to_string(),
|
||||
created_by_username: form.username.to_string(),
|
||||
created_at: Utc::now().naive_utc(),
|
||||
updated_at: None,
|
||||
started_at: None,
|
||||
still_processing_at: None,
|
||||
ended_at: None,
|
||||
};
|
||||
match scan_task.save(conn) {
|
||||
Ok(_) => error!("Added {}", ip.to_string()),
|
||||
let mut bus_tx = event_bus_writer.write();
|
||||
match scan_task.save(&mut db).await {
|
||||
Ok(_) => {
|
||||
info!("Added {}", cidr.to_string());
|
||||
let net = IpCidr::from_str(cidr).unwrap();
|
||||
|
||||
let msg = EventBusWriterEvent::BroadcastMessage(
|
||||
WorkerMessages::DoWorkRequest {
|
||||
neworks: vec![Network(net)],
|
||||
}
|
||||
.into(),
|
||||
);
|
||||
|
||||
let _ = bus_tx.send(msg).await;
|
||||
}
|
||||
Err(err) => error!("Not added: {:?}", err),
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
// map diesel query errors to a 500 error response
|
||||
.map_err(|err| ErrorInternalServerError(err));
|
||||
|
||||
html_contents(format!("New task added: {} !", task_group_id))
|
||||
MultiReply::Content(HtmlContents(format!("New task added: {} !", task_group_id)))
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
#[derive(FromForm, Serialize, Deserialize)]
|
||||
pub struct ReportParams {
|
||||
ip: String,
|
||||
}
|
||||
|
||||
async fn handle_report(pool: web::Data<DbPool>, params: web::Form<ReportParams>) -> HttpResponse {
|
||||
match handle_ip(pool, params.ip.clone()).await {
|
||||
Ok(scanner) => html_contents(match scanner.scanner_name {
|
||||
#[post("/report", data = "<form>")]
|
||||
async fn handle_report(db: DbConn, form: Form<ReportParams>) -> HtmlContents {
|
||||
match handle_ip(db, form.ip.clone()).await {
|
||||
Ok(scanner) => HtmlContents(match scanner.scanner_name {
|
||||
Scanners::Binaryedge => match scanner.last_checked_at {
|
||||
Some(date) => format!(
|
||||
"Reported a binaryedge ninja! <b>{}</b> known as {} since {date}.",
|
||||
@ -252,9 +365,9 @@ async fn handle_report(pool: web::Data<DbPool>, params: web::Form<ReportParams>)
|
||||
_ => format!("Not supported"),
|
||||
}),
|
||||
|
||||
Err(ptr_result) => html_contents(format!(
|
||||
Err(ptr_result) => HtmlContents(format!(
|
||||
"The IP <b>{}</a> resolved as {:?} did not match known scanners patterns.",
|
||||
params.ip,
|
||||
form.ip,
|
||||
match ptr_result {
|
||||
Some(res) => res.result,
|
||||
None => None,
|
||||
@ -267,87 +380,78 @@ struct SecurePath {
|
||||
pub data: String,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for SecurePath {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
let s = <String>::deserialize(deserializer)?;
|
||||
impl FromParam<'_> for SecurePath {
|
||||
type Error = String;
|
||||
|
||||
fn from_param(param: &'_ str) -> Result<Self, Self::Error> {
|
||||
// A-Z a-z 0-9
|
||||
// . - _
|
||||
if s.chars()
|
||||
if param
|
||||
.chars()
|
||||
.all(|c| c.is_ascii_alphanumeric() || c == '.' || c == '-' || c == '_')
|
||||
{
|
||||
return Ok(SecurePath { data: s });
|
||||
return Ok(SecurePath {
|
||||
data: param.to_string(),
|
||||
});
|
||||
}
|
||||
Err(serde::de::Error::custom(format!(
|
||||
"Invalid value: {}",
|
||||
s.to_string()
|
||||
)))
|
||||
Err(format!(
|
||||
"Invalid path value (forbidden chars): {}",
|
||||
param.to_string()
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/collections/<vendor_name>/<file_name>")]
|
||||
async fn handle_get_collection(
|
||||
path: web::Path<(SecurePath, SecurePath)>,
|
||||
req: HttpRequest,
|
||||
static_data_dir: actix_web::web::Data<String>,
|
||||
) -> actix_web::Result<HttpResponse> {
|
||||
let (vendor_name, file_name) = path.into_inner();
|
||||
|
||||
vendor_name: SecurePath,
|
||||
file_name: SecurePath,
|
||||
app_configs: &State<AppConfigs>,
|
||||
) -> MultiReply {
|
||||
let mut path: PathBuf = PathBuf::new();
|
||||
let static_data_dir: String = static_data_dir.into_inner().to_string();
|
||||
let static_data_dir: String = app_configs.static_data_dir.clone();
|
||||
path.push(static_data_dir);
|
||||
path.push("collections");
|
||||
path.push(vendor_name.data);
|
||||
path.push(file_name.data);
|
||||
match NamedFile::open(path) {
|
||||
Ok(file) => Ok(file.into_response(&req)),
|
||||
Err(err) => Ok(HttpResponse::NotFound()
|
||||
.content_type(ContentType::plaintext())
|
||||
.body(format!("File not found: {}.\n", err))),
|
||||
match NamedFile::open(path).await {
|
||||
Ok(file) => MultiReply::FileContents(file),
|
||||
Err(err) => MultiReply::NotFound(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/scanners/<scanner_name>")]
|
||||
async fn handle_list_scanners(
|
||||
pool: web::Data<DbPool>,
|
||||
path: web::Path<Scanners>,
|
||||
req: HttpRequest,
|
||||
static_data_dir: actix_web::web::Data<String>,
|
||||
) -> actix_web::Result<HttpResponse> {
|
||||
let scanner_name = path.into_inner();
|
||||
let static_data_dir: String = static_data_dir.into_inner().to_string();
|
||||
mut db: DbConn,
|
||||
scanner_name: Scanners,
|
||||
app_configs: &State<AppConfigs>,
|
||||
) -> MultiReply {
|
||||
let static_data_dir: String = app_configs.static_data_dir.clone();
|
||||
if scanner_name.is_static() {
|
||||
let mut path: PathBuf = PathBuf::new();
|
||||
path.push(static_data_dir);
|
||||
path.push("scanners");
|
||||
path.push(scanner_name.to_string());
|
||||
path.push(match scanner_name {
|
||||
Scanners::Stretchoid |
|
||||
Scanners::Binaryedge => panic!("This should not happen"),
|
||||
Scanners::Censys => "censys.txt".to_string(),
|
||||
Scanners::InternetMeasurement => "internet-measurement.com.txt".to_string(),
|
||||
});
|
||||
|
||||
return match NamedFile::open(path) {
|
||||
Ok(file) => Ok(file.into_response(&req)),
|
||||
Err(err) => Ok(HttpResponse::NotFound()
|
||||
.content_type(ContentType::plaintext())
|
||||
.body(format!("File not found: {}.\n", err))),
|
||||
return match NamedFile::open(path).await {
|
||||
Ok(file) => MultiReply::FileContents(file),
|
||||
Err(err) => MultiReply::NotFound(err.to_string()),
|
||||
};
|
||||
}
|
||||
|
||||
// use web::block to offload blocking Diesel queries without blocking server thread
|
||||
let scanners_list = web::block(move || {
|
||||
// note that obtaining a connection from the pool is also potentially blocking
|
||||
let conn = &mut pool.get().unwrap();
|
||||
match Scanner::list_names(scanner_name, conn) {
|
||||
let scanners_list = match Scanner::list_names(scanner_name, &mut db).await {
|
||||
Ok(data) => Ok(data),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
})
|
||||
.await
|
||||
// map diesel query errors to a 500 error response
|
||||
.map_err(|err| ErrorInternalServerError(err))
|
||||
.unwrap();
|
||||
};
|
||||
|
||||
if let Ok(scanners) = scanners_list {
|
||||
Ok(html_contents(scanners.join("\n")))
|
||||
MultiReply::Content(HtmlContents(scanners.join("\n")))
|
||||
} else {
|
||||
Ok(server_error("Unable to list scanners".to_string()))
|
||||
MultiReply::Error(ServerError("Unable to list scanners".to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
@ -376,23 +480,16 @@ static SCAN_TASKS_FOOT: &str = r#"
|
||||
</html>
|
||||
"#;
|
||||
|
||||
async fn handle_list_scan_tasks(pool: web::Data<DbPool>) -> HttpResponse {
|
||||
#[get("/scan/tasks")]
|
||||
async fn handle_list_scan_tasks(mut db: DbConn) -> MultiReply {
|
||||
let mut html_data: Vec<String> = vec![SCAN_TASKS_HEAD.to_string()];
|
||||
|
||||
// use web::block to offload blocking Diesel queries without blocking server thread
|
||||
let scan_tasks_list = web::block(move || {
|
||||
// note that obtaining a connection from the pool is also potentially blocking
|
||||
let conn = &mut pool.get().unwrap();
|
||||
match ScanTask::list(conn) {
|
||||
let scan_tasks_list = match ScanTask::list(&mut db).await {
|
||||
Ok(data) => Ok(data),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
})
|
||||
.await
|
||||
// map diesel query errors to a 500 error response
|
||||
.map_err(|err| ErrorInternalServerError(err));
|
||||
};
|
||||
|
||||
if let Ok(scan_tasks) = scan_tasks_list.unwrap() {
|
||||
if let Ok(scan_tasks) = scan_tasks_list {
|
||||
for row in scan_tasks {
|
||||
let cidr: String = row.cidr;
|
||||
let started_at: Option<NaiveDateTime> = row.started_at;
|
||||
@ -413,69 +510,87 @@ async fn handle_list_scan_tasks(pool: web::Data<DbPool>) -> HttpResponse {
|
||||
|
||||
html_data.push(SCAN_TASKS_FOOT.to_string());
|
||||
|
||||
html_contents(html_data.join("\n"))
|
||||
MultiReply::Content(HtmlContents(html_data.join("\n")))
|
||||
} else {
|
||||
return server_error("Unable to list scan tasks".to_string());
|
||||
return MultiReply::Error(ServerError("Unable to list scan tasks".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
fn get_connection(database_url: &str) -> DbPool {
|
||||
let manager = ConnectionManager::<MysqlConnection>::new(database_url);
|
||||
// Refer to the `r2d2` documentation for more methods to use
|
||||
// when building a connection pool
|
||||
Pool::builder()
|
||||
.max_size(5)
|
||||
.test_on_check_out(true)
|
||||
.build(manager)
|
||||
.expect("Could not build connection pool")
|
||||
#[derive(Responder)]
|
||||
#[response(status = 200, content_type = "text")]
|
||||
pub struct PlainText(String);
|
||||
|
||||
#[derive(Responder)]
|
||||
#[response(status = 200, content_type = "html")]
|
||||
pub struct HtmlContents(String);
|
||||
|
||||
#[derive(Responder)]
|
||||
#[response(status = 500, content_type = "html")]
|
||||
pub struct ServerError(String);
|
||||
|
||||
#[get("/")]
|
||||
async fn index() -> HtmlContents {
|
||||
HtmlContents(FORM.to_string())
|
||||
}
|
||||
|
||||
fn plain_contents(data: String) -> HttpResponse {
|
||||
HttpResponse::Ok()
|
||||
.content_type(ContentType::plaintext())
|
||||
.body(data)
|
||||
#[get("/ping")]
|
||||
async fn pong() -> PlainText {
|
||||
PlainText("pong".to_string())
|
||||
}
|
||||
|
||||
fn html_contents(data: String) -> HttpResponse {
|
||||
HttpResponse::Ok()
|
||||
.content_type(ContentType::html())
|
||||
.body(data)
|
||||
#[get("/ws")]
|
||||
pub async fn ws(
|
||||
ws: WebSocket,
|
||||
event_bus: &State<EventBusSubscriber>,
|
||||
event_bus_writer: &State<EventBusWriter>,
|
||||
) -> rocket_ws::Channel<'static> {
|
||||
use rocket::futures::channel::mpsc as rocket_mpsc;
|
||||
|
||||
let (_, ws_receiver) = rocket_mpsc::channel::<rocket_ws::Message>(1);
|
||||
let bus_rx = event_bus.subscribe();
|
||||
let bus_tx = event_bus_writer.write();
|
||||
let channel: rocket_ws::Channel =
|
||||
ws.channel(|stream| Server::handle(stream, bus_rx, bus_tx, ws_receiver));
|
||||
|
||||
channel
|
||||
}
|
||||
|
||||
fn server_error(data: String) -> HttpResponse {
|
||||
HttpResponse::InternalServerError()
|
||||
.content_type(ContentType::html())
|
||||
.body(data)
|
||||
struct AppConfigs {
|
||||
static_data_dir: String,
|
||||
}
|
||||
|
||||
async fn index() -> HttpResponse {
|
||||
html_contents(FORM.to_string())
|
||||
async fn report_counts<'a>(rocket: Rocket<rocket::Build>) -> Rocket<rocket::Build> {
|
||||
let conn = SnowDb::fetch(&rocket)
|
||||
.expect("Failed to get DB connection")
|
||||
.clone()
|
||||
.get()
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
span_error!("failed to connect to MySQL database" => error!("{e}"));
|
||||
panic!("aborting launch");
|
||||
});
|
||||
match Scanner::list_names(Scanners::Stretchoid, &mut DbConnection(conn)).await {
|
||||
Ok(d) => info!("Found {} Stretchoid scanners", d.len()),
|
||||
Err(err) => error!("Unable to fetch Stretchoid scanners: {err}"),
|
||||
}
|
||||
|
||||
rocket
|
||||
}
|
||||
|
||||
async fn pong() -> HttpResponse {
|
||||
plain_contents("pong".to_string())
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let _log2 = log2::stdout()
|
||||
.module(false)
|
||||
.level(match env::var("RUST_LOG") {
|
||||
Ok(level) => level,
|
||||
Err(_) => "debug".to_string(),
|
||||
})
|
||||
.start();
|
||||
|
||||
let server_address: String = if let Ok(env) = env::var("SERVER_ADDRESS") {
|
||||
env
|
||||
#[rocket::main]
|
||||
async fn main() -> Result<(), rocket::Error> {
|
||||
let server_address: SocketAddr = if let Ok(env) = env::var("SERVER_ADDRESS") {
|
||||
env.parse()
|
||||
.expect("The ENV SERVER_ADDRESS should be a valid socket address (address:port)")
|
||||
} else {
|
||||
"127.0.0.1:8000".to_string()
|
||||
"127.0.0.1:8000"
|
||||
.parse()
|
||||
.expect("The default address should be valid")
|
||||
};
|
||||
|
||||
let worker_server_address: String = if let Ok(env) = env::var("WORKER_SERVER_ADDRESS") {
|
||||
env
|
||||
} else {
|
||||
"127.0.0.1:8800".to_string()
|
||||
let static_data_dir: String = match env::var("STATIC_DATA_DIR") {
|
||||
Ok(val) => val,
|
||||
Err(_) => "../data/".to_string(),
|
||||
};
|
||||
|
||||
let db_url: String = if let Ok(env) = env::var("DB_URL") {
|
||||
@ -485,72 +600,67 @@ async fn main() -> std::io::Result<()> {
|
||||
"mysql://localhost".to_string()
|
||||
};
|
||||
|
||||
let pool = get_connection(db_url.as_str());
|
||||
|
||||
// note that obtaining a connection from the pool is also potentially blocking
|
||||
let conn = &mut pool.get().unwrap();
|
||||
let names = Scanner::list_names(Scanners::Stretchoid, conn);
|
||||
match names {
|
||||
Ok(names) => info!("Found {} Stretchoid scanners", names.len()),
|
||||
Err(err) => error!("Unable to get names: {}", err),
|
||||
let db: Map<_, Value> = map! {
|
||||
"url" => db_url.into(),
|
||||
"pool_size" => 10.into(),
|
||||
"timeout" => 5.into(),
|
||||
};
|
||||
|
||||
let server = HttpServer::new(move || {
|
||||
let static_data_dir: String = match env::var("STATIC_DATA_DIR") {
|
||||
Ok(val) => val,
|
||||
Err(_) => "../data/".to_string(),
|
||||
};
|
||||
let config_figment = rocket::Config::figment()
|
||||
.merge(("address", server_address.ip().to_string()))
|
||||
.merge(("port", server_address.port()))
|
||||
.merge(("databases", map!["snow_scanner_db" => db]));
|
||||
|
||||
App::new()
|
||||
.app_data(web::Data::new(pool.clone()))
|
||||
.app_data(actix_web::web::Data::new(static_data_dir))
|
||||
.route("/", web::get().to(index))
|
||||
.route("/ping", web::get().to(pong))
|
||||
.route("/report", web::post().to(handle_report))
|
||||
.route("/scan", web::post().to(handle_scan))
|
||||
.route("/scan/tasks", web::get().to(handle_list_scan_tasks))
|
||||
.route(
|
||||
"/scanners/{scanner_name}",
|
||||
web::get().to(handle_list_scanners),
|
||||
)
|
||||
.route(
|
||||
"/collections/{vendor_name}/{file_name}",
|
||||
web::get().to(handle_get_collection),
|
||||
)
|
||||
let mut event_bus = event_bus::EventBus::new();
|
||||
let event_subscriber = event_bus.subscriber();
|
||||
let event_writer = event_bus.writer();
|
||||
|
||||
let _ = rocket::custom(config_figment)
|
||||
.attach(SnowDb::init())
|
||||
.attach(AdHoc::on_ignite("Report counts", report_counts))
|
||||
.attach(AdHoc::on_shutdown("Close Websockets", |r| {
|
||||
Box::pin(async move {
|
||||
if let Some(writer) = r.state::<EventBusWriter>() {
|
||||
Server::shutdown_to_all(writer);
|
||||
}
|
||||
})
|
||||
.bind(&server_address);
|
||||
match server {
|
||||
Ok(server) => {
|
||||
match ws2::listen(worker_server_address.as_str()) {
|
||||
Ok(mut ws_server) => {
|
||||
std::thread::spawn(move || {
|
||||
let pool = get_connection(db_url.as_str());
|
||||
// note that obtaining a connection from the pool is also potentially blocking
|
||||
let conn = &mut pool.get().unwrap();
|
||||
let mut ws_server_handles = Server {
|
||||
clients: HashMap::new(),
|
||||
new_scanners: HashMap::new(),
|
||||
};
|
||||
info!("Worker server is listening on: {worker_server_address}");
|
||||
loop {
|
||||
match ws_server.process(&mut ws_server_handles, 0.5) {
|
||||
Ok(_) => {}
|
||||
Err(err) => error!("Processing error: {err}"),
|
||||
}
|
||||
ws_server_handles.cleanup(&ws_server);
|
||||
ws_server_handles.commit(conn);
|
||||
}
|
||||
}))
|
||||
.attach(AdHoc::on_liftoff(
|
||||
"Run websocket client manager",
|
||||
move |r| {
|
||||
Box::pin(async move {
|
||||
let conn = SnowDb::fetch(r)
|
||||
.expect("Failed to get DB connection")
|
||||
.clone()
|
||||
.get()
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
span_error!("failed to connect to MySQL database" => error!("{e}"));
|
||||
panic!("aborting launch");
|
||||
});
|
||||
}
|
||||
Err(err) => error!("Unable to listen on {worker_server_address}: {err}"),
|
||||
};
|
||||
|
||||
info!("Now listening on {}", server_address);
|
||||
server.run().await
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Could not bind the server to {}", server_address);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
rocket::tokio::spawn(async move {
|
||||
event_bus.run(DbConnection(conn)).await;
|
||||
});
|
||||
})
|
||||
},
|
||||
))
|
||||
.manage(AppConfigs { static_data_dir })
|
||||
.manage(event_subscriber)
|
||||
.manage(event_writer)
|
||||
.mount(
|
||||
"/",
|
||||
routes![
|
||||
index,
|
||||
pong,
|
||||
handle_report,
|
||||
handle_scan,
|
||||
handle_list_scan_tasks,
|
||||
handle_list_scanners,
|
||||
handle_get_collection,
|
||||
ws,
|
||||
],
|
||||
)
|
||||
.launch()
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -1,11 +1,9 @@
|
||||
use std::net::IpAddr;
|
||||
|
||||
use crate::Scanners;
|
||||
use crate::{DbConn, Scanners};
|
||||
use chrono::{NaiveDateTime, Utc};
|
||||
use diesel::dsl::insert_into;
|
||||
use diesel::prelude::*;
|
||||
use diesel::result::Error as DieselError;
|
||||
use hickory_resolver::Name;
|
||||
use rocket_db_pools::diesel::{dsl::insert_into, prelude::*, result::Error as DieselError};
|
||||
|
||||
use crate::schema::scan_tasks::dsl::scan_tasks;
|
||||
use crate::schema::scanners::dsl::scanners;
|
||||
@ -25,14 +23,14 @@ pub struct Scanner {
|
||||
}
|
||||
|
||||
impl Scanner {
|
||||
pub fn find_or_new(
|
||||
pub async fn find_or_new(
|
||||
query_address: IpAddr,
|
||||
scanner_name: Scanners,
|
||||
ptr: Option<Name>,
|
||||
conn: &mut MysqlConnection,
|
||||
conn: &mut DbConn,
|
||||
) -> Result<Scanner, ()> {
|
||||
let ip_type = if query_address.is_ipv6() { 6 } else { 4 };
|
||||
let scanner_row_result = Scanner::find(query_address.to_string(), ip_type, conn);
|
||||
let scanner_row_result = Scanner::find(query_address.to_string(), ip_type, conn).await;
|
||||
let scanner_row = match scanner_row_result {
|
||||
Ok(scanner_row) => scanner_row,
|
||||
Err(_) => return Err(()),
|
||||
@ -58,31 +56,31 @@ impl Scanner {
|
||||
last_checked_at: None,
|
||||
}
|
||||
};
|
||||
match scanner.save(conn) {
|
||||
match scanner.save(conn).await {
|
||||
Ok(scanner) => Ok(scanner),
|
||||
Err(_) => Err(()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn find(
|
||||
pub async fn find(
|
||||
ip_address: String,
|
||||
ip_type: u8,
|
||||
conn: &mut MysqlConnection,
|
||||
conn: &mut DbConn,
|
||||
) -> Result<Option<Scanner>, DieselError> {
|
||||
use crate::schema::scanners;
|
||||
|
||||
scanners
|
||||
.select(Scanner::as_select())
|
||||
.filter(scanners::ip.eq(ip_address))
|
||||
.filter(scanners::ip_type.eq(ip_type))
|
||||
.order((scanners::ip_type.desc(), scanners::created_at.desc()))
|
||||
.first(conn)
|
||||
.await
|
||||
.optional()
|
||||
}
|
||||
|
||||
pub fn list_names(
|
||||
pub async fn list_names(
|
||||
scanner_name: Scanners,
|
||||
conn: &mut MysqlConnection,
|
||||
conn: &mut DbConn,
|
||||
) -> Result<Vec<String>, DieselError> {
|
||||
use crate::schema::scanners;
|
||||
use crate::schema::scanners::ip;
|
||||
@ -92,16 +90,20 @@ impl Scanner {
|
||||
.filter(scanners::scanner_name.eq(scanner_name.to_string()))
|
||||
.order((scanners::ip_type.desc(), scanners::created_at.desc()))
|
||||
.load::<String>(conn)
|
||||
.await
|
||||
}
|
||||
|
||||
pub fn save(self: Scanner, conn: &mut MysqlConnection) -> Result<Scanner, DieselError> {
|
||||
let new_scanner = NewScanner::from_scanner(&self);
|
||||
match insert_into(scanners)
|
||||
pub async fn save(self: Scanner, conn: &mut DbConn) -> Result<Scanner, DieselError> {
|
||||
use crate::schema::scanners;
|
||||
|
||||
let new_scanner: NewScanner = NewScanner::from_scanner(&self).await;
|
||||
match insert_into(scanners::table)
|
||||
.values(&new_scanner)
|
||||
.on_conflict(diesel::dsl::DuplicatedKeys)
|
||||
.do_update()
|
||||
.set(&new_scanner)
|
||||
.execute(conn)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(self),
|
||||
Err(err) => Err(err),
|
||||
@ -124,7 +126,7 @@ pub struct NewScanner {
|
||||
}
|
||||
|
||||
impl NewScanner {
|
||||
pub fn from_scanner<'x>(scanner: &Scanner) -> NewScanner {
|
||||
pub async fn from_scanner<'x>(scanner: &Scanner) -> NewScanner {
|
||||
NewScanner {
|
||||
ip: scanner.ip.to_string(),
|
||||
ip_type: scanner.ip_type,
|
||||
@ -165,7 +167,22 @@ pub struct ScanTaskitem {
|
||||
}
|
||||
|
||||
impl ScanTask {
|
||||
pub fn list(conn: &mut MysqlConnection) -> Result<Vec<ScanTaskitem>, DieselError> {
|
||||
pub async fn list_not_started(mut conn: DbConn) -> Result<Vec<ScanTaskitem>, DieselError> {
|
||||
use crate::schema::scan_tasks;
|
||||
|
||||
let res = scan_tasks
|
||||
.select(ScanTaskitem::as_select())
|
||||
.filter(scan_tasks::started_at.is_null())
|
||||
.order((scan_tasks::created_at.asc(),))
|
||||
.load::<ScanTaskitem>(&mut conn)
|
||||
.await;
|
||||
match res {
|
||||
Ok(rows) => Ok(rows),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list(conn: &mut DbConn) -> Result<Vec<ScanTaskitem>, DieselError> {
|
||||
use crate::schema::scan_tasks;
|
||||
|
||||
let res = scan_tasks
|
||||
@ -174,21 +191,26 @@ impl ScanTask {
|
||||
scan_tasks::created_at.desc(),
|
||||
scan_tasks::task_group_id.asc(),
|
||||
))
|
||||
.load::<ScanTaskitem>(conn);
|
||||
.load::<ScanTaskitem>(conn)
|
||||
.await;
|
||||
match res {
|
||||
Ok(rows) => Ok(rows),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn save(self: &ScanTask, conn: &mut MysqlConnection) -> Result<(), DieselError> {
|
||||
let new_scan_task = NewScanTask::from_scan_task(self);
|
||||
match insert_into(scan_tasks)
|
||||
pub async fn save(self: &ScanTask, conn: &mut DbConn) -> Result<(), DieselError> {
|
||||
use crate::schema::scan_tasks;
|
||||
|
||||
let new_scan_task: NewScanTask = NewScanTask::from_scan_task(self).await;
|
||||
|
||||
match insert_into(scan_tasks::table)
|
||||
.values(&new_scan_task)
|
||||
.on_conflict(diesel::dsl::DuplicatedKeys)
|
||||
.do_update()
|
||||
.set(&new_scan_task)
|
||||
.execute(conn)
|
||||
.await
|
||||
{
|
||||
Ok(_) => Ok(()),
|
||||
Err(err) => Err(err),
|
||||
@ -211,7 +233,7 @@ pub struct NewScanTask {
|
||||
}
|
||||
|
||||
impl NewScanTask {
|
||||
pub fn from_scan_task<'x>(scan_task: &ScanTask) -> NewScanTask {
|
||||
pub async fn from_scan_task<'x>(scan_task: &ScanTask) -> NewScanTask {
|
||||
NewScanTask {
|
||||
task_group_id: scan_task.task_group_id.to_string(),
|
||||
cidr: scan_task.cidr.to_owned(),
|
||||
|
@ -1,74 +1,229 @@
|
||||
use cidr::IpCidr;
|
||||
use diesel::MysqlConnection;
|
||||
use hickory_resolver::Name;
|
||||
use log2::*;
|
||||
use std::{collections::HashMap, net::IpAddr, str::FromStr};
|
||||
use ws2::{Pod, WebSocket};
|
||||
use rocket::futures::{stream::Next, SinkExt, StreamExt};
|
||||
use rocket_ws::{frame::CloseFrame, Message};
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::{
|
||||
worker::{
|
||||
detection::detect_scanner_from_name,
|
||||
modules::{Network, WorkerMessages},
|
||||
},
|
||||
DbPool, Scanner,
|
||||
event_bus::{EventBusEvent, EventBusWriter, EventBusWriterEvent},
|
||||
worker::modules::WorkerMessages,
|
||||
};
|
||||
use rocket::futures::channel::mpsc as rocket_mpsc;
|
||||
|
||||
pub struct Server {
|
||||
pub clients: HashMap<u32, Worker>,
|
||||
pub new_scanners: HashMap<String, IpAddr>,
|
||||
}
|
||||
pub struct Server {}
|
||||
|
||||
type HandleBox = Pin<
|
||||
Box<dyn std::future::Future<Output = Result<(), rocket_ws::result::Error>> + std::marker::Send>,
|
||||
>;
|
||||
|
||||
impl Server {
|
||||
pub fn cleanup(&self, _: &ws2::Server) -> &Server {
|
||||
// TODO: implement check not logged in
|
||||
&self
|
||||
pub fn handle(
|
||||
stream: rocket_ws::stream::DuplexStream,
|
||||
bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>,
|
||||
bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
|
||||
ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>,
|
||||
) -> HandleBox {
|
||||
use rocket::tokio;
|
||||
|
||||
Box::pin(async move {
|
||||
let work_fn = Worker::work(stream, bus_rx, bus_tx, ws_receiver);
|
||||
tokio::spawn(work_fn);
|
||||
|
||||
tokio::signal::ctrl_c().await.unwrap();
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
pub fn commit(&mut self, conn: &mut MysqlConnection) -> &Server {
|
||||
for (name, query_address) in self.new_scanners.clone() {
|
||||
let scanner_name = Name::from_str(name.as_str()).unwrap();
|
||||
pub fn new() -> Server {
|
||||
Server {}
|
||||
}
|
||||
|
||||
match detect_scanner_from_name(&scanner_name) {
|
||||
Ok(Some(scanner_type)) => {
|
||||
match Scanner::find_or_new(
|
||||
query_address,
|
||||
scanner_type,
|
||||
Some(scanner_name),
|
||||
conn,
|
||||
) {
|
||||
Ok(scanner) => {
|
||||
// Got saved
|
||||
self.new_scanners.remove(&name);
|
||||
info!(
|
||||
"Saved {scanner_type}: {name} for {query_address}: {:?}",
|
||||
scanner.ip_ptr
|
||||
);
|
||||
/*pub fn add_worker(tx: rocket_mpsc::Sender<Message>, workers: &Mutex<WorkersList>) -> () {
|
||||
let workers_lock = workers.try_lock();
|
||||
if let Ok(mut workers) = workers_lock {
|
||||
workers.push(tx);
|
||||
info!("Clients: {}", workers.len());
|
||||
std::mem::drop(workers);
|
||||
} else {
|
||||
error!("Unable to lock workers");
|
||||
}
|
||||
}*/
|
||||
|
||||
pub fn shutdown_to_all(server: &EventBusWriter) -> () {
|
||||
let res = server
|
||||
.write()
|
||||
.try_send(EventBusWriterEvent::BroadcastMessage(Message::Close(Some(
|
||||
CloseFrame {
|
||||
code: rocket_ws::frame::CloseCode::Away,
|
||||
reason: "Server stop".into(),
|
||||
},
|
||||
))));
|
||||
match res {
|
||||
Ok(_) => {
|
||||
info!("Worker did receive stop signal.");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Unable to find or new {:?}", err);
|
||||
error!("Send error: {err}");
|
||||
}
|
||||
};
|
||||
}
|
||||
Ok(None) => {}
|
||||
Err(_) => {}
|
||||
|
||||
/*pub fn send_to_all(workers: &Mutex<WorkersList>, message: &str) -> () {
|
||||
let workers_lock = workers.try_lock();
|
||||
if let Ok(ref workers) = workers_lock {
|
||||
workers.iter().for_each(|tx| {
|
||||
let res = tx.clone().try_send(Message::Text(message.to_string()));
|
||||
match res {
|
||||
Ok(_) => {
|
||||
info!("Message sent to worker !");
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Send error: {err}");
|
||||
}
|
||||
self
|
||||
};
|
||||
});
|
||||
info!("Currently {} workers online.", workers.len());
|
||||
std::mem::drop(workers_lock);
|
||||
} else {
|
||||
error!("Unable to lock workers");
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Worker {
|
||||
pub authenticated: bool,
|
||||
pub login: Option<String>,
|
||||
pub struct Worker<'a> {
|
||||
authenticated: bool,
|
||||
login: Option<String>,
|
||||
stream: &'a mut rocket_ws::stream::DuplexStream,
|
||||
bus_tx: &'a mut rocket_mpsc::Sender<EventBusWriterEvent>,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn initial() -> Worker {
|
||||
impl<'a> Worker<'a> {
|
||||
pub fn initial(
|
||||
stream: &'a mut rocket_ws::stream::DuplexStream,
|
||||
bus_tx: &'a mut rocket_mpsc::Sender<EventBusWriterEvent>,
|
||||
) -> Worker<'a> {
|
||||
info!("New worker");
|
||||
Worker {
|
||||
authenticated: false,
|
||||
login: None,
|
||||
stream,
|
||||
bus_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn work(
|
||||
mut stream: rocket_ws::stream::DuplexStream,
|
||||
mut bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>,
|
||||
mut bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
|
||||
mut ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>,
|
||||
) {
|
||||
use crate::rocket::futures::StreamExt;
|
||||
use rocket::tokio;
|
||||
|
||||
let mut worker = Worker::initial(&mut stream, &mut bus_tx);
|
||||
let mut interval = rocket::tokio::time::interval(std::time::Duration::from_secs(60));
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = interval.tick() => {
|
||||
// Send message every X seconds
|
||||
if let Ok(true) = worker.tick().await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
result = bus_rx.recv() => {
|
||||
let message = match result {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
error!("Bus error: {err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if let Err(err) = worker.send(message).await {
|
||||
error!("Error sending event to Event bus WebSocket: {}", err);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Some(message) = ws_receiver.next() => {
|
||||
info!("Received message from other client: {:?}", message);
|
||||
let _ = worker.send(message).await;
|
||||
},
|
||||
Ok(false) = worker.poll() => {
|
||||
// Continue the loop
|
||||
}
|
||||
else => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&mut self, msg: Message) -> Result<(), rocket_ws::result::Error> {
|
||||
self.stream.send(msg).await
|
||||
}
|
||||
|
||||
pub fn next(&mut self) -> Next<'_, rocket_ws::stream::DuplexStream> {
|
||||
self.stream.next()
|
||||
}
|
||||
|
||||
pub async fn tick(&mut self) -> Result<bool, ()> {
|
||||
match self.send(rocket_ws::Message::Ping(vec![])).await {
|
||||
Ok(_) => Ok(false),
|
||||
Err(err) => {
|
||||
error!("Processing error: {err}");
|
||||
Ok(true) // Break processing loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn poll(&mut self) -> Result<bool, ()> {
|
||||
let message = self.next();
|
||||
|
||||
match message.await {
|
||||
Some(Ok(message)) => {
|
||||
match message {
|
||||
rocket_ws::Message::Text(_) => match self.on_message(&message).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => error!("Processing error: {err}"),
|
||||
},
|
||||
rocket_ws::Message::Binary(data) => {
|
||||
// Handle Binary message
|
||||
info!("Received Binary message: {:?}", data);
|
||||
}
|
||||
rocket_ws::Message::Close(close_frame) => {
|
||||
// Handle Close message
|
||||
info!("Received Close message: {:?}", close_frame);
|
||||
let close_frame = rocket_ws::frame::CloseFrame {
|
||||
code: rocket_ws::frame::CloseCode::Normal,
|
||||
reason: "Client disconected".to_string().into(),
|
||||
};
|
||||
let _ = self.stream.close(Some(close_frame)).await;
|
||||
return Ok(true); // Break processing loop
|
||||
}
|
||||
rocket_ws::Message::Ping(ping_data) => {
|
||||
match self.send(rocket_ws::Message::Pong(ping_data)).await {
|
||||
Ok(_) => {}
|
||||
Err(err) => error!("Processing error: {err}"),
|
||||
}
|
||||
}
|
||||
rocket_ws::Message::Pong(pong_data) => {
|
||||
// Handle Pong message
|
||||
info!("Received Pong message: {:?}", pong_data);
|
||||
}
|
||||
_ => {
|
||||
info!("Received other message: {:?}", message);
|
||||
}
|
||||
};
|
||||
Ok(false)
|
||||
}
|
||||
Some(Err(_)) => {
|
||||
info!("Connection closed");
|
||||
let close_frame = rocket_ws::frame::CloseFrame {
|
||||
code: rocket_ws::frame::CloseCode::Normal,
|
||||
reason: "Client disconected".to_string().into(),
|
||||
};
|
||||
let _ = self.stream.close(Some(close_frame)).await;
|
||||
// The connection is closed by the client
|
||||
Ok(true) // Break processing loop
|
||||
}
|
||||
None => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,62 +245,46 @@ impl Worker {
|
||||
self.authenticated = true;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl ws2::Handler for Server {
|
||||
fn on_open(&mut self, ws: &WebSocket) -> Pod {
|
||||
info!("New client: {ws}");
|
||||
let worker = Worker::initial();
|
||||
// Add the client
|
||||
self.clients.insert(ws.id(), worker);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_close(&mut self, ws: &WebSocket) -> Pod {
|
||||
info!("Client /quit: {ws}");
|
||||
// Drop the client
|
||||
self.clients.remove(&ws.id());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_message(&mut self, ws: &WebSocket, msg: String) -> Pod {
|
||||
let client = self.clients.get_mut(&ws.id());
|
||||
if client.is_none() {
|
||||
// Impossible, close in case
|
||||
return ws.close();
|
||||
}
|
||||
let worker: &mut Worker = client.unwrap();
|
||||
|
||||
info!("on message: {msg}, {ws}");
|
||||
pub async fn on_message(&mut self, msg: &Message) -> Result<(), String> {
|
||||
info!("on message: {msg}");
|
||||
|
||||
let mut worker_reply: Option<WorkerMessages> = None;
|
||||
let worker_request: WorkerMessages = msg.clone().into();
|
||||
let worker_request: WorkerMessages = match msg.clone().try_into() {
|
||||
Ok(data) => data,
|
||||
Err(err) => return Err(err),
|
||||
};
|
||||
|
||||
let result = match worker_request {
|
||||
WorkerMessages::AuthenticateRequest { login } => {
|
||||
if !worker.is_authenticated() {
|
||||
worker.authenticate(login);
|
||||
if !self.is_authenticated() {
|
||||
self.authenticate(login);
|
||||
return Ok(());
|
||||
} else {
|
||||
error!("Already authenticated: {ws}");
|
||||
error!("Already authenticated");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
WorkerMessages::ScannerFoundResponse { name, address } => {
|
||||
info!("Detected {name} for {address}");
|
||||
self.new_scanners.insert(name, address);
|
||||
let _ = self
|
||||
.bus_tx
|
||||
.try_send(EventBusWriterEvent::ScannerFoundResponse { name, address });
|
||||
Ok(())
|
||||
}
|
||||
WorkerMessages::GetWorkRequest {} => {
|
||||
worker_reply = Some(WorkerMessages::DoWorkRequest {
|
||||
neworks: vec![Network(IpCidr::from_str("52.189.78.0/24")?)],
|
||||
});
|
||||
worker_reply = Some(WorkerMessages::DoWorkRequest { neworks: vec![] });
|
||||
Ok(())
|
||||
}
|
||||
WorkerMessages::DoWorkRequest { .. } | WorkerMessages::Invalid { .. } => {
|
||||
error!("Unable to understand: {msg}, {ws}");
|
||||
error!("Unable to understand: {msg}");
|
||||
// Unable to understand, close the connection
|
||||
return ws.close();
|
||||
let close_frame = rocket_ws::frame::CloseFrame {
|
||||
code: rocket_ws::frame::CloseCode::Unsupported,
|
||||
reason: "Invalid data received".to_string().into(),
|
||||
};
|
||||
let _ = self.stream.close(Some(close_frame)).await;
|
||||
Err("Unable to understand: {msg}}")
|
||||
} /*msg => {
|
||||
error!("No implemented: {:#?}", msg);
|
||||
Ok(())
|
||||
@ -155,14 +294,14 @@ impl ws2::Handler for Server {
|
||||
// it has a request to send
|
||||
if let Some(worker_reply) = worker_reply {
|
||||
let msg_string: String = worker_reply.to_string();
|
||||
match ws.send(msg_string) {
|
||||
match self.send(rocket_ws::Message::Text(msg_string)).await {
|
||||
Ok(_) => match worker_reply {
|
||||
WorkerMessages::DoWorkRequest { .. } => {}
|
||||
msg => error!("No implemented: {:#?}", msg),
|
||||
},
|
||||
Err(err) => error!("Error sending reply to {ws}: {err}"),
|
||||
Err(err) => error!("Error sending reply: {err}"),
|
||||
}
|
||||
}
|
||||
result
|
||||
Ok(result?)
|
||||
}
|
||||
}
|
||||
|
26
snow-scanner/src/worker/Cargo.toml
Normal file
26
snow-scanner/src/worker/Cargo.toml
Normal file
@ -0,0 +1,26 @@
|
||||
[package]
|
||||
name = "snow-scanner-worker"
|
||||
version = "0.1.0"
|
||||
authors = ["William Desportes <williamdes@wdes.fr>"]
|
||||
edition = "2021"
|
||||
rust-version = "1.81.0" # MSRV
|
||||
description = "The CLI to run a snow-scanner worker"
|
||||
|
||||
[[bin]]
|
||||
name = "snow-scanner-worker"
|
||||
path = "worker.rs"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tungstenite = { version = "0.24.0", default-features = true, features = ["native-tls"] }
|
||||
rocket_ws = { git = "https://github.com/rwf2/Rocket/", rev = "3bf9ef02d6e803fe9f753777f5a829dda6d2453d", default-features = true}
|
||||
log2 = "0.1.11"
|
||||
diesel = { version = "2", default-features = false, features = [] }
|
||||
dns-ptr-resolver = {git = "https://github.com/wdes/dns-ptr-resolver.git"}
|
||||
hickory-resolver = { version = "0.24.1", default-features = false, features = ["tokio-runtime", "dns-over-h3", "dns-over-https", "dns-over-quic"]}
|
||||
chrono = "0.4.38"
|
||||
uuid = { version = "1.10.0", default-features = false, features = ["v7", "serde", "std"] }
|
||||
cidr = "0.2.2"
|
||||
serde = "1.0.210"
|
||||
serde_json = "1.0.128"
|
@ -8,6 +8,8 @@ use dns_ptr_resolver::ResolvedResult;
|
||||
use hickory_resolver::config::{NameServerConfigGroup, ResolverConfig, ResolverOpts};
|
||||
use hickory_resolver::{Name, Resolver};
|
||||
|
||||
use crate::worker::ip_addr::is_global_hardcoded;
|
||||
|
||||
#[derive(Debug, Clone, Copy, FromSqlRow)]
|
||||
pub enum Scanners {
|
||||
Stretchoid,
|
||||
@ -33,6 +35,14 @@ pub fn get_dns_client() -> Resolver {
|
||||
Resolver::new(config, options).unwrap()
|
||||
}
|
||||
|
||||
pub fn validate_ip(ip: IpAddr) -> bool {
|
||||
// unspecified => 0.0.0.0
|
||||
if ip.is_loopback() || ip.is_multicast() || ip.is_unspecified() {
|
||||
return false;
|
||||
}
|
||||
return is_global_hardcoded(ip);
|
||||
}
|
||||
|
||||
pub fn detect_scanner(ptr_result: &ResolvedResult) -> Result<Option<Scanners>, ()> {
|
||||
match &ptr_result.result {
|
||||
Some(name) => detect_scanner_from_name(&name),
|
||||
|
126
snow-scanner/src/worker/ip_addr.rs
Normal file
126
snow-scanner/src/worker/ip_addr.rs
Normal file
@ -0,0 +1,126 @@
|
||||
//
|
||||
// Port of the official Rust implementation
|
||||
// Source: https://github.com/dani-garcia/vaultwarden/blob/1.32.1/src/util.rs
|
||||
//
|
||||
|
||||
/// TODO: This is extracted from IpAddr::is_global, which is unstable:
|
||||
/// https://doc.rust-lang.org/nightly/std/net/enum.IpAddr.html#method.is_global
|
||||
/// Remove once https://github.com/rust-lang/rust/issues/27709 is merged
|
||||
#[allow(clippy::nonminimal_bool)]
|
||||
#[cfg(any(not(feature = "unstable"), test))]
|
||||
pub fn is_global_hardcoded(ip: std::net::IpAddr) -> bool {
|
||||
match ip {
|
||||
std::net::IpAddr::V4(ip) => {
|
||||
!(ip.octets()[0] == 0 // "This network"
|
||||
|| ip.is_private()
|
||||
|| (ip.octets()[0] == 100 && (ip.octets()[1] & 0b1100_0000 == 0b0100_0000)) //ip.is_shared()
|
||||
|| ip.is_loopback()
|
||||
|| ip.is_link_local()
|
||||
// addresses reserved for future protocols (`192.0.0.0/24`)
|
||||
||(ip.octets()[0] == 192 && ip.octets()[1] == 0 && ip.octets()[2] == 0)
|
||||
|| ip.is_documentation()
|
||||
|| (ip.octets()[0] == 198 && (ip.octets()[1] & 0xfe) == 18) // ip.is_benchmarking()
|
||||
|| (ip.octets()[0] & 240 == 240 && !ip.is_broadcast()) //ip.is_reserved()
|
||||
|| ip.is_broadcast())
|
||||
}
|
||||
std::net::IpAddr::V6(ip) => {
|
||||
!(ip.is_unspecified()
|
||||
|| ip.is_loopback()
|
||||
// IPv4-mapped Address (`::ffff:0:0/96`)
|
||||
|| matches!(ip.segments(), [0, 0, 0, 0, 0, 0xffff, _, _])
|
||||
// IPv4-IPv6 Translat. (`64:ff9b:1::/48`)
|
||||
|| matches!(ip.segments(), [0x64, 0xff9b, 1, _, _, _, _, _])
|
||||
// Discard-Only Address Block (`100::/64`)
|
||||
|| matches!(ip.segments(), [0x100, 0, 0, 0, _, _, _, _])
|
||||
// IETF Protocol Assignments (`2001::/23`)
|
||||
|| (matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if b < 0x200)
|
||||
&& !(
|
||||
// Port Control Protocol Anycast (`2001:1::1`)
|
||||
u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0001
|
||||
// Traversal Using Relays around NAT Anycast (`2001:1::2`)
|
||||
|| u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0002
|
||||
// AMT (`2001:3::/32`)
|
||||
|| matches!(ip.segments(), [0x2001, 3, _, _, _, _, _, _])
|
||||
// AS112-v6 (`2001:4:112::/48`)
|
||||
|| matches!(ip.segments(), [0x2001, 4, 0x112, _, _, _, _, _])
|
||||
// ORCHIDv2 (`2001:20::/28`)
|
||||
|| matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if (0x20..=0x2F).contains(&b))
|
||||
))
|
||||
|| ((ip.segments()[0] == 0x2001) && (ip.segments()[1] == 0xdb8)) // ip.is_documentation()
|
||||
|| ((ip.segments()[0] & 0xfe00) == 0xfc00) //ip.is_unique_local()
|
||||
|| ((ip.segments()[0] & 0xffc0) == 0xfe80)) //ip.is_unicast_link_local()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "unstable"))]
|
||||
pub use is_global_hardcoded as is_global;
|
||||
|
||||
#[cfg(feature = "unstable")]
|
||||
#[inline(always)]
|
||||
pub fn is_global(ip: std::net::IpAddr) -> bool {
|
||||
ip.is_global()
|
||||
}
|
||||
|
||||
/// These are some tests to check that the implementations match
|
||||
/// The IPv4 can be all checked in 30 seconds or so and they are correct as of nightly 2023-07-17
|
||||
/// The IPV6 can't be checked in a reasonable time, so we check over a hundred billion random ones, so far correct
|
||||
/// Note that the is_global implementation is subject to change as new IP RFCs are created
|
||||
///
|
||||
/// To run while showing progress output:
|
||||
/// cargo +nightly test --release --features sqlite,unstable -- --nocapture --ignored
|
||||
#[cfg(test)]
|
||||
#[cfg(feature = "unstable")]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::net::IpAddr;
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_ipv4_global() {
|
||||
for a in 0..u8::MAX {
|
||||
println!("Iter: {}/255", a);
|
||||
for b in 0..u8::MAX {
|
||||
for c in 0..u8::MAX {
|
||||
for d in 0..u8::MAX {
|
||||
let ip = IpAddr::V4(std::net::Ipv4Addr::new(a, b, c, d));
|
||||
assert_eq!(
|
||||
ip.is_global(),
|
||||
is_global_hardcoded(ip),
|
||||
"IP mismatch: {}",
|
||||
ip
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_ipv6_global() {
|
||||
use rand::Rng;
|
||||
|
||||
std::thread::scope(|s| {
|
||||
for t in 0..16 {
|
||||
let handle = s.spawn(move || {
|
||||
let mut v = [0u8; 16];
|
||||
let mut rng = rand::thread_rng();
|
||||
|
||||
for i in 0..20 {
|
||||
println!("Thread {t} Iter: {i}/50");
|
||||
for _ in 0..500_000_000 {
|
||||
rng.fill(&mut v);
|
||||
let ip = IpAddr::V6(std::net::Ipv6Addr::from(v));
|
||||
assert_eq!(
|
||||
ip.is_global(),
|
||||
is_global_hardcoded(ip),
|
||||
"IP mismatch: {ip}"
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -1,2 +1,3 @@
|
||||
pub mod detection;
|
||||
pub mod ip_addr;
|
||||
pub mod modules;
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::{net::IpAddr, str::FromStr};
|
||||
|
||||
use cidr::IpCidr;
|
||||
use rocket_ws::Message as RocketMessage;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@ -66,6 +67,26 @@ impl Into<WorkerMessages> for String {
|
||||
}
|
||||
}
|
||||
|
||||
impl Into<RocketMessage> for WorkerMessages {
|
||||
fn into(self) -> RocketMessage {
|
||||
RocketMessage::Text(self.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl TryInto<WorkerMessages> for RocketMessage {
|
||||
type Error = String;
|
||||
|
||||
fn try_into(self) -> Result<WorkerMessages, Self::Error> {
|
||||
match self {
|
||||
RocketMessage::Text(data) => {
|
||||
let data: WorkerMessages = data.into();
|
||||
Ok(data)
|
||||
}
|
||||
_ => Err("Only text is supported".to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use cidr::IpCidr;
|
||||
|
@ -2,11 +2,13 @@ use std::{env, net::IpAddr};
|
||||
|
||||
use chrono::{Duration, NaiveDateTime, Utc};
|
||||
use detection::detect_scanner;
|
||||
use dns_ptr_resolver::{get_ptr, ResolvedResult};
|
||||
use dns_ptr_resolver::get_ptr;
|
||||
use log2::*;
|
||||
use ws2::{Client, Pod, WebSocket};
|
||||
use tungstenite::stream::MaybeTlsStream;
|
||||
use tungstenite::{connect, Error, Message, WebSocket};
|
||||
|
||||
pub mod detection;
|
||||
pub mod ip_addr;
|
||||
pub mod modules;
|
||||
|
||||
use crate::detection::get_dns_client;
|
||||
@ -17,20 +19,69 @@ pub struct IpToResolve {
|
||||
pub address: IpAddr,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug)]
|
||||
pub struct Worker {
|
||||
pub authenticated: bool,
|
||||
pub tasks: Vec<IpToResolve>,
|
||||
pub last_request_for_work: Option<NaiveDateTime>,
|
||||
ws: WebSocket<MaybeTlsStream<std::net::TcpStream>>,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn initial() -> Worker {
|
||||
info!("New worker");
|
||||
impl Into<Worker> for WebSocket<MaybeTlsStream<std::net::TcpStream>> {
|
||||
fn into(self) -> Worker {
|
||||
let wait_time = std::time::Duration::from_secs(1);
|
||||
match self.get_ref() {
|
||||
tungstenite::stream::MaybeTlsStream::Plain(stream) => stream
|
||||
.set_read_timeout(Some(wait_time))
|
||||
.expect("set_nonblocking to work"),
|
||||
tungstenite::stream::MaybeTlsStream::NativeTls(stream) => {
|
||||
stream
|
||||
.get_ref()
|
||||
.set_read_timeout(Some(wait_time))
|
||||
.expect("set_nonblocking to work");
|
||||
()
|
||||
}
|
||||
_ => unimplemented!(),
|
||||
};
|
||||
Worker {
|
||||
authenticated: false,
|
||||
tasks: vec![],
|
||||
last_request_for_work: None,
|
||||
ws: self,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
pub fn wait_for_messages(&mut self) -> Result<bool, Error> {
|
||||
self.tick();
|
||||
match self.ws.read() {
|
||||
Ok(server_request) => {
|
||||
match server_request {
|
||||
Message::Text(msg_string) => {
|
||||
self.receive_request(msg_string.into());
|
||||
}
|
||||
Message::Ping(data) => {
|
||||
let _ = self.ws.write(Message::Pong(data));
|
||||
}
|
||||
Message::Pong(_) => {}
|
||||
Message::Frame(_) => {}
|
||||
Message::Binary(_) => {}
|
||||
Message::Close(_) => {
|
||||
return Ok(true); // Break the processing loop
|
||||
}
|
||||
};
|
||||
Ok(false)
|
||||
}
|
||||
Err(err) => {
|
||||
match err {
|
||||
// Silently drop the error: Processing error: IO error: Resource temporarily unavailable (os error 11)
|
||||
// That occurs when no messages are to be read
|
||||
Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
|
||||
Error::Io(ref e) if e.kind() == std::io::ErrorKind::NotConnected => Ok(true), // Break the processing loop
|
||||
_ => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,11 +100,11 @@ impl Worker {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn tick(&mut self, ws_client: &Client) -> &Worker {
|
||||
pub fn tick(&mut self) -> () {
|
||||
let mut request: Option<WorkerMessages> = None;
|
||||
if !self.is_authenticated() {
|
||||
request = Some(WorkerMessages::AuthenticateRequest {
|
||||
login: "williamdes".to_string(),
|
||||
login: env::var("WORKER_NAME").expect("The ENV WORKER_NAME should be set"),
|
||||
});
|
||||
} else {
|
||||
if self.last_request_for_work.is_none()
|
||||
@ -67,14 +118,13 @@ impl Worker {
|
||||
|
||||
// it has a request to send
|
||||
if let Some(request) = request {
|
||||
self.send_request(ws_client, request);
|
||||
self.send_request(request);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub fn send_request(&mut self, ws_client: &Client, request: WorkerMessages) -> &Worker {
|
||||
pub fn send_request(&mut self, request: WorkerMessages) -> &Worker {
|
||||
let msg_string: String = request.to_string();
|
||||
match ws_client.send(msg_string) {
|
||||
match self.ws.send(Message::Text(msg_string)) {
|
||||
Ok(_) => {
|
||||
match request {
|
||||
WorkerMessages::AuthenticateRequest { login } => {
|
||||
@ -93,7 +143,7 @@ impl Worker {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn receive_request(&mut self, ws: &WebSocket, server_request: WorkerMessages) -> &Worker {
|
||||
pub fn receive_request(&mut self, server_request: WorkerMessages) -> &Worker {
|
||||
match server_request {
|
||||
WorkerMessages::DoWorkRequest { neworks } => {
|
||||
info!("Should work on: {:?}", neworks);
|
||||
@ -115,7 +165,7 @@ impl Worker {
|
||||
address: addr,
|
||||
};
|
||||
let msg_string: String = request.to_string();
|
||||
match ws.send(msg_string) {
|
||||
match self.ws.send(Message::Text(msg_string)) {
|
||||
Ok(_) => {}
|
||||
Err(err) => error!("Unable to send scanner result: {err}"),
|
||||
}
|
||||
@ -124,7 +174,7 @@ impl Worker {
|
||||
|
||||
Err(err) => error!("Error detecting for {addr}: {:?}", err),
|
||||
},
|
||||
Err(err) => {
|
||||
Err(_) => {
|
||||
//debug!("Error processing {addr}: {err}")
|
||||
}
|
||||
};
|
||||
@ -144,24 +194,6 @@ impl Worker {
|
||||
}
|
||||
}
|
||||
|
||||
impl ws2::Handler for Worker {
|
||||
fn on_open(&mut self, ws: &WebSocket) -> Pod {
|
||||
info!("Connected to: {ws}, starting to work");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_close(&mut self, ws: &WebSocket) -> Pod {
|
||||
info!("End of the work day: {ws}");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn on_message(&mut self, ws: &WebSocket, msg: String) -> Pod {
|
||||
let server_request: WorkerMessages = msg.clone().into();
|
||||
self.receive_request(ws, server_request);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> () {
|
||||
let _log2 = log2::stdout()
|
||||
.module(true)
|
||||
@ -171,23 +203,39 @@ fn main() -> () {
|
||||
})
|
||||
.start();
|
||||
info!("Running the worker");
|
||||
let url = "ws://127.0.0.1:8800";
|
||||
let mut worker = Worker::initial();
|
||||
match ws2::connect(url) {
|
||||
Ok(mut ws_client) => {
|
||||
let connected = ws_client.is_open();
|
||||
let url = match env::var("WORKER_URL") {
|
||||
Ok(worker_url) => worker_url,
|
||||
Err(_) => "ws://127.0.0.1:8800".to_string(),
|
||||
};
|
||||
|
||||
match connect(&url) {
|
||||
Ok((socket, response)) => {
|
||||
let connected = response.status() == 101;
|
||||
if connected {
|
||||
info!("Connected to: {url}");
|
||||
} else {
|
||||
info!("Connecting to: {url}");
|
||||
info!("Connecting replied {}: {url}", response.status());
|
||||
}
|
||||
|
||||
let mut worker: Worker = socket.into();
|
||||
loop {
|
||||
match ws_client.process(&mut worker, 0.5) {
|
||||
Ok(_) => {
|
||||
worker.tick(&ws_client);
|
||||
match worker.wait_for_messages() {
|
||||
Ok(true) => {
|
||||
error!("Stopping processing");
|
||||
break;
|
||||
}
|
||||
Err(err) => error!("Processing error: {err}"),
|
||||
Ok(false) => {
|
||||
// Keep processing
|
||||
}
|
||||
Err(tungstenite::Error::ConnectionClosed) => {
|
||||
error!("Stopping processing: connection closed");
|
||||
break;
|
||||
}
|
||||
Err(tungstenite::Error::AlreadyClosed) => {
|
||||
error!("Stopping processing: connection already closed");
|
||||
break;
|
||||
}
|
||||
Err(err) => error!("Processing error: {err} -> {:?}", err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user