diff --git a/snow-scanner/Cargo.toml b/snow-scanner/Cargo.toml index a7081c7..ce1dbf8 100644 --- a/snow-scanner/Cargo.toml +++ b/snow-scanner/Cargo.toml @@ -34,9 +34,9 @@ rouille = "3.6.2" hmac = "0.12.1" sha2 = "0.10.8" hex = "0.4.3" -rusqlite = { version = "0.31.0", features = ["bundled", "chrono"] } +diesel = { version = "2.2.0", default-features = false, features = ["mysql", "chrono", "uuid"] } dns-ptr-resolver = "1.2.0" hickory-client = { version = "0.24.1", default-features = false } chrono = "0.4.38" -uuid7 = "1.0.0" +uuid = { version = "1.10.0", default-features = false, features = ["v7", "serde", "std"] } cidr = "0.2.2" diff --git a/snow-scanner/src/main.rs b/snow-scanner/src/main.rs index e66b24a..b760079 100644 --- a/snow-scanner/src/main.rs +++ b/snow-scanner/src/main.rs @@ -1,31 +1,37 @@ +#![feature(trivial_bounds)] #[macro_use] extern crate rouille; use chrono::{NaiveDateTime, Utc}; -use cidr::IpCidr; +use diesel::deserialize::{self, FromSqlRow}; +use diesel::mysql::{Mysql, MysqlValue}; +use diesel::sql_types::Text; use hmac::{Hmac, Mac}; use rouille::{Request, Response, ResponseBody}; -use rusqlite::types::ToSqlOutput; -use rusqlite::Error as RusqliteError; -use rusqlite::{named_params, Connection, OpenFlags, Result, ToSql}; use sha2::Sha256; +use std::io::Write; use std::str::FromStr; -use std::sync::Mutex; -use std::time::Duration; use std::{env, fmt, thread}; -use uuid7::Uuid; +use uuid::Uuid; use hickory_client::client::SyncClient; use hickory_client::rr::Name; use hickory_client::tcp::TcpClientConnection; +use diesel::serialize::IsNull; +use diesel::{serialize, Connection, MysqlConnection}; use dns_ptr_resolver::{get_ptr, ResolvedResult}; +pub mod models; +pub mod schema; + +use crate::models::*; + // Create alias for HMAC-SHA256 type HmacSha256 = Hmac; -#[derive(Debug, Clone)] -enum Scanners { +#[derive(Debug, Clone, Copy, FromSqlRow)] +pub enum Scanners { Stretchoid, Binaryedge, Censys, @@ -45,9 +51,13 @@ impl IsStatic for Scanners { } } } + +#[derive(Debug, PartialEq, Eq)] +struct ParseScannerError; + impl FromStr for Scanners { - type Err = (); - fn from_str(input: &str) -> Result { + type Err = ParseScannerError; + fn from_str(input: &str) -> Result { match input { "stretchoid" => Ok(Scanners::Stretchoid), "binaryedge" => Ok(Scanners::Binaryedge), @@ -55,7 +65,7 @@ impl FromStr for Scanners { "binaryedge.txt" => Ok(Scanners::Binaryedge), "censys.txt" => Ok(Scanners::Censys), "internet-measurement.com.txt" => Ok(Scanners::InternetMeasurement), - _ => Err(()), + _ => Err(ParseScannerError {}), } } } @@ -74,90 +84,32 @@ impl fmt::Display for Scanners { ) } } -impl ToSql for Scanners { - /// Converts Rust value to SQLite value - fn to_sql(&self) -> Result> { - match self { - Self::Stretchoid => Ok("stretchoid".into()), - Self::Binaryedge => Ok("binaryedge".into()), - Self::Censys => Ok("censys".into()), - Self::InternetMeasurement => Ok("internet-measurement.com".into()), + +impl serialize::ToSql for Scanners { + fn to_sql(&self, out: &mut serialize::Output) -> serialize::Result { + match *self { + Self::Stretchoid => out.write_all(b"stretchoid")?, + Self::Binaryedge => out.write_all(b"binaryedge")?, + Self::Censys => out.write_all(b"censys")?, + Self::InternetMeasurement => out.write_all(b"internet-measurement.com")?, + }; + + Ok(IsNull::No) + } +} + +impl deserialize::FromSql for Scanners { + fn from_sql(bytes: MysqlValue) -> deserialize::Result { + let value = >::from_sql(bytes)?; + match &value as &str { + "stretchoid" => Ok(Scanners::Stretchoid), + "binaryedge" => Ok(Scanners::Binaryedge), + "internet-measurement.com" => Ok(Scanners::InternetMeasurement), + _ => Err("Unrecognized enum variant".into()), } } } -#[derive(Debug)] -struct Scanner { - ip: String, - ip_type: u8, - scanner_name: Scanners, - ip_ptr: Option, - created_at: NaiveDateTime, - updated_at: Option, - last_seen_at: Option, - last_checked_at: Option, -} - -#[derive(Debug)] -struct ScanTask { - task_group_id: Uuid, - cidr: String, - created_by_username: String, - created_at: NaiveDateTime, - updated_at: Option, - started_at: Option, - still_processing_at: Option, - ended_at: Option, -} - -fn save_scanner(conn: &Connection, scanner: &Scanner) -> Result<(), ()> { - match conn.execute( - "INSERT INTO scanners (ip, ip_type, ip_ptr, scanner_name, created_at, updated_at, last_seen_at, last_checked_at) - VALUES (:ip, :ip_type, :ip_ptr, :scanner_name, :created_at, :updated_at, :last_seen_at, :last_checked_at) - ON CONFLICT(ip, ip_type) DO UPDATE SET updated_at = :updated_at, last_seen_at = :last_seen_at, last_checked_at = :last_checked_at, ip_ptr = :ip_ptr;", - named_params! { - ":ip": &scanner.ip, - ":ip_type": &scanner.ip_type, - ":ip_ptr": &scanner.ip_ptr, - ":scanner_name": &scanner.scanner_name, - ":created_at": &scanner.created_at.to_string(), - ":updated_at": &scanner.updated_at, - ":last_seen_at": &scanner.last_seen_at, - ":last_checked_at": &scanner.last_checked_at - }, - ) { - Ok(_) => { - Ok(()) - }, - Err(_) => { - Err(()) - }, - } -} - -fn save_scan_task(conn: &Connection, scan_task: &ScanTask) -> Result<(), RusqliteError> { - match conn.execute( - "INSERT INTO scan_tasks (task_group_id, cidr, created_by_username, created_at, updated_at, started_at, ended_at, still_processing_at) - VALUES (:task_group_id, :cidr, :created_by_username, :created_at, :updated_at, :started_at, :ended_at, :still_processing_at) - ON CONFLICT(cidr, task_group_id) DO UPDATE SET updated_at = :updated_at, started_at = :started_at, ended_at = :ended_at, still_processing_at = :still_processing_at;", - named_params! { - ":task_group_id": &scan_task.task_group_id.to_string(), - ":cidr": &scan_task.cidr, - ":created_by_username": &scan_task.created_by_username, - ":created_at": &scan_task.created_at.to_string(), - ":updated_at": &scan_task.updated_at, - ":started_at": &scan_task.started_at, - ":ended_at": &scan_task.ended_at, - ":still_processing_at": &scan_task.still_processing_at, - }, - ) { - Ok(_) => { - Ok(()) - }, - Err(err) => Err(err), - } -} - fn detect_scanner(ptr_result: &ResolvedResult) -> Result { match ptr_result.result { Some(ref x) @@ -176,42 +128,7 @@ fn detect_scanner(ptr_result: &ResolvedResult) -> Result { } } -fn handle_ip2(conn: &Connection, ip: String) -> Result> { - let query_address = ip.parse().expect(format!("To parse: {}", ip).as_str()); - - let client = get_dns_client(); - let ptr_result: ResolvedResult = if let Ok(res) = get_ptr(query_address, client) { - res - } else { - return Err(None); - }; - - match detect_scanner(&ptr_result) { - Ok(scanner_name) => { - let ip_type = if ip.contains(':') { 6 } else { 4 }; - let scanner = Scanner { - ip: ip, - ip_type: ip_type, - scanner_name: scanner_name.clone(), - ip_ptr: match ptr_result.result { - Some(ptr) => Some(ptr.to_string()), - None => None, - }, - created_at: Utc::now().naive_utc(), - updated_at: None, - last_seen_at: None, - last_checked_at: None, - }; - let db = conn; - save_scanner(&db, &scanner).unwrap(); - Ok(scanner) - } - - Err(_) => Err(Some(ptr_result)), - } -} - -fn handle_ip(conn: &Mutex, ip: String) -> Result> { +fn handle_ip(conn: &mut MysqlConnection, ip: String) -> Result> { let query_address = ip.parse().expect("To parse"); let client = get_dns_client(); @@ -237,9 +154,11 @@ fn handle_ip(conn: &Mutex, ip: String) -> Result Ok(scanner), + Err(_) => Err(None), + } } Err(_) => Err(Some(ptr_result)), @@ -269,7 +188,7 @@ static FORM: &str = r#" "#; -fn handle_scan(conn: &Mutex, request: &Request) -> Response { +fn handle_scan(conn: &mut MysqlConnection, request: &Request) -> Response { let data = try_or_400!(post_input!(request, { username: String, ips: String, @@ -284,8 +203,7 @@ fn handle_scan(conn: &Mutex, request: &Request) -> Response { }; } - let db = conn.lock().unwrap(); - let task_group_id = uuid7::uuid7(); + let task_group_id: Uuid = Uuid::now_v7(); for ip in data.ips.lines() { let scan_task = ScanTask { @@ -298,7 +216,7 @@ fn handle_scan(conn: &Mutex, request: &Request) -> Response { still_processing_at: None, ended_at: None, }; - match save_scan_task(&db, &scan_task) { + match scan_task.save(conn) { Ok(_) => println!("Added {}", ip.to_string()), Err(err) => eprintln!("Not added: {:?}", err), } @@ -307,7 +225,7 @@ fn handle_scan(conn: &Mutex, request: &Request) -> Response { rouille::Response::html(format!("New task added: {} !", task_group_id)) } -fn handle_report(conn: &Mutex, request: &Request) -> Response { +fn handle_report(conn: &mut MysqlConnection, request: &Request) -> Response { let data = try_or_400!(post_input!(request, { ip: String, })); @@ -356,7 +274,7 @@ fn handle_get_collection(request: &Request, static_data_dir: &str) -> Response { } fn handle_list_scanners( - conn: &Mutex, + conn: &mut MysqlConnection, static_data_dir: &str, scanner_name: Scanners, request: &Request, @@ -379,21 +297,20 @@ fn handle_list_scanners( upgrade: None, }; } - let db = conn.lock().unwrap(); - let mut stmt = db.prepare("SELECT ip FROM scanners WHERE scanner_name = :scanner_name ORDER BY ip_type, created_at").unwrap(); - let mut rows = stmt - .query(named_params! { ":scanner_name": scanner_name }) - .unwrap(); - let mut ips: Vec = vec![]; - while let Some(row) = rows.next().unwrap() { - ips.push(row.get(0).unwrap()); - } - - Response { - status_code: 200, - headers: vec![("Content-Type".into(), "text/plain; charset=utf-8".into())], - data: ResponseBody::from_string(ips.join("\n")), - upgrade: None, + if let Ok(scanners) = Scanner::list_names(scanner_name, conn) { + Response { + status_code: 200, + headers: vec![("Content-Type".into(), "text/plain; charset=utf-8".into())], + data: ResponseBody::from_string(scanners.join("\n")), + upgrade: None, + } + } else { + Response { + status_code: 500, + headers: vec![("Content-Type".into(), "text/plain; charset=utf-8".into())], + data: ResponseBody::from_string("Unable to list scanners"), + upgrade: None, + } } } @@ -422,90 +339,49 @@ static SCAN_TASKS_FOOT: &str = r#" "#; -fn handle_list_scan_tasks(conn: &Mutex) -> Response { - let db = conn.lock().unwrap(); - - let mut stmt = db - .prepare( - r#" - SELECT task_group_id, cidr, created_by_username, started_at, still_processing_at, ended_at - FROM scan_tasks - ORDER BY created_at, task_group_id ASC - "#, - ) - .unwrap(); - let mut rows = stmt.query(named_params! {}).unwrap(); +fn handle_list_scan_tasks(conn: &mut MysqlConnection) -> Response { let mut html_data: Vec = vec![SCAN_TASKS_HEAD.to_string()]; - while let Some(row) = rows.next().unwrap() { - let cidr: String = row.get(1).unwrap(); - let started_at: Option = row.get(3).unwrap(); - let still_processing_at: Option = row.get(4).unwrap(); - let ended_at: Option = row.get(5).unwrap(); - html_data.push(format!( - " - - {cidr} - {:#?} - {:#?} - {:#?} - - ", - started_at, still_processing_at, ended_at - )); - } + if let Ok(scan_tasks) = ScanTask::list(conn) { + for row in scan_tasks { + let cidr: String = row.cidr; + let started_at: Option = row.started_at; + let still_processing_at: Option = row.still_processing_at; + let ended_at: Option = row.ended_at; + html_data.push(format!( + " + + {cidr} + {:#?} + {:#?} + {:#?} + + ", + started_at, still_processing_at, ended_at + )); + } - html_data.push(SCAN_TASKS_FOOT.to_string()); + html_data.push(SCAN_TASKS_FOOT.to_string()); - Response { - status_code: 200, - headers: vec![("Content-Type".into(), "text/html; charset=utf-8".into())], - data: ResponseBody::from_string(html_data.join("\n")), - upgrade: None, + Response { + status_code: 200, + headers: vec![("Content-Type".into(), "text/html; charset=utf-8".into())], + data: ResponseBody::from_string(html_data.join("\n")), + upgrade: None, + } + } else { + Response { + status_code: 500, + headers: vec![("Content-Type".into(), "text/plain; charset=utf-8".into())], + data: ResponseBody::from_string("Unable to list scan tasks"), + upgrade: None, + } } } -fn get_connection(db_path: &str) -> Connection { - let conn = Connection::open_with_flags( - db_path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_FULL_MUTEX, - ) - .unwrap(); - conn.execute( - "CREATE TABLE IF NOT EXISTS scanners ( - ip VARCHAR(255) NOT NULL, - ip_type TINYINT(1) NOT NULL, - scanner_name VARCHAR(255) NOT NULL, - ip_ptr VARCHAR(255) NULL, - created_at DATETIME NOT NULL, - updated_at DATETIME NULL, - last_seen_at DATETIME NULL, - last_checked_at DATETIME NULL, - PRIMARY KEY (ip, ip_type) - )", - (), // empty list of parameters. - ) - .unwrap(); - conn.execute( - "CREATE TABLE IF NOT EXISTS scan_tasks ( - task_group_id VARCHAR(255) NOT NULL, - cidr VARCHAR(255) NOT NULL, - created_by_username VARCHAR(255) NOT NULL, - created_at DATETIME NOT NULL, - updated_at DATETIME NULL, - started_at DATETIME NULL, - still_processing_at DATETIME NULL, - ended_at DATETIME NULL, - PRIMARY KEY (task_group_id, cidr) - )", - (), // empty list of parameters. - ) - .unwrap(); - conn.pragma_update_and_check(None, "journal_mode", &"WAL", |_| Ok(())) - .unwrap(); - conn +fn get_connection(database_url: &str) -> MysqlConnection { + MysqlConnection::establish(&database_url) + .unwrap_or_else(|_| panic!("Error connecting to {}", database_url)) } fn get_dns_client() -> SyncClient { @@ -515,7 +391,7 @@ fn get_dns_client() -> SyncClient { SyncClient::new(dns_conn) } -fn main() -> Result<()> { +fn main() -> Result<(), ()> { let server_address: String = if let Ok(env) = env::var("SERVER_ADDRESS") { env } else { @@ -524,7 +400,7 @@ fn main() -> Result<()> { println!("Now listening on {}", server_address); - let db_file: String = if let Ok(env) = env::var("DB_FILE") { + let db_url: String = if let Ok(env) = env::var("DB_URL") { env } else { "./snow-scanner.sqlite".to_string() @@ -535,16 +411,10 @@ fn main() -> Result<()> { Err(_) => "../data/".to_string(), }; - println!("Database will be saved at: {}", db_file); - - let conn = Mutex::new(get_connection(db_file.as_str())); - conn.lock() - .unwrap() - .execute("SELECT 0 WHERE 0;", named_params! {}) - .expect("Failed to initialize database"); - + let conn = &mut get_connection(db_url.as_str()); + /* thread::spawn(move || { - let conn = get_connection(db_file.as_str()); + let conn = &mut get_connection(db_url.as_str()); // Reset scan tasks let _ = conn.execute("UPDATE scan_tasks SET updated_at = :updated_at, still_processing_at = NULL, started_at = NULL WHERE (still_processing_at IS NOT NULL OR started_at IS NOT NULL) AND ended_at IS NULL", named_params! { @@ -572,7 +442,7 @@ fn main() -> Result<()> { let count = addresses.count(); let mut current = 0; for addr in addresses { - match handle_ip2(&conn, addr.to_string()) { + match handle_ip(conn, addr.to_string()) { Ok(scanner) => println!("Processed {}", scanner.ip), Err(_) => println!("Processed {}", addr), } @@ -599,7 +469,7 @@ fn main() -> Result<()> { let two_hundred_millis = Duration::from_millis(500); thread::sleep(two_hundred_millis); } - }); + });*/ rouille::start_server(server_address, move |request| { router!(request, @@ -611,10 +481,10 @@ fn main() -> Result<()> { rouille::Response::text("pong") }, - (POST) (/report) => {handle_report(&conn, &request)}, - (POST) (/scan) => {handle_scan(&conn, &request)}, + (POST) (/report) => {handle_report(conn, &request)}, + (POST) (/scan) => {handle_scan(conn, &request)}, (GET) (/scan/tasks) => { - handle_list_scan_tasks(&conn) + handle_list_scan_tasks(conn) }, (POST) (/register) => { @@ -642,7 +512,7 @@ fn main() -> Result<()> { }, (GET) (/scanners/{scanner_name: Scanners}) => { - handle_list_scanners(&conn, &static_data_dir, scanner_name, &request) + handle_list_scanners(conn, &static_data_dir, scanner_name, &request) }, (GET) (/collections/{vendor_name: String}/{file_name: String}) => { handle_get_collection(&request, &static_data_dir) diff --git a/snow-scanner/src/models.rs b/snow-scanner/src/models.rs new file mode 100644 index 0000000..8f3b2c0 --- /dev/null +++ b/snow-scanner/src/models.rs @@ -0,0 +1,172 @@ +use std::str::FromStr; + +use crate::Scanners; +use chrono::NaiveDateTime; +use diesel::deserialize::FromSqlRow; +use diesel::dsl::insert_into; +use diesel::prelude::*; +use diesel::result::Error as DieselError; +use uuid::Uuid; + +use crate::schema::scan_tasks::dsl::scan_tasks; +use crate::schema::scanners::dsl::scanners; + +#[derive(Queryable, Selectable, Debug)] +#[diesel(table_name = crate::schema::scanners)] +#[diesel(check_for_backend(diesel::mysql::Mysql))] +pub struct Scanner { + pub ip: String, + pub ip_type: u8, + pub scanner_name: Scanners, + pub ip_ptr: Option, + pub created_at: NaiveDateTime, + pub updated_at: Option, + pub last_seen_at: Option, + pub last_checked_at: Option, +} + +impl Scanner { + pub fn list_names( + scanner_name: Scanners, + conn: &mut MysqlConnection, + ) -> Result, DieselError> { + use crate::schema::scanners; + use crate::schema::scanners::ip; + + scanners + .select(ip) + .filter(scanners::scanner_name.eq(scanner_name.to_string())) + .order((scanners::ip_type.desc(), scanners::created_at.desc())) + .load::(conn) + } + + pub fn save(self: Scanner, conn: &mut MysqlConnection) -> Result { + let new_scanner = NewScanner::from_scanner(&self); + match insert_into(scanners) + .values(&new_scanner) + .on_conflict(diesel::dsl::DuplicatedKeys) + .do_update() + .set(&new_scanner) + .execute(conn) + { + Ok(_) => Ok(self), + Err(err) => Err(err), + } + } +} + +#[derive(Debug, Insertable, AsChangeset)] +#[diesel(table_name = crate::schema::scanners)] +#[diesel(check_for_backend(diesel::mysql::Mysql))] +pub struct NewScanner { + pub ip: String, + pub ip_type: u8, + pub scanner_name: String, + pub ip_ptr: Option, + pub created_at: NaiveDateTime, + pub updated_at: Option, + pub last_seen_at: Option, + pub last_checked_at: Option, +} + +impl NewScanner { + pub fn from_scanner<'x>(scanner: &Scanner) -> NewScanner { + NewScanner { + ip: scanner.ip.to_string(), + ip_type: scanner.ip_type, + scanner_name: scanner.scanner_name.to_string(), + ip_ptr: scanner.ip_ptr.to_owned(), + created_at: scanner.created_at, + updated_at: scanner.updated_at, + last_seen_at: scanner.last_seen_at, + last_checked_at: scanner.last_checked_at, + } + } +} + +#[derive(Queryable, Selectable, Debug)] +#[diesel(table_name = crate::schema::scan_tasks)] +#[diesel(check_for_backend(diesel::mysql::Mysql))] +pub struct ScanTask { + pub task_group_id: uuid::Uuid, + pub cidr: String, + pub created_by_username: String, + pub created_at: NaiveDateTime, + pub updated_at: Option, + pub started_at: Option, + pub still_processing_at: Option, + pub ended_at: Option, +} + +#[derive(Selectable, Debug, Queryable)] +#[diesel(table_name = crate::schema::scan_tasks)] +#[diesel(check_for_backend(diesel::mysql::Mysql))] +pub struct ScanTaskitem { + pub task_group_id: String, + pub cidr: String, + pub created_by_username: String, + pub started_at: Option, + pub still_processing_at: Option, + pub ended_at: Option, +} + +impl ScanTask { + pub fn list(conn: &mut MysqlConnection) -> Result, DieselError> { + use crate::schema::scan_tasks; + + let res = scan_tasks + .select(ScanTaskitem::as_select()) + .order(( + scan_tasks::created_at.desc(), + scan_tasks::task_group_id.asc(), + )) + .load::(conn); + match res { + Ok(rows) => Ok(rows), + Err(err) => Err(err), + } + } + + pub fn save(self: &ScanTask, conn: &mut MysqlConnection) -> Result<(), DieselError> { + let new_scan_task = NewScanTask::from_scan_task(self); + match insert_into(scan_tasks) + .values(&new_scan_task) + .on_conflict(diesel::dsl::DuplicatedKeys) + .do_update() + .set(&new_scan_task) + .execute(conn) + { + Ok(_) => Ok(()), + Err(err) => Err(err), + } + } +} + +#[derive(Debug, Insertable, AsChangeset)] +#[diesel(table_name = crate::schema::scan_tasks)] +#[diesel(check_for_backend(diesel::mysql::Mysql))] +pub struct NewScanTask { + pub task_group_id: String, + pub cidr: String, + pub created_by_username: String, + pub created_at: NaiveDateTime, + pub updated_at: Option, + pub started_at: Option, + pub still_processing_at: Option, + pub ended_at: Option, +} + +impl NewScanTask { + pub fn from_scan_task<'x>(scan_task: &ScanTask) -> NewScanTask { + NewScanTask { + task_group_id: scan_task.task_group_id.to_string(), + cidr: scan_task.cidr.to_owned(), + created_by_username: scan_task.created_by_username.to_owned(), + created_at: scan_task.created_at, + updated_at: scan_task.updated_at, + started_at: scan_task.started_at, + still_processing_at: scan_task.still_processing_at, + ended_at: scan_task.ended_at, + } + } +} diff --git a/snow-scanner/src/schema.rs b/snow-scanner/src/schema.rs index a011a88..cdf7285 100644 --- a/snow-scanner/src/schema.rs +++ b/snow-scanner/src/schema.rs @@ -32,7 +32,4 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!( - scan_tasks, - scanners, -); +diesel::allow_tables_to_appear_in_same_query!(scan_tasks, scanners,);