Send log entries to postgres db

This commit is contained in:
Amanda Graven 2022-02-19 21:57:51 +01:00
parent 64fa7edc8e
commit e5d612223b
Signed by: amanda
GPG key ID: F747582C5608F4CB
10 changed files with 213 additions and 66 deletions

View file

@ -2,5 +2,4 @@ root = true
[*] [*]
indent_style = tab indent_style = tab
indent_size = 2

1
.env.sample Normal file
View file

@ -0,0 +1 @@
DATABASE_URL=postgres://postgres@localhost/grimhilde

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target /target
.env

1
Cargo.lock generated
View file

@ -365,6 +365,7 @@ dependencies = [
name = "ingest" name = "ingest"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"dotenv",
"ipnetwork 0.18.0", "ipnetwork 0.18.0",
"serde", "serde",
"serde_json", "serde_json",

View file

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
dotenv = "0.15"
tempfile = "3" tempfile = "3"
serde = { version = "1.0", features = [ "derive" ] } serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1.0" serde_json = "1.0"

View file

@ -0,0 +1,7 @@
-- Revert the initial schema by dropping every table it created.
-- `hits` is dropped first because it holds foreign keys into all of the
-- lookup tables below; those can only be dropped once nothing references them.
DROP TABLE hits;
DROP TABLE referrers;
DROP TABLE user_agents;
DROP TABLE remote_addresses;
DROP TABLE hosts;
DROP TABLE paths;

View file

@ -0,0 +1,58 @@
-- Initial schema: one lookup table per repeated request attribute, plus the
-- `hits` table that references them by id.

-- Location of the served file
CREATE TABLE paths (
	-- The path string
	path text UNIQUE NOT NULL,
	-- The id of the path string
	id serial PRIMARY KEY NOT NULL
);

-- Host header strings
CREATE TABLE hosts (
	-- Host header value
	host text UNIQUE NOT NULL,
	-- The ID of the value
	id serial PRIMARY KEY NOT NULL
);

-- Address for the requester
CREATE TABLE remote_addresses (
	-- The IP address of the requester
	addr inet UNIQUE NOT NULL,
	-- The id of the address
	id serial PRIMARY KEY NOT NULL
);

-- User agent strings
CREATE TABLE user_agents (
	-- A User-Agent header value. Max length is the default maximum header size in nginx.
	agent varchar(8192) NOT NULL,
	-- The string's ID
	id serial PRIMARY KEY NOT NULL
);

-- Referrer strings
CREATE TABLE referrers (
	-- A Referer header value. Max length is the default maximum header size in nginx.
	referrer varchar(8192) NOT NULL,
	-- The string's ID
	id serial PRIMARY KEY NOT NULL
);

-- Requests
--
-- The reference columns are plain integers rather than `serial`: `serial`
-- would attach a separate sequence and a nextval() default to each of them,
-- so an insert that omitted a reference would be filled in with an unrelated
-- sequence number instead of being rejected by the NOT NULL constraint.
CREATE TABLE hits (
	-- The size of the response payload
	size integer NOT NULL,
	-- Reference to a path id
	path integer NOT NULL REFERENCES paths (id),
	-- Reference to a remote host id
	remote integer NOT NULL REFERENCES remote_addresses (id),
	-- Reference to a host header id
	host integer NOT NULL REFERENCES hosts (id),
	-- Reference to a user agent
	user_agent integer NOT NULL REFERENCES user_agents (id),
	-- Reference to a referrer
	referrer integer NOT NULL REFERENCES referrers (id),
	-- Was the request served over HTTPS
	secure boolean NOT NULL,
	-- The time the request was made
	time timestamp with time zone NOT NULL
);

59
ingest/src/db.rs Normal file
View file

@ -0,0 +1,59 @@
// grimhilde - mirror stats
// Copyright (C) 2022 Amanda Graven and Emelie Graven
// Licensed under the EUPL
//
use std::error::Error as StdError;
use sqlx::PgPool;
use crate::Request;
/// Open a connection pool to the postgres server at `address` and bring its
/// schema up to date.
///
/// The migrations embedded by `sqlx::migrate!()` are run against the new pool
/// before it is handed back.
///
/// # Errors
///
/// Returns an error if the connection cannot be established or if running the
/// migrations fails.
pub async fn connect(address: &str) -> sqlx::Result<PgPool> {
	let pool = PgPool::connect(address).await?;
	let migrator = sqlx::migrate!();
	migrator.run(&pool).await?;
	Ok(pool)
}
/// Insert a request hit into the database.
///
/// The path, remote address, host, user agent and referrer of `req` are each
/// resolved to the id of a row in their lookup table (creating the row when
/// the value has not been seen before), and a row referencing those ids is
/// then added to `hits`. Everything happens inside one transaction, so a
/// failure part-way through leaves no partial data behind.
///
/// # Errors
///
/// Returns an error if the transaction cannot be started or committed, or if
/// any of the queries fails.
pub async fn insert(pool: &PgPool, req: Request) -> Result<(), Box<dyn StdError>> {
	let mut trans = pool.begin().await?;

	// `paths`, `remote_addresses` and `hosts` have a UNIQUE constraint on their
	// value column. `ON CONFLICT DO NOTHING` would make `RETURNING` yield no
	// row for a value that already exists, and `fetch_one` would then fail with
	// `RowNotFound`. The no-op `DO UPDATE` makes the statement return the id
	// of the existing row instead.
	let path_id = sqlx::query!(
		"INSERT INTO paths (path) VALUES ($1) \
		ON CONFLICT (path) DO UPDATE SET path = EXCLUDED.path RETURNING id",
		&req.path,
	).fetch_one(&mut *trans).await?.id;
	let remote_id = sqlx::query!(
		"INSERT INTO remote_addresses (addr) VALUES ($1) \
		ON CONFLICT (addr) DO UPDATE SET addr = EXCLUDED.addr RETURNING id",
		req.addr,
	).fetch_one(&mut *trans).await?.id;
	let host_id = sqlx::query!(
		"INSERT INTO hosts (host) VALUES ($1) \
		ON CONFLICT (host) DO UPDATE SET host = EXCLUDED.host RETURNING id",
		&req.host,
	).fetch_one(&mut *trans).await?.id;

	// `user_agents` and `referrers` carry no unique constraint, so there is
	// nothing for `ON CONFLICT` to act on. Look for an existing row first and
	// only insert when none is found.
	// NOTE(review): without a unique constraint two concurrent writers could
	// still insert the same value twice; confirm whether that matters.
	let known_agent = sqlx::query!(
		"SELECT id FROM user_agents WHERE agent = $1 LIMIT 1",
		&req.user_agent,
	).fetch_optional(&mut *trans).await?;
	let user_agent_id = match known_agent {
		Some(row) => row.id,
		None => sqlx::query!(
			"INSERT INTO user_agents (agent) VALUES ($1) RETURNING id",
			&req.user_agent,
		).fetch_one(&mut *trans).await?.id,
	};
	let known_referrer = sqlx::query!(
		"SELECT id FROM referrers WHERE referrer = $1 LIMIT 1",
		&req.referrer,
	).fetch_optional(&mut *trans).await?;
	let referrer_id = match known_referrer {
		Some(row) => row.id,
		None => sqlx::query!(
			"INSERT INTO referrers (referrer) VALUES ($1) RETURNING id",
			&req.referrer,
		).fetch_one(&mut *trans).await?.id,
	};

	sqlx::query!(
		"INSERT INTO hits (size, path, remote, host, user_agent, referrer, secure, time) \
		VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
		req.size,
		path_id,
		remote_id,
		host_id,
		user_agent_id,
		referrer_id,
		req.secure,
		req.timestamp,
	).execute(&mut *trans).await?;

	trans.commit().await?;
	Ok(())
}

View file

@ -1,83 +1,101 @@
// grimhilde - mirror stats // grimhilde - mirror stats
// Copyright (C) 2022 Amanda Graven, <amanda@graven.dev> & Emelie Graven <emelie@graven.dev> // Copyright (C) 2022 Amanda Graven and Emelie Graven
// Licensed under the EUPL // Licensed under the EUPL
use sqlx::types::{time::OffsetDateTime, ipnetwork::IpNetwork}; use sqlx::{types::{time::OffsetDateTime, ipnetwork::IpNetwork}, PgPool};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use std::{io, str}; use std::{io, str};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
mod db;
#[derive(Deserialize, Serialize, Debug, Clone)] #[derive(Deserialize, Serialize, Debug, Clone)]
struct RawRequest { pub struct RawRequest {
size: u64, size: i32,
path: String, path: String,
addr: String, addr: String,
user_agent: String, host: String,
referrer: String, user_agent: String,
secure: bool, referrer: String,
timestamp: String, secure: bool,
timestamp: String,
} }
#[derive(Serialize)] //#[derive(Serialize)]
struct Request { pub struct Request {
size: u64, size: i32,
path: String, path: String,
addr: IpNetwork, addr: IpNetwork,
user_agent: String, host: String,
referrer: String, user_agent: String,
secure: bool, referrer: String,
timestamp: OffsetDateTime secure: bool,
timestamp: OffsetDateTime
} }
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Connect to a peer dotenv::dotenv()?;
let socket = UdpSocket::bind("127.0.0.1:9210").await?; // Connect to the postgres database
let pool = db::connect(&std::env::var("DATABASE_URL")?).await?;
loop { // Connect to a peer
// The buffer is **not** included in the async task and will let socket = UdpSocket::bind("127.0.0.1:9210").await?;
// only exist on the stack).
let mut buf = [0; 8192];
// Try to recv data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match socket.recv(&mut buf).await {
Ok(n) => {
let message = str::from_utf8(&buf[..n])?;
// The incoming nginx logs are already structured as json.
// We identify the start of the log string...
let index = match message.find('{') {
Some(i) => i,
None => continue,
};
// ...and strip of the prefix.
let (prefix, message) = message.split_at(index);
// Nginx should be configured to tag log entries with "mirror"
// Making sure this tag is present is at least a rudimentary
// protection against data accidentally coming in from
// elsewhere
if prefix.ends_with("mirror: ") {
let req: RawRequest = serde_json::from_str(message)?;
let req: Request = Request {
size: req.size,
path: req.path,
addr: req.addr.parse()?,
user_agent: req.user_agent,
referrer: req.referrer,
secure: req.secure,
timestamp: OffsetDateTime::parse(req.timestamp, "%d/%m/%Y:%T %z")?
};
}
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
Ok(()) loop {
// The buffer is **not** included in the async task and will
// only exist on the stack).
let mut buf = [0; 8192];
// Try to recv data, this may still fail with `WouldBlock`
// if the readiness event is a false positive.
match socket.recv(&mut buf).await {
Ok(n) => match receive_message(&pool, &buf[..n]).await {
Ok(()) => (),
Err(e) => {
println!("ERROR: {}", e);
// TODO: println!("Log message was: {}", &buf[..n]);
},
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
}
Err(e) => {
return Err(e.into());
}
}
}
}
async fn receive_message(pool: &PgPool, message: &[u8]) -> Result<(), Box<dyn std::error::Error>> {
let message = str::from_utf8(message)?;
// The incoming nginx logs are already structured as json.
// We identify the start of the log string...
let index = match message.find('{') {
Some(i) => i,
None => return Err(String::from("JSON object missing from log").into()),
};
// ...and strip of the prefix.
let (prefix, json) = message.split_at(index);
// Nginx should be configured to tag log entries with "mirror"
// Making sure this tag is present is at least a rudimentary
// protection against data accidentally coming in from
// elsewhere
if !prefix.ends_with("mirror: ") {
return Err(String::from("mirror prefix not present, got log line").into());
}
let req: RawRequest = serde_json::from_str(json)?;
let req: Request = Request {
size: req.size,
path: req.path,
addr: req.addr.parse()?,
host: req.host,
user_agent: req.user_agent,
referrer: req.referrer,
secure: req.secure,
timestamp: OffsetDateTime::parse(req.timestamp, "%d/%m/%Y:%T %z")?
};
db::insert(pool, req).await?;
Ok(())
} }

2
rustfmt.toml Normal file
View file

@ -0,0 +1,2 @@
edition = "2021"
hard_tabs = true