grimhilde/ingest/src/main.rs

106 lines
2.8 KiB
Rust

// grimhilde - mirror stats
// Copyright (C) 2022 Amanda Graven and Emelie Graven
// Licensed under the EUPL
use sqlx::{types::{time::OffsetDateTime, ipnetwork::IpNetwork}, PgPool};
use tokio::net::UdpSocket;
use std::{io, str};
use serde::{Serialize, Deserialize};
mod db;
/// A log entry exactly as it appears in the JSON payload of an incoming
/// datagram, before any validation.
///
/// Apart from `size`, every field is still a string here; `receive_message`
/// converts this into the typed [`Request`].
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct RawRequest {
    // NOTE(review): deserialized as `i32`, so a size above `i32::MAX`
    // (roughly 2 GiB) makes the whole entry fail to parse — confirm this is
    // acceptable for the files being mirrored.
    size: i32,
    // Copied unchanged into `Request::path`.
    path: String,
    // Must parse as an `IpNetwork`; otherwise the entry is rejected.
    addr: String,
    // `host`, `user_agent` and `referrer` are copied unchanged into `Request`.
    host: String,
    user_agent: String,
    referrer: String,
    // Contents of nginx's `$https`: only "on" (true) and "" (false) are
    // accepted; any other value rejects the entry.
    secure: String,
    // Parsed with the format "%d/%b/%Y:%T %z" (presumably nginx's
    // `$time_local` — confirm against the nginx log_format).
    timestamp: String,
}
/// A validated log entry, built from a [`RawRequest`] when a datagram is
/// received and passed to `db::insert` for storage.
#[derive(Debug)]
pub struct Request {
    size: i32,
    path: String,
    // Client address parsed from `RawRequest::addr`.
    addr: IpNetwork,
    host: String,
    user_agent: String,
    referrer: String,
    // `true` when `$https` was "on", `false` when it was empty.
    secure: bool,
    // Parsed from `RawRequest::timestamp`.
    timestamp: OffsetDateTime
}
/// Receives JSON log lines from nginx over UDP and stores them in Postgres.
///
/// `DATABASE_URL` is read from the environment, which may optionally be
/// populated from a `.env` file. Every datagram received on
/// `127.0.0.1:9210` is handed to [`receive_message`]. A failure for one
/// message is logged together with the offending message and then skipped,
/// so a single malformed line cannot stop ingestion.
///
/// # Errors
///
/// Returns an error if a present `.env` file cannot be loaded,
/// `DATABASE_URL` is unset, the database connection fails, the socket
/// cannot be bound, or receiving from the socket fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A missing `.env` file is fine: the variables may be provided directly by
    // the environment (e.g. the service manager). Any other problem with the
    // file, such as a syntax error, is still fatal.
    if let Err(e) = dotenv::dotenv() {
        if !e.not_found() {
            return Err(e.into());
        }
    }
    // Connect to the postgres database.
    let pool = db::connect(&std::env::var("DATABASE_URL")?).await?;
    // Listen for log datagrams on the loopback interface only.
    let socket = UdpSocket::bind("127.0.0.1:9210").await?;
    // One receive buffer, reused for every datagram; each iteration only reads
    // the first `n` bytes that `recv` just wrote.
    let mut buf = [0u8; 8192];
    loop {
        let n = match socket.recv(&mut buf).await {
            Ok(n) => n,
            // `recv().await` waits for readiness itself, so `WouldBlock` should
            // not surface here; it is tolerated defensively rather than fatal.
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e.into()),
        };
        let message = &buf[..n];
        if let Err(e) = receive_message(&pool, message).await {
            eprintln!("ERROR: {}", e);
            // Lossy conversion so that non-UTF-8 input can still be shown.
            eprintln!("Log message was: {}", String::from_utf8_lossy(message));
        }
    }
}
async fn receive_message(pool: &PgPool, message: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let message = str::from_utf8(message)?;
// The incoming nginx logs are already structured as json.
// We identify the start of the log string...
let index = match message.find('{') {
Some(i) => i,
None => return Err(String::from("JSON object missing from log").into()),
};
// ...and strip of the prefix.
let (prefix, json) = message.split_at(index);
// Nginx should be configured to tag log entries with "mirror"
// Making sure this tag is present is at least a rudimentary
// protection against data accidentally coming in from
// elsewhere
if !prefix.ends_with("mirror: ") {
return Err(String::from("mirror prefix not present, got log line").into());
}
let req: RawRequest = serde_json::from_str(json)?;
let req: Request = Request {
size: req.size,
path: req.path,
addr: req.addr.parse()?,
host: req.host,
user_agent: req.user_agent,
referrer: req.referrer,
secure: match req.secure.as_str() {
"on" => true,
"" => false,
_ => return Err(String::from("Unexpected content in $https").into()),
},
timestamp: OffsetDateTime::parse(req.timestamp, "%d/%b/%Y:%T %z")?
};
db::insert(pool, req).await?;
Ok(())
}