feat(main): Implement receiver & flushing logic

The only thing missing for a 0.1 test run is the actual gRPC call to
Stackdriver.
This commit is contained in:
Vincent Ambo 2018-05-27 23:57:24 +02:00
parent c5cd12b81f
commit 6793b25a67
4 changed files with 331 additions and 88 deletions

View file

@ -1,71 +0,0 @@
//! This module contains FFI-bindings to the journald APi. See
//! sd-journal(3) for detailed information about the API.
//!
//! Only calls required by journaldriver are implemented.
/// This type represents an opaque pointer to an `sd_journal` struct.
/// It should be changed to an `extern type` once RF1861 is
/// stabilized.
enum SdJournal {}
use failure::Error;
use std::mem;
extern {
fn sd_journal_open(sd_journal: *mut SdJournal, flags: usize) -> usize;
fn sd_journal_close(sd_journal: *mut SdJournal);
fn sd_journal_next(sd_journal: *mut SdJournal) -> usize;
}
// Safe interface:
/// This struct contains the opaque data used by libsystemd to
/// reference the journal.
pub struct Journal {
sd_journal: *mut SdJournal,
}
impl Drop for Journal {
fn drop(&mut self) {
unsafe {
sd_journal_close(self.sd_journal);
}
}
}
/// Open the journal for reading. No flags are supplied to libsystemd,
/// which means that all journal entries will become available.
pub fn open_journal() -> Result<Journal, Error> {
let (mut sd_journal, result) = unsafe {
let mut journal: SdJournal = mem::uninitialized();
let result = sd_journal_open(&mut journal, 0);
(journal, result)
};
ensure!(result == 0, "Could not open journal (errno: {})", result);
Ok(Journal { sd_journal: &mut sd_journal })
}
#[derive(Debug)]
pub enum NextEntry {
/// If no new entries are available in the journal this variant is
/// returned.
NoEntry,
Entry,
}
impl Journal {
pub fn read_next(&self) -> Result<NextEntry, Error> {
let result = unsafe {
sd_journal_next(self.sd_journal)
};
match result {
0 => Ok(NextEntry::NoEntry),
1 => Ok(NextEntry::Entry),
n if n > 1 => bail!("Journal unexpectedly advanced by {} entries!", n),
_ => bail!("An error occured while advancing the journal (errno: {})", result),
}
}
}

View file

@ -1,22 +1,107 @@
#[macro_use] extern crate failure;
extern crate libc;
// #[macro_use] extern crate failure;
#[macro_use] extern crate log;
mod journald;
extern crate env_logger;
extern crate systemd;
use systemd::journal::*;
use std::process;
use std::thread;
use std::sync::mpsc::{channel, Receiver};
use std::time::{Duration, Instant};
use std::collections::vec_deque::{VecDeque, Drain};
fn main() {
let mut journal = match journald::open_journal() {
Ok(journal) => journal,
Err(e) => {
println!("{}", e);
process::exit(1);
},
};
println!("foo");
let entry = journal.read_next();
println!("Entry: {:?}", entry)
#[derive(Debug)]
struct Record {
message: Option<String>,
hostname: Option<String>,
unit: Option<String>,
timestamp: Option<String>,
}
impl From<JournalRecord> for Record {
fn from(mut record: JournalRecord) -> Record {
Record {
// The message field is technically just a convention, but
// journald seems to default to it when ingesting unit
// output.
message: record.remove("MESSAGE"),
// Presumably this is always set, but who can be sure
// about anything in this world.
hostname: record.remove("_HOSTNAME"),
// The unit is seemingly missing on kernel entries, but
// present on all others.
unit: record.remove("_SYSTEMD_UNIT"),
// This timestamp is present on most log entries
// (seemingly all that are ingested from the output
// systemd units).
timestamp: record.remove("_SOURCE_REALTIME_TIMESTAMP"),
}
}
}
/// This function spawns a double-looped, blocking receiver. It will
/// buffer messages for a second before flushing them to Stackdriver.
fn receiver_loop(rx: Receiver<Record>) {
let mut buf = VecDeque::new();
let iteration = Duration::from_millis(500);
loop {
trace!("Beginning outer iteration");
let now = Instant::now();
loop {
if now.elapsed() > iteration {
break;
}
if let Ok(record) = rx.recv_timeout(iteration) {
buf.push_back(record);
}
}
if !buf.is_empty() {
flush(buf.drain(..));
}
trace!("Done outer iteration");
}
}
/// Flushes all drained records to Stackdriver.
fn flush(drain: Drain<Record>) {
let record_count = drain.count();
debug!("Flushed {} records", record_count);
}
fn main () {
env_logger::init();
let mut journal = Journal::open(JournalFiles::All, false, true)
.expect("Failed to open systemd journal");
match journal.seek(JournalSeek::Tail) {
Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),
Err(err) => {
error!("Failed to set initial journal position: {}", err);
process::exit(1)
}
}
let (tx, rx) = channel();
let _receiver = thread::spawn(move || receiver_loop(rx));
journal.watch_all_elements(move |record| {
let record: Record = record.into();
if record.message.is_some() {
tx.send(record).ok();
}
Ok(())
}).expect("Failed to read new entries from journal");
}