Initial commit

This commit is contained in:
Emelie Graven 2022-02-19 20:30:21 +01:00
commit 64fa7edc8e
Signed by: emelie
GPG key ID: C11123726DBB55A1
10 changed files with 1870 additions and 0 deletions

22
ingest/Cargo.toml Normal file
View file

@ -0,0 +1,22 @@
[package]
name = "ingest"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tempfile = "3"
serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0"
ipnetwork = "0.18"
[dependencies.tokio]
version = "1"
features = [ "full" ]
[dependencies.sqlx]
version = "0.5"
features = [ "runtime-tokio-rustls", "postgres", "ipnetwork", "migrate", "macros", "time" ]

83
ingest/src/main.rs Normal file
View file

@ -0,0 +1,83 @@
// grimhilde - mirror stats
// Copyright (C) 2022 Amanda Graven, <amanda@graven.dev> & Emelie Graven <emelie@graven.dev>
// Licensed under the EUPL
use sqlx::types::{time::OffsetDateTime, ipnetwork::IpNetwork};
use tokio::net::UdpSocket;
use std::{io, str};
use serde::{Serialize, Deserialize};
#[derive(Deserialize, Serialize, Debug, Clone)]
struct RawRequest {
size: u64,
path: String,
addr: String,
user_agent: String,
referrer: String,
secure: bool,
timestamp: String,
}
#[derive(Serialize)]
struct Request {
size: u64,
path: String,
addr: IpNetwork,
user_agent: String,
referrer: String,
secure: bool,
timestamp: OffsetDateTime
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to a peer
let socket = UdpSocket::bind("127.0.0.1:9210").await?;
loop {
// The buffer is **not** included in the async task and will
// only exist on the stack).
let mut buf = [0; 8192];
// Try to recv data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match socket.recv(&mut buf).await {
Ok(n) => {
let message = str::from_utf8(&buf[..n])?;
// The incoming nginx logs are already structured as json.
// We identify the start of the log string...
let index = match message.find('{') {
Some(i) => i,
None => continue,
};
// ...and strip of the prefix.
let (prefix, message) = message.split_at(index);
// Nginx should be configured to tag log entries with "mirror"
// Making sure this tag is present is at least a rudimentary
// protection against data accidentally coming in from
// elsewhere
if prefix.ends_with("mirror: ") {
let req: RawRequest = serde_json::from_str(message)?;
let req: Request = Request {
size: req.size,
path: req.path,
addr: req.addr.parse()?,
user_agent: req.user_agent,
referrer: req.referrer,
secure: req.secure,
timestamp: OffsetDateTime::parse(req.timestamp, "%d/%m/%Y:%T %z")?
};
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
Ok(())
}