diff --git a/.editorconfig b/.editorconfig index 269d98a..7a6e9c2 100644 --- a/.editorconfig +++ b/.editorconfig @@ -2,5 +2,4 @@ root = true [*] indent_style = tab -indent_size = 2 diff --git a/.env.sample b/.env.sample new file mode 100644 index 0000000..9138ce2 --- /dev/null +++ b/.env.sample @@ -0,0 +1 @@ +DATABASE_URL=postgres://postgres@localhost/grimhilde diff --git a/.gitignore b/.gitignore index ea8c4bf..fedaa2b 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +.env diff --git a/Cargo.lock b/Cargo.lock index f16d114..f9d9e06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -365,6 +365,7 @@ dependencies = [ name = "ingest" version = "0.1.0" dependencies = [ + "dotenv", "ipnetwork 0.18.0", "serde", "serde_json", diff --git a/ingest/Cargo.toml b/ingest/Cargo.toml index 0e79858..1a2573c 100644 --- a/ingest/Cargo.toml +++ b/ingest/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +dotenv = "0.15" tempfile = "3" serde = { version = "1.0", features = [ "derive" ] } serde_json = "1.0" diff --git a/ingest/migrations/20220219194018_initial.down.sql b/ingest/migrations/20220219194018_initial.down.sql new file mode 100644 index 0000000..92e943d --- /dev/null +++ b/ingest/migrations/20220219194018_initial.down.sql @@ -0,0 +1,7 @@ +-- Add down migration script here +DROP TABLE hits; +DROP TABLE referrers; +DROP TABLE user_agents; +DROP TABLE remote_addresses; +DROP TABLE hosts; +DROP TABLE paths; diff --git a/ingest/migrations/20220219194018_initial.up.sql b/ingest/migrations/20220219194018_initial.up.sql new file mode 100644 index 0000000..5b4e93e --- /dev/null +++ b/ingest/migrations/20220219194018_initial.up.sql @@ -0,0 +1,58 @@ +-- Location of the served file +CREATE TABLE paths ( + -- The path string + path text UNIQUE NOT NULL, + -- The id of the path string + id serial PRIMARY KEY NOT NULL +); + +-- Host header strings +CREATE TABLE hosts ( + -- Host 
header value + host text UNIQUE NOT NULL, + -- The ID of the value + id serial PRIMARY KEY NOT NULL +); + +-- Address for the requester +CREATE TABLE remote_addresses ( + -- The IP address of the requester + addr inet UNIQUE NOT NULL, + -- The id of the address + id serial PRIMARY KEY NOT NULL +); + +-- User agent strings +CREATE TABLE user_agents ( + -- A User-Agent header value. Max length is the default maximum header size in nginx. + agent varchar(8192) NOT NULL, + -- The string's ID + id serial PRIMARY KEY NOT NULL +); + +CREATE TABLE referrers ( + -- A Referer header value. Max length is the default maximum header size in nginx. + referrer varchar(8192) NOT NULL, + -- The string's ID + id serial PRIMARY KEY NOT NULL +); + +-- Requests +CREATE TABLE hits ( + -- The size of the response payload + size integer NOT NULL, + -- Reference to a path id + path serial NOT NULL REFERENCES paths (id), + -- Reference to a remote host id + remote serial NOT NULL REFERENCES remote_addresses (id), + -- Reference to a host header id + host serial NOT NULL REFERENCES hosts (id), + -- Reference to a user agent + user_agent serial NOT NULL REFERENCES user_agents (id), + -- Reference to a referrer + referrer serial NOT NULL REFERENCES referrers (id), + -- Was the request served over HTTPS + secure boolean NOT NULL, + -- The time the request was made + time timestamp with time zone NOT NULL +); diff --git a/ingest/src/db.rs b/ingest/src/db.rs new file mode 100644 index 0000000..f28a99c --- /dev/null +++ b/ingest/src/db.rs @@ -0,0 +1,59 @@ +// grimhilde - mirror stats +// Copyright (C) 2022 Amanda Graven and Emelie Graven +// Licensed under the EUPL +// +use std::error::Error as StdError; + +use sqlx::PgPool; + +use crate::Request; + +/// Connect to a postgress address. 
pub async fn connect(address: &str) -> sqlx::Result<PgPool> {
ipnetwork::IpNetwork}, PgPool}; use tokio::net::UdpSocket; use std::{io, str}; use serde::{Serialize, Deserialize}; +mod db; + #[derive(Deserialize, Serialize, Debug, Clone)] -struct RawRequest { - size: u64, - path: String, - addr: String, - user_agent: String, - referrer: String, - secure: bool, - timestamp: String, +pub struct RawRequest { + size: i32, + path: String, + addr: String, + host: String, + user_agent: String, + referrer: String, + secure: bool, + timestamp: String, } -#[derive(Serialize)] -struct Request { - size: u64, - path: String, - addr: IpNetwork, - user_agent: String, - referrer: String, - secure: bool, - timestamp: OffsetDateTime +//#[derive(Serialize)] +pub struct Request { + size: i32, + path: String, + addr: IpNetwork, + host: String, + user_agent: String, + referrer: String, + secure: bool, + timestamp: OffsetDateTime } #[tokio::main] async fn main() -> Result<(), Box> { - // Connect to a peer - let socket = UdpSocket::bind("127.0.0.1:9210").await?; + dotenv::dotenv()?; + // Connect to the postgres database + let pool = db::connect(&std::env::var("DATABASE_URL")?).await?; - loop { - // The buffer is **not** included in the async task and will - // only exist on the stack). - let mut buf = [0; 8192]; + // Connect to a peer + let socket = UdpSocket::bind("127.0.0.1:9210").await?; - // Try to recv data, this may still fail with `WouldBlock` - // if the readiness event is a false positive. - match socket.recv(&mut buf).await { - Ok(n) => { - let message = str::from_utf8(&buf[..n])?; - // The incoming nginx logs are already structured as json. - // We identify the start of the log string... - let index = match message.find('{') { - Some(i) => i, - None => continue, - }; - // ...and strip of the prefix. 
- let (prefix, message) = message.split_at(index); - // Nginx should be configured to tag log entries with "mirror" - // Making sure this tag is present is at least a rudimentary - // protection against data accidentally coming in from - // elsewhere - if prefix.ends_with("mirror: ") { - let req: RawRequest = serde_json::from_str(message)?; - let req: Request = Request { - size: req.size, - path: req.path, - addr: req.addr.parse()?, - user_agent: req.user_agent, - referrer: req.referrer, - secure: req.secure, - timestamp: OffsetDateTime::parse(req.timestamp, "%d/%m/%Y:%T %z")? - }; - - } - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - continue; - } - Err(e) => { - return Err(e.into()); - } - } - } - Ok(()) + loop { + // The buffer is **not** included in the async task and will + // only exist on the stack). + let mut buf = [0; 8192]; + + // Try to recv data, this may still fail with `WouldBlock` + // if the readiness event is a false positive. + match socket.recv(&mut buf).await { + Ok(n) => match receive_message(&pool, &buf[..n]).await { + Ok(()) => (), + Err(e) => { + println!("ERROR: {}", e); + // TODO: println!("Log message was: {}", &buf[..n]); + }, + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + continue; + } + Err(e) => { + return Err(e.into()); + } + } + } +} + +async fn receive_message(pool: &PgPool, message: &[u8]) -> Result<(), Box> { + let message = str::from_utf8(message)?; + // The incoming nginx logs are already structured as json. + // We identify the start of the log string... + let index = match message.find('{') { + Some(i) => i, + None => return Err(String::from("JSON object missing from log").into()), + }; + // ...and strip of the prefix. 
+ let (prefix, json) = message.split_at(index); + // Nginx should be configured to tag log entries with "mirror" + // Making sure this tag is present is at least a rudimentary + // protection against data accidentally coming in from + // elsewhere + if !prefix.ends_with("mirror: ") { + return Err(String::from("mirror prefix not present, got log line").into()); + } + let req: RawRequest = serde_json::from_str(json)?; + let req: Request = Request { + size: req.size, + path: req.path, + addr: req.addr.parse()?, + host: req.host, + user_agent: req.user_agent, + referrer: req.referrer, + secure: req.secure, + timestamp: OffsetDateTime::parse(req.timestamp, "%d/%m/%Y:%T %z")? + }; + db::insert(pool, req).await?; + Ok(()) } diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..4d17809 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,2 @@ +edition = "2021" +hard_tabs = true