37 Commits

Author SHA1 Message Date
5717f0ad85 Swap out blocked from France DNS servers
Some checks failed
Build IP lists / build-aws-cloudfront (push) Has been cancelled
2024-10-10 15:57:59 +02:00
1495950484 Re-work crate management 2024-10-10 15:09:16 +02:00
565e268d01 Fixup build 2024-10-10 12:08:33 +02:00
6b0c5467b6 Some re-working, adding security and fix handling shadowserver 2024-10-10 11:50:48 +02:00
8acf084467 Implement shadowserver.org 2024-10-10 10:30:01 +02:00
1783fe5c93 Improve the CIDR parsing before starting the task 2024-10-08 01:50:53 +02:00
dbbbdc4818 Fix the path to static scanners
Some checks failed
Build IP lists / Build scanners list (binaryedge) (push) Failing after -2m31s
Build IP lists / Build scanners list (stretchoid) (push) Failing after -2m33s
Build IP lists / build-aws-cloudfront (push) Failing after -2m32s
2024-10-08 01:30:45 +02:00
75dc88bcc1 Simplify and fixup implementations for paths 2024-10-08 01:18:26 +02:00
bc3f3fe34c Validate IP addresses before insert 2024-10-08 00:03:03 +02:00
32d1abdcee Implement saving the received results 2024-10-07 23:31:33 +02:00
f589d4c11e Broadcast work requests to each node 2024-10-07 00:15:29 +02:00
e5c3b38121 Finally make it capable of talking to all hosts 2024-10-06 23:56:09 +02:00
2a7ea4c969 Implement broadcast to all nodes 2024-10-03 21:22:36 +02:00
8bf201b3e5 Better websocket processing 2024-10-03 13:56:06 +02:00
fd4d43596f Re-implement the ws client 2024-10-03 12:49:33 +02:00
25df2642e9 Move to Rocket ! 2024-10-03 10:34:56 +02:00
04aea8558f Move worker to tungstenite and add WORKER_NAME ENV 2024-09-28 01:09:48 +02:00
cad1073448 Apply formatting 2024-09-27 22:28:54 +02:00
c01177e4c8 Add tasks not started query 2024-09-27 22:28:40 +02:00
d84918851b Split the worker to a sub package 2024-09-27 22:28:23 +02:00
36468e4baf Support ENV WORKER_URL for CLI 2024-09-27 21:02:49 +02:00
b731f6dc21 Fix value validation
Some checks failed
Build IP lists / Build scanners list (binaryedge) (push) Failing after 11m59s
Build IP lists / Build scanners list (stretchoid) (push) Failing after 19m21s
Build IP lists / build-aws-cloudfront (push) Failing after 26m43s
2024-09-27 20:49:10 +02:00
de3b21e210 Make a working scan worker/server 2024-09-24 04:20:39 +02:00
39d9ffe1db Make a working client server 2024-09-24 01:37:00 +02:00
27c3f7ecd1 Wrap the data into a request type 2024-09-23 22:34:10 +02:00
58d6ed043e First working version of client and server 2024-09-23 17:20:50 +02:00
e48493cf6a Fix IP reporting and update the row
Some checks failed
Build IP lists / Build scanners list (binaryedge) (push) Failing after -2m8s
Build IP lists / Build scanners list (stretchoid) (push) Failing after -2m10s
Build IP lists / build-aws-cloudfront (push) Failing after -2m10s
2024-09-23 00:06:03 +02:00
d6757902f6 Upgrade dns-ptr-resolver 2024-09-22 23:32:10 +02:00
43e9176b49 Make collections search safer 2024-09-22 10:14:56 +02:00
bb52edc4c8 Fix path for files 2024-09-20 21:14:50 +02:00
299621ee6f Re-write all using actix
Some checks failed
Build IP lists / Build scanners list (binaryedge) (push) Failing after -2m0s
Build IP lists / Build scanners list (stretchoid) (push) Failing after -2m2s
Build IP lists / build-aws-cloudfront (push) Failing after -2m2s
2024-09-20 17:45:04 +02:00
110484a967 Re-write all the data management with Diesel 2024-09-20 00:51:24 +02:00
18bd7ce3ab Write migrations with Diesel 2024-09-20 00:07:01 +02:00
2f297a3557 Add one more scanner robot 2024-09-19 01:49:04 +02:00
abd2bda8e0 docs: about PDFs
Some checks failed
Build IP lists / Build scanners list (binaryedge) (push) Failing after -1m54s
Build IP lists / Build scanners list (stretchoid) (push) Failing after -1m53s
Build IP lists / build-aws-cloudfront (push) Failing after -1m55s
2024-07-03 14:23:34 +02:00
543a7fd3f1 New collections
Some checks failed
Build IP lists / Build scanners list (binaryedge) (push) Failing after 7m6s
Build IP lists / Build scanners list (stretchoid) (push) Failing after 7m40s
Build IP lists / build-aws-cloudfront (push) Failing after 1m36s
2024-06-29 14:28:23 +02:00
cf05597a30 Cleanup the not finished tasks 2024-06-28 19:07:13 +02:00
25 changed files with 2303 additions and 635 deletions

View File

@ -11,79 +11,6 @@ on:
- cron: "30 0 */5 * *"
jobs:
build-scanners-list:
name: Build scanners list
environment:
name: sudo-bot
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
type: ["stretchoid", "binaryedge"]
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Cache cargo binaries
uses: actions/cache@v4
id: cache-dns-ptr-resolver
with:
path: ~/.cargo/bin/dns-ptr-resolver
key: ${{ runner.os }}-cargo-bin-dns-ptr-resolver-1.1.0
- name: Set up toolchain
if: steps.cache-dns-ptr-resolver.outputs.cache-hit != 'true'
uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: 1.67
override: true
- name: Install dns-ptr-resolver
if: steps.cache-dns-ptr-resolver.outputs.cache-hit != 'true'
run: cargo install dns-ptr-resolver@1.1.0
- name: Build the ${{ matrix.type }} list
run: ./make-${{ matrix.type }}.sh
- name: Post the summary
run: |
git add -A
printf '### Diff\n```diff\n%s\n```\n' "$(git diff --staged)" >> $GITHUB_STEP_SUMMARY
- name: Extract secrets
run: |
printf '%s' "${{ secrets.GH_APP_JWT_PRIV_PEM_CONTENTS }}" > ${HOME}/.secret_jwt.pem
printf '%s' "${{ secrets.GPG_PRIVATE_KEY }}" > ${HOME}/.private-key.asc
- uses: actions/setup-node@v4
with:
node-version: 18
- name: Get yarn cache directory path
id: yarn-cache-dir-path
run: echo "dir=$(yarn cache dir)" >> $GITHUB_OUTPUT
- name: yarn cache
uses: actions/cache@v4
with:
path: ${{ steps.yarn-cache-dir-path.outputs.dir }}
key: ${{ runner.os }}-yarn-${{ hashFiles('**/yarn.lock') }}
restore-keys: |
${{ runner.os }}-yarn-
- name: Install sudo-bot
run: yarn global add sudo-bot
- name: Run sudo-bot
run: |
sudo-bot --verbose \
--jwt-file="${HOME}/.secret_jwt.pem" \
--gh-app-id='17453' \
--installation-id="${{ secrets.INSTALLATION_ID }}" \
--repository-slug='wdes/security' \
--target-branch='main' \
--assign='williamdes' \
--commit-author-email='sudo-bot@wdes.fr' \
--commit-author-name='Sudo Bot' \
--gpg-private-key-file="${HOME}/.private-key.asc" \
--template="$GITHUB_WORKSPACE/.github/sudo-bot-template.js" \
--gpg-private-key-passphrase="${{ secrets.GPG_PASSPHRASE }}"
- name: Purge secrets
if: always()
run: |
rm -v ${HOME}/.secret_jwt.pem
rm -v ${HOME}/.private-key.asc
build-aws-cloudfront:
runs-on: ubuntu-latest
steps:

17
PDF-SECURITY.md Normal file
View File

@ -0,0 +1,17 @@
# PDF security
## Links
- https://web-in-security.blogspot.com/2021/01/insecure-features-in-pdfs.html
- https://github.com/corkami/pocs/tree/master/pdf
- https://insert-script.blogspot.com/2019/01/adobe-reader-pdf-callback-via-xslt.html
- https://github.com/PortSwigger/portable-data-exfiltration/tree/main
- https://insert-script.blogspot.com/2015/05/pdf-mess-with-web.html
- https://portswigger.net/research/portable-data-exfiltration
- https://github.com/jonaslejon/malicious-pdf/issues/13
- https://github.com/michelcrypt4d4mus/pdfalyzer/blob/master/pdfalyzer/yara_rules/lprat.static_file_analysis.yara
- https://github.com/michelcrypt4d4mus/pdfalyzer
- https://www.sentinelone.com/blog/malicious-pdfs-revealing-techniques-behind-attacks/
- https://github.com/pdf-association/safedocs/tree/main/Miscellaneous%20Targeted%20Test%20PDFs
- https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
- https://github.com/J-F-Liu/lopdf/issues/142

View File

@ -2,12 +2,21 @@
## Security lists
### Scanners
- `https://security.wdes.eu/scanners/stretchoid.txt` (List of all known stretchoid IPs)
- `https://security.wdes.eu/scanners/binaryedge.txt` (List of all known binaryedge IPs)
- `https://security.wdes.eu/scanners/shadowserver.txt` (List of all known shadowserver IPs)
- `https://security.wdes.eu/scanners/censys.txt` (List of all IPs declared by censys scanner on their [FAQ](https://support.censys.io/hc/en-us/articles/360043177092-Opt-Out-of-Data-Collection)
- `https://security.wdes.eu/scanners/internet-measurement.com.txt` (List of all IPs declared by internet-measurement.com on [their website](https://internet-measurement.com/#ips))
### Collections (by vendor)
- `https://security.wdes.eu/collections/wdes/bad-networks.txt` (List of some hand picked bad networks)
- `https://security.wdes.eu/collections/bad-ips.txt` (List of some hand picked bad IPs that caused harm/attacks/scans to mail servers)
- `https://security.wdes.eu/collections/wdes/bad-ips.txt` (List of some hand picked bad IPs that caused harm/attacks/scans to mail servers)
- `https://security.wdes.eu/collections/microsoft/email-servers.txt` (List of the Microsoft IPs for it's mail servers)
- `https://security.wdes.eu/collections/amazon/cloudfront-ips.txt` (List of AWS CloudFront IPs)
## Other similar projects

View File

@ -152,3 +152,4 @@
79.124.60.142
185.242.226.41
162.216.18.113
59.110.115.16

View File

@ -3,7 +3,7 @@ name = "snow-scanner"
version = "0.1.0"
authors = ["William Desportes <williamdes@wdes.fr>"]
edition = "2021"
rust-version = "1.78.0" # MSRV
rust-version = "1.81.0" # MSRV
description = "A program to scan internet and find scanners"
homepage = "https://github.com/wdes/snow-scanner/tree/v1.2.0-dev#readme"
repository = "https://github.com/wdes/snow-scanner"
@ -23,20 +23,56 @@ is-it-maintained-issue-resolution = { repository = "security" }
is-it-maintained-open-issues = { repository = "security" }
maintenance = { status = "passively-maintained" }
# docker pull clux/muslrust:stable
# docker run -v $PWD:/volume --rm -t clux/muslrust:stable cargo build --release
[[bin]]
name = "snow-scanner"
path = "src/main.rs"
[workspace]
members = [
"src/worker"
]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
# Enable unstable features, requires nightly
# Currently only used to enable rusts official ip support
unstable = []
[dependencies]
rouille = "3.6.2"
hmac = "0.12.1"
sha2 = "0.10.8"
hex = "0.4.3"
rusqlite = { version = "0.31.0", features = ["bundled", "chrono"] }
dns-ptr-resolver = "1.2.0"
hickory-client = { version = "0.24.1", default-features = false }
rocket_db_pools = { git = "https://github.com/rwf2/Rocket/", rev = "3bf9ef02d6e803fe9f753777f5a829dda6d2453d", default-features = false, features = ["diesel_mysql"] }
snow-scanner-worker = {path = "./src/worker"}
diesel.workspace = true
dns-ptr-resolver.workspace = true
hickory-resolver.workspace = true
uuid.workspace = true
rocket.workspace = true
rocket_ws.workspace = true
ws.workspace = true
chrono.workspace = true
serde.workspace = true
serde_json.workspace = true
cidr.workspace = true
weighted-rs.workspace = true
[workspace.dependencies]
# mariadb-dev on Alpine
# "mysqlclient-src" "mysql_backend"
diesel = { version = "^2", default-features = false, features = ["mysql", "chrono", "uuid"] }
ws = { package = "rocket_ws", version = "0.1.1" }
dns-ptr-resolver = {git = "https://github.com/wdes/dns-ptr-resolver.git"}
hickory-resolver = { version = "0.24.1", default-features = false, features = ["tokio-runtime", "dns-over-h3", "dns-over-https", "dns-over-quic"]}
rocket = { git = "https://github.com/rwf2/Rocket/", rev = "3bf9ef02d6e803fe9f753777f5a829dda6d2453d"}
rocket_ws = { git = "https://github.com/rwf2/Rocket/", rev = "3bf9ef02d6e803fe9f753777f5a829dda6d2453d"}
chrono = "0.4.38"
uuid7 = "1.0.0"
cidr = "0.2.2"
uuid = { version = "1.10.0", default-features = false, features = ["v7", "serde", "std"] }
cidr = "0.3.0"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
weighted-rs = "0.1.3"

9
snow-scanner/diesel.toml Normal file
View File

@ -0,0 +1,9 @@
# For documentation on how to configure this file,
# see https://diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/schema.rs"
custom_type_derives = ["diesel::query_builder::QueryId", "Clone"]
[migrations_directory]
dir = "/mnt/Dev/@wdes/security.wdes.eu/snow-scanner/migrations"

View File

@ -0,0 +1 @@
DROP TABLE `scanners`;

View File

@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS `scanners` (
ip VARCHAR(255) NOT NULL,
ip_type TINYINT(1) UNSIGNED NOT NULL,
scanner_name VARCHAR(255) NOT NULL,
ip_ptr VARCHAR(255) NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NULL,
last_seen_at DATETIME NULL,
last_checked_at DATETIME NULL,
PRIMARY KEY (ip, ip_type)
);

View File

@ -0,0 +1 @@
DROP TABLE `scan_tasks`;

View File

@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS `scan_tasks` (
task_group_id VARCHAR(255) NOT NULL,
cidr VARCHAR(255) NOT NULL,
created_by_username VARCHAR(255) NOT NULL,
created_at DATETIME NOT NULL,
updated_at DATETIME NULL,
started_at DATETIME NULL,
still_processing_at DATETIME NULL,
ended_at DATETIME NULL,
PRIMARY KEY (task_group_id, cidr)
);

View File

@ -0,0 +1,135 @@
use std::{net::IpAddr, str::FromStr};
use crate::{DbConnection, SnowDb};
use hickory_resolver::Name;
use rocket::futures::channel::mpsc as rocket_mpsc;
use rocket::futures::StreamExt;
use rocket::tokio;
use snow_scanner_worker::detection::{detect_scanner_from_name, validate_ip};
use crate::Scanner;
/// Handles all the raw events being streamed from balancers and parses and filters them into only the events we care about.
pub struct EventBus {
events_rx: rocket_mpsc::Receiver<EventBusWriterEvent>,
events_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>,
}
impl EventBus {
pub fn new() -> Self {
let (events_tx, events_rx) = rocket_mpsc::channel(100);
let (bus_tx, _) = tokio::sync::broadcast::channel(100);
Self {
events_rx,
events_tx,
bus_tx,
}
}
// db: &Connection<SnowDb>
pub async fn run(&mut self, mut conn: DbConnection<SnowDb>) {
info!("EventBus started");
loop {
tokio::select! {
Some(event) = self.events_rx.next() => {
self.handle_event(event, &mut conn).await;
}
else => {
warn!("EventBus stopped");
break;
}
}
}
}
async fn handle_event(&self, event: EventBusWriterEvent, db: &mut DbConnection<SnowDb>) {
info!("Received event");
if self.bus_tx.receiver_count() == 0 {
return;
}
match event {
EventBusWriterEvent::ScannerFoundResponse { name, address } => {
let ip: IpAddr = address.into();
if !validate_ip(ip) {
error!("Invalid IP address: {ip}");
return;
}
let name = Name::from_str(name.as_str()).unwrap();
match detect_scanner_from_name(&name) {
Ok(Some(scanner_type)) => {
match Scanner::find_or_new(ip, scanner_type, Some(name), db).await {
Ok(scanner) => {
let _ = scanner.save(db).await;
}
Err(err) => {
error!("Error find or save: {:?}", err);
}
}
}
Ok(None) => {
error!("No name detected for: {:?}", name);
}
Err(err) => {
error!("No name detected error: {:?}", err);
}
};
}
EventBusWriterEvent::BroadcastMessage(msg) => match self.bus_tx.send(msg) {
Ok(count) => {
info!("Event sent to {count} subscribers");
}
Err(err) => {
error!("Error sending event to subscribers: {}", err);
}
},
}
}
pub fn subscriber(&self) -> EventBusSubscriber {
EventBusSubscriber::new(self.bus_tx.clone())
}
pub fn writer(&self) -> EventBusWriter {
EventBusWriter::new(self.events_tx.clone())
}
}
pub type EventBusEvent = rocket_ws::Message;
/// Enables subscriptions to the event bus
pub struct EventBusSubscriber {
bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>,
}
/// Enables subscriptions to the event bus
pub struct EventBusWriter {
bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
}
pub enum EventBusWriterEvent {
BroadcastMessage(rocket_ws::Message),
ScannerFoundResponse { name: String, address: IpAddr },
}
impl EventBusWriter {
pub fn new(bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>) -> Self {
Self { bus_tx }
}
pub fn write(&self) -> rocket_mpsc::Sender<EventBusWriterEvent> {
self.bus_tx.clone()
}
}
impl EventBusSubscriber {
pub fn new(bus_tx: tokio::sync::broadcast::Sender<EventBusEvent>) -> Self {
Self { bus_tx }
}
pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<EventBusEvent> {
self.bus_tx.subscribe()
}
}

File diff suppressed because it is too large Load Diff

248
snow-scanner/src/models.rs Normal file
View File

@ -0,0 +1,248 @@
use std::net::IpAddr;
use crate::{DbConn, Scanners};
use chrono::{NaiveDateTime, Utc};
use hickory_resolver::Name;
use rocket_db_pools::diesel::{dsl::insert_into, prelude::*, result::Error as DieselError};
use crate::schema::scan_tasks::dsl::scan_tasks;
use crate::schema::scanners::dsl::scanners;
#[derive(Queryable, Selectable, Debug)]
#[diesel(table_name = crate::schema::scanners)]
#[diesel(check_for_backend(diesel::mysql::Mysql))]
pub struct Scanner {
pub ip: String,
pub ip_type: u8,
pub scanner_name: Scanners,
pub ip_ptr: Option<String>,
pub created_at: NaiveDateTime,
pub updated_at: Option<NaiveDateTime>,
pub last_seen_at: Option<NaiveDateTime>,
pub last_checked_at: Option<NaiveDateTime>,
}
impl Scanner {
pub async fn find_or_new(
query_address: IpAddr,
scanner_name: Scanners,
ptr: Option<Name>,
conn: &mut DbConn,
) -> Result<Scanner, DieselError> {
let ip_type = if query_address.is_ipv6() { 6 } else { 4 };
let scanner_row_result = Scanner::find(query_address.to_string(), ip_type, conn).await;
let scanner_row = match scanner_row_result {
Ok(scanner_row) => scanner_row,
Err(err) => return Err(err),
};
let scanner = if let Some(mut scanner) = scanner_row {
scanner.last_seen_at = Some(Utc::now().naive_utc());
scanner.last_checked_at = Some(Utc::now().naive_utc());
scanner.updated_at = Some(Utc::now().naive_utc());
scanner
} else {
Scanner {
ip: query_address.to_string(),
ip_type: ip_type,
scanner_name: scanner_name.clone(),
ip_ptr: match ptr {
Some(ptr) => Some(ptr.to_string()),
None => None,
},
created_at: Utc::now().naive_utc(),
updated_at: None,
last_seen_at: None,
last_checked_at: None,
}
};
match scanner.save(conn).await {
Ok(scanner) => Ok(scanner),
Err(err) => Err(err),
}
}
pub async fn find(
ip_address: String,
ip_type: u8,
conn: &mut DbConn,
) -> Result<Option<Scanner>, DieselError> {
use crate::schema::scanners;
scanners
.select(Scanner::as_select())
.filter(scanners::ip.eq(ip_address))
.filter(scanners::ip_type.eq(ip_type))
.order((scanners::ip_type.desc(), scanners::created_at.desc()))
.first(conn)
.await
.optional()
}
pub async fn list_names(
scanner_name: Scanners,
conn: &mut DbConn,
) -> Result<Vec<String>, DieselError> {
use crate::schema::scanners;
use crate::schema::scanners::ip;
scanners
.select(ip)
.filter(scanners::scanner_name.eq(scanner_name.to_string()))
.order((scanners::ip_type.desc(), scanners::created_at.desc()))
.load::<String>(conn)
.await
}
pub async fn save(self: Scanner, conn: &mut DbConn) -> Result<Scanner, DieselError> {
use crate::schema::scanners;
let new_scanner: NewScanner = NewScanner::from_scanner(&self).await;
match insert_into(scanners::table)
.values(&new_scanner)
.on_conflict(diesel::dsl::DuplicatedKeys)
.do_update()
.set(&new_scanner)
.execute(conn)
.await
{
Ok(_) => Ok(self),
Err(err) => Err(err),
}
}
}
#[derive(Debug, Insertable, AsChangeset)]
#[diesel(table_name = crate::schema::scanners)]
#[diesel(check_for_backend(diesel::mysql::Mysql))]
pub struct NewScanner {
pub ip: String,
pub ip_type: u8,
pub scanner_name: String,
pub ip_ptr: Option<String>,
pub created_at: NaiveDateTime,
pub updated_at: Option<NaiveDateTime>,
pub last_seen_at: Option<NaiveDateTime>,
pub last_checked_at: Option<NaiveDateTime>,
}
impl NewScanner {
pub async fn from_scanner<'x>(scanner: &Scanner) -> NewScanner {
NewScanner {
ip: scanner.ip.to_string(),
ip_type: scanner.ip_type,
scanner_name: scanner.scanner_name.to_string(),
ip_ptr: scanner.ip_ptr.to_owned(),
created_at: scanner.created_at,
updated_at: scanner.updated_at,
last_seen_at: scanner.last_seen_at,
last_checked_at: scanner.last_checked_at,
}
}
}
#[derive(Queryable, Selectable, Debug)]
#[diesel(table_name = crate::schema::scan_tasks)]
#[diesel(check_for_backend(diesel::mysql::Mysql))]
pub struct ScanTask {
pub task_group_id: String,
pub cidr: String,
pub created_by_username: String,
pub created_at: NaiveDateTime,
pub updated_at: Option<NaiveDateTime>,
pub started_at: Option<NaiveDateTime>,
pub still_processing_at: Option<NaiveDateTime>,
pub ended_at: Option<NaiveDateTime>,
}
#[derive(Selectable, Debug, Queryable)]
#[diesel(table_name = crate::schema::scan_tasks)]
#[diesel(check_for_backend(diesel::mysql::Mysql))]
pub struct ScanTaskitem {
pub task_group_id: String,
pub cidr: String,
pub created_by_username: String,
pub started_at: Option<NaiveDateTime>,
pub still_processing_at: Option<NaiveDateTime>,
pub ended_at: Option<NaiveDateTime>,
}
impl ScanTask {
pub async fn list_not_started(mut conn: DbConn) -> Result<Vec<ScanTaskitem>, DieselError> {
use crate::schema::scan_tasks;
let res = scan_tasks
.select(ScanTaskitem::as_select())
.filter(scan_tasks::started_at.is_null())
.order((scan_tasks::created_at.asc(),))
.load::<ScanTaskitem>(&mut conn)
.await;
match res {
Ok(rows) => Ok(rows),
Err(err) => Err(err),
}
}
pub async fn list(conn: &mut DbConn) -> Result<Vec<ScanTaskitem>, DieselError> {
use crate::schema::scan_tasks;
let res = scan_tasks
.select(ScanTaskitem::as_select())
.order((
scan_tasks::created_at.desc(),
scan_tasks::task_group_id.asc(),
))
.load::<ScanTaskitem>(conn)
.await;
match res {
Ok(rows) => Ok(rows),
Err(err) => Err(err),
}
}
pub async fn save(self: &ScanTask, conn: &mut DbConn) -> Result<(), DieselError> {
use crate::schema::scan_tasks;
let new_scan_task: NewScanTask = NewScanTask::from_scan_task(self).await;
match insert_into(scan_tasks::table)
.values(&new_scan_task)
.on_conflict(diesel::dsl::DuplicatedKeys)
.do_update()
.set(&new_scan_task)
.execute(conn)
.await
{
Ok(_) => Ok(()),
Err(err) => Err(err),
}
}
}
#[derive(Debug, Insertable, AsChangeset)]
#[diesel(table_name = crate::schema::scan_tasks)]
#[diesel(check_for_backend(diesel::mysql::Mysql))]
pub struct NewScanTask {
pub task_group_id: String,
pub cidr: String,
pub created_by_username: String,
pub created_at: NaiveDateTime,
pub updated_at: Option<NaiveDateTime>,
pub started_at: Option<NaiveDateTime>,
pub still_processing_at: Option<NaiveDateTime>,
pub ended_at: Option<NaiveDateTime>,
}
impl NewScanTask {
pub async fn from_scan_task<'x>(scan_task: &ScanTask) -> NewScanTask {
NewScanTask {
task_group_id: scan_task.task_group_id.to_string(),
cidr: scan_task.cidr.to_owned(),
created_by_username: scan_task.created_by_username.to_owned(),
created_at: scan_task.created_at,
updated_at: scan_task.updated_at,
started_at: scan_task.started_at,
still_processing_at: scan_task.still_processing_at,
ended_at: scan_task.ended_at,
}
}
}

View File

@ -0,0 +1,35 @@
// @generated automatically by Diesel CLI.
diesel::table! {
scan_tasks (task_group_id, cidr) {
#[max_length = 255]
task_group_id -> Varchar,
#[max_length = 255]
cidr -> Varchar,
#[max_length = 255]
created_by_username -> Varchar,
created_at -> Datetime,
updated_at -> Nullable<Datetime>,
started_at -> Nullable<Datetime>,
still_processing_at -> Nullable<Datetime>,
ended_at -> Nullable<Datetime>,
}
}
diesel::table! {
scanners (ip, ip_type) {
#[max_length = 255]
ip -> Varchar,
ip_type -> Unsigned<Tinyint>,
#[max_length = 255]
scanner_name -> Varchar,
#[max_length = 255]
ip_ptr -> Nullable<Varchar>,
created_at -> Datetime,
updated_at -> Nullable<Datetime>,
last_seen_at -> Nullable<Datetime>,
last_checked_at -> Nullable<Datetime>,
}
}
diesel::allow_tables_to_appear_in_same_query!(scan_tasks, scanners,);

305
snow-scanner/src/server.rs Normal file
View File

@ -0,0 +1,305 @@
use rocket::futures::{stream::Next, SinkExt, StreamExt};
use rocket_ws::{frame::CloseFrame, Message};
use std::pin::Pin;
use crate::event_bus::{EventBusEvent, EventBusWriter, EventBusWriterEvent};
use rocket::futures::channel::mpsc as rocket_mpsc;
use snow_scanner_worker::modules::WorkerMessages;
pub struct Server {}
type HandleBox = Pin<
Box<dyn std::future::Future<Output = Result<(), rocket_ws::result::Error>> + std::marker::Send>,
>;
impl Server {
pub fn handle(
stream: rocket_ws::stream::DuplexStream,
bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>,
bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>,
) -> HandleBox {
use rocket::tokio;
Box::pin(async move {
let work_fn = Worker::work(stream, bus_rx, bus_tx, ws_receiver);
tokio::spawn(work_fn);
tokio::signal::ctrl_c().await.unwrap();
Ok(())
})
}
pub fn new() -> Server {
Server {}
}
/*pub fn add_worker(tx: rocket_mpsc::Sender<Message>, workers: &Mutex<WorkersList>) -> () {
let workers_lock = workers.try_lock();
if let Ok(mut workers) = workers_lock {
workers.push(tx);
info!("Clients: {}", workers.len());
std::mem::drop(workers);
} else {
error!("Unable to lock workers");
}
}*/
pub fn shutdown_to_all(server: &EventBusWriter) -> () {
let res = server
.write()
.try_send(EventBusWriterEvent::BroadcastMessage(Message::Close(Some(
CloseFrame {
code: rocket_ws::frame::CloseCode::Away,
reason: "Server stop".into(),
},
))));
match res {
Ok(_) => {
info!("Worker did receive stop signal.");
}
Err(err) => {
error!("Send error: {err}");
}
};
}
/*pub fn send_to_all(workers: &Mutex<WorkersList>, message: &str) -> () {
let workers_lock = workers.try_lock();
if let Ok(ref workers) = workers_lock {
workers.iter().for_each(|tx| {
let res = tx.clone().try_send(Message::Text(message.to_string()));
match res {
Ok(_) => {
info!("Message sent to worker !");
}
Err(err) => {
error!("Send error: {err}");
}
};
});
info!("Currently {} workers online.", workers.len());
std::mem::drop(workers_lock);
} else {
error!("Unable to lock workers");
}
}*/
}
pub struct Worker<'a> {
authenticated: bool,
login: Option<String>,
stream: &'a mut rocket_ws::stream::DuplexStream,
bus_tx: &'a mut rocket_mpsc::Sender<EventBusWriterEvent>,
}
impl<'a> Worker<'a> {
pub fn initial(
stream: &'a mut rocket_ws::stream::DuplexStream,
bus_tx: &'a mut rocket_mpsc::Sender<EventBusWriterEvent>,
) -> Worker<'a> {
info!("New worker");
Worker {
authenticated: false,
login: None,
stream,
bus_tx,
}
}
pub async fn work(
mut stream: rocket_ws::stream::DuplexStream,
mut bus_rx: rocket::tokio::sync::broadcast::Receiver<EventBusEvent>,
mut bus_tx: rocket_mpsc::Sender<EventBusWriterEvent>,
mut ws_receiver: rocket_mpsc::Receiver<rocket_ws::Message>,
) {
use crate::rocket::futures::StreamExt;
use rocket::tokio;
let mut worker = Worker::initial(&mut stream, &mut bus_tx);
let mut interval = rocket::tokio::time::interval(std::time::Duration::from_secs(60));
loop {
tokio::select! {
_ = interval.tick() => {
// Send message every X seconds
if let Ok(true) = worker.tick().await {
break;
}
}
result = bus_rx.recv() => {
let message = match result {
Ok(message) => message,
Err(err) => {
error!("Bus error: {err}");
continue;
}
};
if let Err(err) = worker.send(message).await {
error!("Error sending event to Event bus WebSocket: {}", err);
break;
}
}
Some(message) = ws_receiver.next() => {
info!("Received message from other client: {:?}", message);
let _ = worker.send(message).await;
},
Ok(false) = worker.poll() => {
// Continue the loop
}
else => {
break;
}
}
}
}
pub async fn send(&mut self, msg: Message) -> Result<(), rocket_ws::result::Error> {
self.stream.send(msg).await
}
pub fn next(&mut self) -> Next<'_, rocket_ws::stream::DuplexStream> {
self.stream.next()
}
pub async fn tick(&mut self) -> Result<bool, ()> {
match self.send(rocket_ws::Message::Ping(vec![])).await {
Ok(_) => Ok(false),
Err(err) => {
error!("Processing error: {err}");
Ok(true) // Break processing loop
}
}
}
pub async fn poll(&mut self) -> Result<bool, ()> {
let message = self.next();
match message.await {
Some(Ok(message)) => {
match message {
rocket_ws::Message::Text(_) => match self.on_message(&message).await {
Ok(_) => {}
Err(err) => error!("Processing error: {err}"),
},
rocket_ws::Message::Binary(data) => {
// Handle Binary message
info!("Received Binary message: {:?}", data);
}
rocket_ws::Message::Close(close_frame) => {
// Handle Close message
info!("Received Close message: {:?}", close_frame);
let close_frame = rocket_ws::frame::CloseFrame {
code: rocket_ws::frame::CloseCode::Normal,
reason: "Client disconected".to_string().into(),
};
let _ = self.stream.close(Some(close_frame)).await;
return Ok(true); // Break processing loop
}
rocket_ws::Message::Ping(ping_data) => {
match self.send(rocket_ws::Message::Pong(ping_data)).await {
Ok(_) => {}
Err(err) => error!("Processing error: {err}"),
}
}
rocket_ws::Message::Pong(pong_data) => {
// Handle Pong message
info!("Received Pong message: {:?}", pong_data);
}
_ => {
info!("Received other message: {:?}", message);
}
};
Ok(false)
}
Some(Err(_)) => {
info!("Connection closed");
let close_frame = rocket_ws::frame::CloseFrame {
code: rocket_ws::frame::CloseCode::Normal,
reason: "Client disconected".to_string().into(),
};
let _ = self.stream.close(Some(close_frame)).await;
// The connection is closed by the client
Ok(true) // Break processing loop
}
None => Ok(false),
}
}
pub fn is_authenticated(&self) -> bool {
self.authenticated
}
pub fn authenticate(&mut self, login: String) -> &Worker {
if self.authenticated {
warn!(
"Worker is already authenticated as {}",
self.login.clone().unwrap_or("".to_string())
);
return self;
} else {
info!("Worker is now authenticated as {login}");
}
self.login = Some(login);
self.authenticated = true;
self
}
pub async fn on_message(&mut self, msg: &Message) -> Result<(), String> {
info!("on message: {msg}");
let mut worker_reply: Option<WorkerMessages> = None;
let worker_request: WorkerMessages = match msg.clone().try_into() {
Ok(data) => data,
Err(err) => return Err(err),
};
let result = match worker_request {
WorkerMessages::AuthenticateRequest { login } => {
if !self.is_authenticated() {
self.authenticate(login);
return Ok(());
} else {
error!("Already authenticated");
return Ok(());
}
}
WorkerMessages::ScannerFoundResponse { name, address } => {
info!("Detected {name} for {address}");
let _ = self
.bus_tx
.try_send(EventBusWriterEvent::ScannerFoundResponse { name, address });
Ok(())
}
WorkerMessages::GetWorkRequest {} => {
worker_reply = Some(WorkerMessages::DoWorkRequest { neworks: vec![] });
Ok(())
}
WorkerMessages::DoWorkRequest { .. } | WorkerMessages::Invalid { .. } => {
error!("Unable to understand: {msg}");
// Unable to understand, close the connection
let close_frame = rocket_ws::frame::CloseFrame {
code: rocket_ws::frame::CloseCode::Unsupported,
reason: "Invalid data received".to_string().into(),
};
let _ = self.stream.close(Some(close_frame)).await;
Err("Unable to understand: {msg}}")
} /*msg => {
error!("No implemented: {:#?}", msg);
Ok(())
}*/
};
// it has a request to send
if let Some(worker_reply) = worker_reply {
let msg_string: String = worker_reply.to_string();
match self.send(rocket_ws::Message::Text(msg_string)).await {
Ok(_) => match worker_reply {
WorkerMessages::DoWorkRequest { .. } => {}
msg => error!("No implemented: {:#?}", msg),
},
Err(err) => error!("Error sending reply: {err}"),
}
}
Ok(result?)
}
}

View File

@ -0,0 +1,34 @@
[package]
name = "snow-scanner-worker"
version = "0.1.0"
authors = ["William Desportes <williamdes@wdes.fr>"]
edition = "2021"
rust-version = "1.81.0" # MSRV
description = "The CLI to run a snow-scanner worker"
[[bin]]
name = "snow-scanner-worker"
path = "worker.rs"
[lib]
name = "snow_scanner_worker"
path = "mod.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tungstenite = { version = "0.24.0", default-features = true, features = ["native-tls"] }
rocket.workspace = true
rocket_ws.workspace = true
log2 = "0.1.11"
diesel.workspace = true
dns-ptr-resolver.workspace = true
hickory-resolver.workspace = true
chrono.workspace = true
uuid.workspace = true
cidr.workspace = true
serde.workspace = true
serde_json.workspace = true
weighted-rs.workspace = true
rayon = "1.10.0"
rand = "0.8.5"

View File

@ -0,0 +1,100 @@
use std::net::IpAddr;
use std::str::FromStr;
use std::time::Duration;
use crate::scanners::Scanners;
use dns_ptr_resolver::ResolvedResult;
use hickory_resolver::config::{NameServerConfigGroup, ResolverConfig, ResolverOpts};
use hickory_resolver::{Name, Resolver};
use crate::ip_addr::is_global_hardcoded;
pub fn get_dns_server_config(server_ips: &Vec<IpAddr>) -> NameServerConfigGroup {
NameServerConfigGroup::from_ips_clear(
server_ips, 53, // Port 53
true,
)
}
pub fn get_dns_client(server: &NameServerConfigGroup) -> Resolver {
let config = ResolverConfig::from_parts(None, vec![], server.clone());
let mut options = ResolverOpts::default();
options.timeout = Duration::from_secs(5);
options.attempts = 1; // One try
Resolver::new(config, options).unwrap()
}
pub fn validate_ip(ip: IpAddr) -> bool {
// unspecified => 0.0.0.0
if ip.is_loopback() || ip.is_multicast() || ip.is_unspecified() {
return false;
}
return is_global_hardcoded(ip);
}
pub fn detect_scanner(ptr_result: &ResolvedResult) -> Result<Option<Scanners>, ()> {
match &ptr_result.result {
Some(name) => detect_scanner_from_name(&name),
None => Ok(None),
}
}
pub fn detect_scanner_from_name(name: &Name) -> Result<Option<Scanners>, ()> {
match name {
ref name
if name
.trim_to(2)
.eq_case(&Name::from_str("binaryedge.ninja.").expect("Should parse")) =>
{
Ok(Some(Scanners::Binaryedge))
}
ref name
if name
.trim_to(2)
.eq_case(&Name::from_str("stretchoid.com.").expect("Should parse")) =>
{
Ok(Some(Scanners::Stretchoid))
}
ref name
if name
.trim_to(2)
.eq_case(&Name::from_str("shadowserver.org.").expect("Should parse")) =>
{
Ok(Some(Scanners::Shadowserver))
}
&_ => Ok(None),
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_detect_scanner_from_name() {
let ptr = Name::from_str("scan-47e.shadowserver.org.").unwrap();
assert_eq!(
detect_scanner_from_name(&ptr).unwrap(),
Some(Scanners::Shadowserver)
);
}
#[test]
fn test_detect_scanner() {
let cname_ptr = Name::from_str("111.0-24.197.62.64.in-addr.arpa.").unwrap();
let ptr = Name::from_str("scan-47e.shadowserver.org.").unwrap();
assert_eq!(
detect_scanner(&ResolvedResult {
query: cname_ptr,
result: Some(ptr),
error: None
})
.unwrap(),
Some(Scanners::Shadowserver)
);
}
}

View File

@ -0,0 +1,126 @@
//
// Port of the official Rust implementation
// Source: https://github.com/dani-garcia/vaultwarden/blob/1.32.1/src/util.rs
//
/// TODO: This is extracted from IpAddr::is_global, which is unstable:
/// https://doc.rust-lang.org/nightly/std/net/enum.IpAddr.html#method.is_global
/// Remove once https://github.com/rust-lang/rust/issues/27709 is merged
#[allow(clippy::nonminimal_bool)]
#[cfg(any(not(feature = "unstable"), test))]
pub fn is_global_hardcoded(ip: std::net::IpAddr) -> bool {
match ip {
std::net::IpAddr::V4(ip) => {
!(ip.octets()[0] == 0 // "This network"
|| ip.is_private()
|| (ip.octets()[0] == 100 && (ip.octets()[1] & 0b1100_0000 == 0b0100_0000)) //ip.is_shared()
|| ip.is_loopback()
|| ip.is_link_local()
// addresses reserved for future protocols (`192.0.0.0/24`)
||(ip.octets()[0] == 192 && ip.octets()[1] == 0 && ip.octets()[2] == 0)
|| ip.is_documentation()
|| (ip.octets()[0] == 198 && (ip.octets()[1] & 0xfe) == 18) // ip.is_benchmarking()
|| (ip.octets()[0] & 240 == 240 && !ip.is_broadcast()) //ip.is_reserved()
|| ip.is_broadcast())
}
std::net::IpAddr::V6(ip) => {
!(ip.is_unspecified()
|| ip.is_loopback()
// IPv4-mapped Address (`::ffff:0:0/96`)
|| matches!(ip.segments(), [0, 0, 0, 0, 0, 0xffff, _, _])
// IPv4-IPv6 Translat. (`64:ff9b:1::/48`)
|| matches!(ip.segments(), [0x64, 0xff9b, 1, _, _, _, _, _])
// Discard-Only Address Block (`100::/64`)
|| matches!(ip.segments(), [0x100, 0, 0, 0, _, _, _, _])
// IETF Protocol Assignments (`2001::/23`)
|| (matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if b < 0x200)
&& !(
// Port Control Protocol Anycast (`2001:1::1`)
u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0001
// Traversal Using Relays around NAT Anycast (`2001:1::2`)
|| u128::from_be_bytes(ip.octets()) == 0x2001_0001_0000_0000_0000_0000_0000_0002
// AMT (`2001:3::/32`)
|| matches!(ip.segments(), [0x2001, 3, _, _, _, _, _, _])
// AS112-v6 (`2001:4:112::/48`)
|| matches!(ip.segments(), [0x2001, 4, 0x112, _, _, _, _, _])
// ORCHIDv2 (`2001:20::/28`)
|| matches!(ip.segments(), [0x2001, b, _, _, _, _, _, _] if (0x20..=0x2F).contains(&b))
))
|| ((ip.segments()[0] == 0x2001) && (ip.segments()[1] == 0xdb8)) // ip.is_documentation()
|| ((ip.segments()[0] & 0xfe00) == 0xfc00) //ip.is_unique_local()
|| ((ip.segments()[0] & 0xffc0) == 0xfe80)) //ip.is_unicast_link_local()
}
}
}
#[cfg(not(feature = "unstable"))]
pub use is_global_hardcoded as is_global;
#[cfg(feature = "unstable")]
#[inline(always)]
pub fn is_global(ip: std::net::IpAddr) -> bool {
ip.is_global()
}
/// These are some tests to check that the implementations match
/// The IPv4 can be all checked in 30 seconds or so and they are correct as of nightly 2023-07-17
/// The IPV6 can't be checked in a reasonable time, so we check over a hundred billion random ones, so far correct
/// Note that the is_global implementation is subject to change as new IP RFCs are created
///
/// To run while showing progress output:
/// cargo +nightly test --release --features sqlite,unstable -- --nocapture --ignored
#[cfg(test)]
#[cfg(feature = "unstable")]
mod tests {
use super::*;
use std::net::IpAddr;
#[test]
#[ignore]
fn test_ipv4_global() {
for a in 0..u8::MAX {
println!("Iter: {}/255", a);
for b in 0..u8::MAX {
for c in 0..u8::MAX {
for d in 0..u8::MAX {
let ip = IpAddr::V4(std::net::Ipv4Addr::new(a, b, c, d));
assert_eq!(
ip.is_global(),
is_global_hardcoded(ip),
"IP mismatch: {}",
ip
)
}
}
}
}
}
#[test]
#[ignore]
fn test_ipv6_global() {
use rand::Rng;
std::thread::scope(|s| {
for t in 0..16 {
let handle = s.spawn(move || {
let mut v = [0u8; 16];
let mut rng = rand::thread_rng();
for i in 0..20 {
println!("Thread {t} Iter: {i}/50");
for _ in 0..500_000_000 {
rng.fill(&mut v);
let ip = IpAddr::V6(std::net::Ipv6Addr::from(v));
assert_eq!(
ip.is_global(),
is_global_hardcoded(ip),
"IP mismatch: {ip}"
);
}
}
});
}
});
}
}

View File

@ -0,0 +1,5 @@
pub mod detection;
pub mod ip_addr;
pub mod modules;
pub mod scanners;
pub mod utils;

View File

@ -0,0 +1,120 @@
use std::{net::IpAddr, str::FromStr};
use cidr::IpCidr;
use rocket_ws::Message as RocketMessage;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
#[derive(Debug, Clone, PartialEq)]
pub struct Network(pub IpCidr);
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(tag = "type", content = "request")]
pub enum WorkerMessages {
#[serde(rename = "auth_request")]
AuthenticateRequest { login: String },
#[serde(rename = "get_work")]
GetWorkRequest {},
#[serde(rename = "do_work")]
DoWorkRequest { neworks: Vec<Network> },
#[serde(rename = "scanner_found")]
ScannerFoundResponse { name: String, address: IpAddr },
#[serde(rename = "")]
Invalid { err: String },
}
impl<'de> Deserialize<'de> for Network {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = <String>::deserialize(deserializer)?;
let k: &str = s.as_str();
match IpCidr::from_str(k) {
Ok(d) => Ok(Network(d)),
Err(err) => Err(serde::de::Error::custom(format!(
"Unsupported value {k}: {err}"
))),
}
}
}
impl Serialize for Network {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.0.to_string().as_str())
}
}
impl ToString for WorkerMessages {
fn to_string(&self) -> String {
serde_json::to_string(&self).expect("To serialize").into()
}
}
impl Into<WorkerMessages> for String {
fn into(self) -> WorkerMessages {
let req: Result<WorkerMessages, serde_json::Error> = serde_json::from_str(self.as_str());
match req {
Ok(d) => d,
Err(err) => WorkerMessages::Invalid {
err: err.to_string(),
},
}
}
}
impl Into<RocketMessage> for WorkerMessages {
fn into(self) -> RocketMessage {
RocketMessage::Text(self.to_string())
}
}
impl TryInto<WorkerMessages> for RocketMessage {
type Error = String;
fn try_into(self) -> Result<WorkerMessages, Self::Error> {
match self {
RocketMessage::Text(data) => {
let data: WorkerMessages = data.into();
Ok(data)
}
_ => Err("Only text is supported".to_string()),
}
}
}
#[cfg(test)]
mod tests {
use cidr::IpCidr;
use super::*;
#[test]
fn deserialize_do_work_empty() {
let data = "{\"type\":\"do_work\",\"request\":{\"neworks\":[]}}";
let result: WorkerMessages = data.to_string().into();
assert_eq!(
result,
WorkerMessages::DoWorkRequest {
neworks: [].to_vec()
}
);
}
#[test]
fn deserialize_do_work() {
let data = "{\"type\":\"do_work\",\"request\":{\"neworks\":[\"127.0.0.0/31\"]}}";
let result: WorkerMessages = data.to_string().into();
let cidr: IpCidr = IpCidr::from_str("127.0.0.0/31").unwrap();
assert_eq!(
result,
WorkerMessages::DoWorkRequest {
neworks: [Network(cidr)].to_vec()
}
);
}
}

View File

@ -0,0 +1,133 @@
use diesel::deserialize;
use diesel::deserialize::FromSqlRow;
use diesel::mysql::Mysql;
use diesel::mysql::MysqlValue;
use diesel::serialize;
use diesel::serialize::IsNull;
use diesel::sql_types::Text;
use rocket::request::FromParam;
use serde::{Deserialize, Deserializer};
use std::fmt;
use std::io::Write;
#[derive(Debug, Clone, Copy, FromSqlRow, PartialEq)]
pub enum Scanners {
Stretchoid,
Binaryedge,
Shadowserver,
Censys,
InternetMeasurement,
}
pub trait IsStatic {
fn is_static(self: &Self) -> bool;
}
impl IsStatic for Scanners {
fn is_static(self: &Self) -> bool {
match self {
Scanners::Censys => true,
Scanners::InternetMeasurement => true,
_ => false,
}
}
}
impl FromParam<'_> for Scanners {
type Error = String;
fn from_param(param: &'_ str) -> Result<Self, Self::Error> {
match param {
"stretchoid" => Ok(Scanners::Stretchoid),
"binaryedge" => Ok(Scanners::Binaryedge),
"shadowserver" => Ok(Scanners::Shadowserver),
"stretchoid.txt" => Ok(Scanners::Stretchoid),
"binaryedge.txt" => Ok(Scanners::Binaryedge),
"shadowserver.txt" => Ok(Scanners::Shadowserver),
"censys.txt" => Ok(Scanners::Censys),
"internet-measurement.com.txt" => Ok(Scanners::InternetMeasurement),
v => Err(format!("Unknown value: {v}")),
}
}
}
impl<'de> Deserialize<'de> for Scanners {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let s = <Vec<String>>::deserialize(deserializer)?;
let k: &str = s[0].as_str();
match k {
"stretchoid" => Ok(Scanners::Stretchoid),
"binaryedge" => Ok(Scanners::Binaryedge),
"shadowserver" => Ok(Scanners::Shadowserver),
"stretchoid.txt" => Ok(Scanners::Stretchoid),
"binaryedge.txt" => Ok(Scanners::Binaryedge),
"shadowserver.txt" => Ok(Scanners::Shadowserver),
"censys.txt" => Ok(Scanners::Censys),
"internet-measurement.com.txt" => Ok(Scanners::InternetMeasurement),
v => Err(serde::de::Error::custom(format!(
"Unknown value: {}",
v.to_string()
))),
}
}
}
impl fmt::Display for Scanners {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}",
match self {
Self::Stretchoid => "stretchoid",
Self::Binaryedge => "binaryedge",
Self::Censys => "censys",
Self::InternetMeasurement => "internet-measurement.com",
Self::Shadowserver => "shadowserver",
}
)
}
}
impl serialize::ToSql<Text, Mysql> for Scanners {
fn to_sql(&self, out: &mut serialize::Output<Mysql>) -> serialize::Result {
match *self {
Self::Stretchoid => out.write_all(b"stretchoid")?,
Self::Binaryedge => out.write_all(b"binaryedge")?,
Self::Censys => out.write_all(b"censys")?,
Self::InternetMeasurement => out.write_all(b"internet-measurement.com")?,
Self::Shadowserver => out.write_all(b"shadowserver")?,
};
Ok(IsNull::No)
}
}
impl deserialize::FromSql<Text, Mysql> for Scanners {
fn from_sql(bytes: MysqlValue) -> deserialize::Result<Self> {
let value = <String as deserialize::FromSql<Text, Mysql>>::from_sql(bytes)?;
let value = &value as &str;
let value: Result<Scanners, String> = value.try_into();
match value {
Ok(d) => Ok(d),
Err(err) => Err(err.into()),
}
}
}
impl TryInto<Scanners> for &str {
type Error = String;
fn try_into(self) -> Result<Scanners, Self::Error> {
match self {
"stretchoid" => Ok(Scanners::Stretchoid),
"binaryedge" => Ok(Scanners::Binaryedge),
"internet-measurement.com" => Ok(Scanners::InternetMeasurement),
"shadowserver" => Ok(Scanners::Shadowserver),
value => Err(format!("Invalid value: {value}")),
}
}
}

View File

@ -0,0 +1,33 @@
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::net::IpAddr;
use weighted_rs::{RoundrobinWeight, Weight};
pub fn get_dns_rr() -> RoundrobinWeight<Vec<IpAddr>> {
use std::str::FromStr;
// https://gist.github.com/mutin-sa/5dcbd35ee436eb629db7872581093bc5
let dns_servers: Vec<IpAddr> = vec![
IpAddr::from_str("1.1.1.1").unwrap(),
IpAddr::from_str("1.0.0.1").unwrap(),
IpAddr::from_str("8.8.8.8").unwrap(),
IpAddr::from_str("8.8.4.4").unwrap(),
IpAddr::from_str("9.9.9.9").unwrap(),
IpAddr::from_str("9.9.9.10").unwrap(),
IpAddr::from_str("2.56.220.2").unwrap(), // G-Core DNS
IpAddr::from_str("95.85.95.85").unwrap(), // G-Core DNS
IpAddr::from_str("193.110.81.0").unwrap(), // dns0.eu AS50902
IpAddr::from_str("185.253.5.0").unwrap(), // dns0.eu AS50902
IpAddr::from_str("74.82.42.42").unwrap(), // Hurricane Electric [AS6939]
];
let mut rr: RoundrobinWeight<Vec<IpAddr>> = RoundrobinWeight::new();
// For each entry in the list we create a lot of two DNS servers to use
for _ in &dns_servers {
let mut client_servers = dns_servers.clone();
client_servers.shuffle(&mut thread_rng());
client_servers.truncate(2);
rr.add(client_servers, 1);
}
rr
}

View File

@ -0,0 +1,368 @@
use std::{env, net::IpAddr};
use chrono::{Duration, NaiveDateTime, Utc};
use cidr::IpCidr;
use detection::detect_scanner;
use dns_ptr_resolver::{get_ptr, ResolvedResult};
use log2::*;
use scanners::Scanners;
use tungstenite::stream::MaybeTlsStream;
use tungstenite::{connect, Error, Message, WebSocket};
use weighted_rs::Weight;
pub mod detection;
pub mod ip_addr;
pub mod modules;
pub mod scanners;
pub mod utils;
use crate::detection::{get_dns_client, get_dns_server_config};
use crate::modules::WorkerMessages;
use crate::utils::get_dns_rr;
#[derive(Debug, Clone)]
pub struct IpToResolve {
pub address: IpAddr,
}
#[derive(Debug)]
pub struct Worker {
pub authenticated: bool,
pub tasks: Vec<IpToResolve>,
pub last_request_for_work: Option<NaiveDateTime>,
ws: WebSocket<MaybeTlsStream<std::net::TcpStream>>,
}
impl Into<Worker> for WebSocket<MaybeTlsStream<std::net::TcpStream>> {
fn into(self) -> Worker {
let wait_time = std::time::Duration::from_secs(1);
match self.get_ref() {
tungstenite::stream::MaybeTlsStream::Plain(stream) => stream
.set_read_timeout(Some(wait_time))
.expect("set_nonblocking to work"),
tungstenite::stream::MaybeTlsStream::NativeTls(stream) => {
stream
.get_ref()
.set_read_timeout(Some(wait_time))
.expect("set_nonblocking to work");
()
}
_ => unimplemented!(),
};
Worker {
authenticated: false,
tasks: vec![],
last_request_for_work: None,
ws: self,
}
}
}
impl Worker {
pub fn wait_for_messages(&mut self) -> Result<bool, Error> {
self.tick();
match self.ws.read() {
Ok(server_request) => {
match server_request {
Message::Text(msg_string) => {
self.receive_request(msg_string.into());
}
Message::Ping(data) => {
let _ = self.ws.write(Message::Pong(data));
}
Message::Pong(_) => {}
Message::Frame(_) => {}
Message::Binary(_) => {}
Message::Close(_) => {
return Ok(true); // Break the processing loop
}
};
Ok(false)
}
Err(err) => {
match err {
// Silently drop the error: Processing error: IO error: Resource temporarily unavailable (os error 11)
// That occurs when no messages are to be read
Error::Io(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
Error::Io(ref e) if e.kind() == std::io::ErrorKind::NotConnected => Ok(true), // Break the processing loop
_ => Err(err),
}
}
}
}
pub fn is_authenticated(&self) -> bool {
self.authenticated
}
pub fn authenticate(&mut self, login: String) -> &Worker {
if self.authenticated {
warn!("Worker is already authenticated");
return self;
} else {
info!("Worker is now authenticated as {login}");
}
self.authenticated = true;
self
}
pub fn tick(&mut self) -> () {
let mut request: Option<WorkerMessages> = None;
if !self.is_authenticated() {
request = Some(WorkerMessages::AuthenticateRequest {
login: env::var("WORKER_NAME").expect("The ENV WORKER_NAME should be set"),
});
} else {
if self.last_request_for_work.is_none()
|| (self.last_request_for_work.is_some()
&& Utc::now().naive_utc()
> (self.last_request_for_work.unwrap() + Duration::minutes(10)))
{
request = Some(WorkerMessages::GetWorkRequest {});
}
}
// it has a request to send
if let Some(request) = request {
self.send_request(request);
}
}
pub fn send_request(&mut self, request: WorkerMessages) -> &Worker {
let msg_string: String = request.to_string();
match self.ws.send(Message::Text(msg_string)) {
Ok(_) => {
match request {
WorkerMessages::AuthenticateRequest { login } => {
self.authenticated = true; // Anyway, it will kick us if this is not success
info!("Logged in as: {login}")
}
WorkerMessages::GetWorkRequest {} => {
self.last_request_for_work = Some(Utc::now().naive_utc());
info!("Asked for work: {:?}", self.last_request_for_work)
}
msg => error!("No implemented: {:#?}", msg),
}
}
Err(err) => error!("Unable to send: {err}"),
}
self
}
fn work_on_cidr(&mut self, cidr: IpCidr) {
info!("Picking up: {cidr}");
info!("Range, from {} to {}", cidr.first(), cidr.last());
let addresses = cidr.iter().addresses();
let count = addresses.count();
let mut current = 0;
let mut rr_dns_servers = get_dns_rr();
for addr in addresses {
let client = get_dns_client(&get_dns_server_config(&rr_dns_servers.next().unwrap()));
match get_ptr(addr, client) {
Ok(result) => match detect_scanner(&result) {
Ok(Some(scanner_name)) => {
self.report_detection(scanner_name, addr, result);
}
Ok(None) => {}
Err(err) => error!("Error detecting for {addr}: {:?}", err),
},
Err(_) => {
//debug!("Error processing {addr}: {err}")
}
};
current += 1;
if current % 10 == 0 {
info!("Progress: {count}/{current}");
}
}
}
fn report_detection(&mut self, scanner_name: Scanners, addr: IpAddr, result: ResolvedResult) {
info!("Detected {:?} for {addr}", scanner_name);
let request = WorkerMessages::ScannerFoundResponse {
name: result.result.unwrap().to_string(),
address: addr,
};
let msg_string: String = request.to_string();
match self.ws.send(Message::Text(msg_string)) {
Ok(_) => {}
Err(err) => error!("Unable to send scanner result: {err}"),
}
}
pub fn receive_request(&mut self, server_request: WorkerMessages) -> &Worker {
match server_request {
WorkerMessages::DoWorkRequest { neworks } => {
info!("Work request received for neworks: {:?}", neworks);
for cidr in neworks {
let cidr = cidr.0;
self.work_on_cidr(cidr);
}
}
WorkerMessages::AuthenticateRequest { .. }
| WorkerMessages::ScannerFoundResponse { .. }
| WorkerMessages::GetWorkRequest {}
| WorkerMessages::Invalid { .. } => {
error!("Unable to understand message: {:?}", server_request);
}
}
self
}
}
/*fn resolve_file(addresses: InetAddressIterator<IpAddr>, dns_servers: Vec<&str>) {
let mut ips = vec![];
for address in addresses {
match IpAddr::from_str(address) {
Ok(addr) => ips.push(IpToResolve {
address: addr,
server: rr.next().unwrap(),
}),
Err(err) => {
eprintln!(
"Something went wrong while parsing the IP ({}): {}",
address, err
);
process::exit(1);
}
}
}
match rayon::ThreadPoolBuilder::new()
.num_threads(30)
.build_global()
{
Ok(r) => r,
Err(err) => {
eprintln!(
"Something went wrong while building the thread pool: {}",
err
);
process::exit(1);
}
}
ips.into_par_iter()
.enumerate()
.for_each(|(_i, to_resolve)| {
let server = NameServerConfigGroup::from_ips_clear(
&[to_resolve.server.ip()],
to_resolve.server.port(),
true,
);
let ptr_result = get_ptr(to_resolve.address, resolver);
match ptr_result {
Ok(ptr) => match ptr.result {
Some(res) => println!("{} # {}", to_resolve.address, res),
None => println!("{}", to_resolve.address),
},
Err(err) => {
let two_hundred_millis = Duration::from_millis(400);
thread::sleep(two_hundred_millis);
eprintln!(
"[{}] Error for {} -> {}",
to_resolve.server, to_resolve.address, err.message
)
}
}
});
}*/
fn main() -> () {
let _log2 = log2::stdout()
.module(true)
.level(match env::var("RUST_LOG") {
Ok(level) => level,
Err(_) => "debug".to_string(),
})
.start();
info!("Running the worker");
let url = match env::var("WORKER_URL") {
Ok(worker_url) => worker_url,
Err(_) => "ws://127.0.0.1:8800".to_string(),
};
match connect(&url) {
Ok((socket, response)) => {
let connected = response.status() == 101;
if connected {
info!("Connected to: {url}");
} else {
info!("Connecting replied {}: {url}", response.status());
}
let mut worker: Worker = socket.into();
loop {
match worker.wait_for_messages() {
Ok(true) => {
error!("Stopping processing");
break;
}
Ok(false) => {
// Keep processing
}
Err(tungstenite::Error::ConnectionClosed) => {
error!("Stopping processing: connection closed");
break;
}
Err(tungstenite::Error::AlreadyClosed) => {
error!("Stopping processing: connection already closed");
break;
}
Err(err) => error!("Processing error: {err} -> {:?}", err),
}
}
}
Err(err) => error!("Unable to connect to {url}: {err}"),
}
}
/*
thread::spawn(move || {
let conn = &mut get_connection(db_url.as_str());
// Reset scan tasks
let _ = conn.execute("UPDATE scan_tasks SET updated_at = :updated_at, still_processing_at = NULL, started_at = NULL WHERE (still_processing_at IS NOT NULL OR started_at IS NOT NULL) AND ended_at IS NULL",
named_params! {
":updated_at": Utc::now().naive_utc().to_string(),
}).unwrap();
loop {
let mut stmt = conn.prepare("SELECT task_group_id, cidr FROM scan_tasks WHERE started_at IS NULL ORDER BY created_at ASC").unwrap();
let mut rows = stmt.query(named_params! {}).unwrap();
println!("Waiting for jobs");
while let Some(row) = rows.next().unwrap() {
let _ = conn.execute("UPDATE scan_tasks SET updated_at = :updated_at, started_at = :started_at WHERE cidr = :cidr AND task_group_id = :task_group_id",
named_params! {
":updated_at": Utc::now().naive_utc().to_string(),
":started_at": Utc::now().naive_utc().to_string(),
":cidr": cidr_str,
":task_group_id": task_group_id,
}).unwrap();
if (current / count) % 10 == 0 {
let _ = conn.execute("UPDATE scan_tasks SET updated_at = :updated_at, still_processing_at = :still_processing_at WHERE cidr = :cidr AND task_group_id = :task_group_id",
named_params! {
":updated_at": Utc::now().naive_utc().to_string(),
":still_processing_at": Utc::now().naive_utc().to_string(),
":cidr": cidr_str,
":task_group_id": task_group_id,
}).unwrap();
}
}
let _ = conn.execute("UPDATE scan_tasks SET updated_at = :updated_at, ended_at = :ended_at WHERE cidr = :cidr AND task_group_id = :task_group_id",
named_params! {
":updated_at": Utc::now().naive_utc().to_string(),
":ended_at": Utc::now().naive_utc().to_string(),
":cidr": cidr_str,
":task_group_id": task_group_id,
}).unwrap();
}
}
});*/