style(rust): Format all Rust code with rustfmt

Change-Id: Iab7e00cc26a4f9727d3ab98691ef379921a33052
Reviewed-on: https://cl.tvl.fyi/c/depot/+/5240
Tested-by: BuildkiteCI
Reviewed-by: kanepyork <rikingcoding@gmail.com>
Reviewed-by: Profpatsch <mail@profpatsch.de>
Reviewed-by: grfn <grfn@gws.fyi>
Reviewed-by: tazjin <tazjin@tvl.su>
This commit is contained in:
Vincent Ambo 2022-02-07 18:49:59 +03:00 committed by tazjin
parent 3318982f81
commit 3d8ee62087
42 changed files with 1253 additions and 876 deletions

View file

@ -1,6 +1,5 @@
extern crate pkg_config;
fn main() {
pkg_config::probe_library("libsystemd")
.expect("Could not probe libsystemd");
pkg_config::probe_library("libsystemd").expect("Could not probe libsystemd");
}

View file

@ -31,11 +31,16 @@
//! `GOOGLE_APPLICATION_CREDENTIALS`, `GOOGLE_CLOUD_PROJECT` and
//! `LOG_NAME` environment variables.
#[macro_use] extern crate failure;
#[macro_use] extern crate log;
#[macro_use] extern crate serde_derive;
#[macro_use] extern crate serde_json;
#[macro_use] extern crate lazy_static;
#[macro_use]
extern crate failure;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate lazy_static;
extern crate chrono;
extern crate env_logger;
@ -48,13 +53,11 @@ use chrono::offset::LocalResult;
use chrono::prelude::{DateTime, TimeZone, Utc};
use failure::ResultExt;
use serde_json::{from_str, Value};
use std::env;
use std::fs::{self, File, rename};
use std::io::{self, Read, ErrorKind, Write};
use std::mem;
use std::fs::{self, rename, File};
use std::io::{self, ErrorKind, Read, Write};
use std::path::PathBuf;
use std::process;
use std::time::{Duration, Instant};
use std::{env, mem, process};
use systemd::journal::{Journal, JournalFiles, JournalRecord, JournalSeek};
#[cfg(test)]
@ -62,10 +65,12 @@ mod tests;
const LOGGING_SERVICE: &str = "https://logging.googleapis.com/google.logging.v2.LoggingServiceV2";
const ENTRIES_WRITE_URL: &str = "https://logging.googleapis.com/v2/entries:write";
const METADATA_TOKEN_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
const METADATA_TOKEN_URL: &str =
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
const METADATA_ID_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/id";
const METADATA_ZONE_URL: &str = "http://metadata.google.internal/computeMetadata/v1/instance/zone";
const METADATA_PROJECT_URL: &str = "http://metadata.google.internal/computeMetadata/v1/project/project-id";
const METADATA_PROJECT_URL: &str =
"http://metadata.google.internal/computeMetadata/v1/project/project-id";
/// Convenience type alias for results using failure's `Error` type.
type Result<T> = std::result::Result<T, failure::Error>;
@ -134,14 +139,17 @@ fn get_metadata(url: &str) -> Result<String> {
if response.ok() {
// Whitespace is trimmed to remove newlines from responses.
let body = response.into_string()
let body = response
.into_string()
.context("Failed to decode metadata response")?
.trim().to_string();
.trim()
.to_string();
Ok(body)
} else {
let status = response.status_line().to_string();
let body = response.into_string()
let body = response
.into_string()
.unwrap_or_else(|e| format!("Metadata body error: {}", e));
bail!("Metadata failure: {} ({})", body, status)
}
@ -186,11 +194,9 @@ fn determine_monitored_resource() -> Value {
}
})
} else {
let instance_id = get_metadata(METADATA_ID_URL)
.expect("Could not determine instance ID");
let instance_id = get_metadata(METADATA_ID_URL).expect("Could not determine instance ID");
let zone = get_metadata(METADATA_ZONE_URL)
.expect("Could not determine instance zone");
let zone = get_metadata(METADATA_ZONE_URL).expect("Could not determine instance zone");
json!({
"type": "gce_instance",
@ -253,7 +259,8 @@ fn sign_service_account_token(credentials: &Credentials) -> Result<Token> {
use medallion::{Algorithm, Header, Payload};
let iat = Utc::now();
let exp = iat.checked_add_signed(chrono::Duration::seconds(3600))
let exp = iat
.checked_add_signed(chrono::Duration::seconds(3600))
.ok_or_else(|| format_err!("Failed to calculate token expiry"))?;
let header = Header {
@ -323,7 +330,9 @@ enum Payload {
/// text format.
fn message_to_payload(message: Option<String>) -> Payload {
match message {
None => Payload::TextPayload { text_payload: "empty log entry".into() },
None => Payload::TextPayload {
text_payload: "empty log entry".into(),
},
Some(text_payload) => {
// Attempt to deserialize the text payload as a generic
// JSON value.
@ -333,7 +342,7 @@ fn message_to_payload(message: Option<String>) -> Payload {
// expect other types of JSON payload) and return it
// in that case.
if json_payload.is_object() {
return Payload::JsonPayload { json_payload }
return Payload::JsonPayload { json_payload };
}
}
@ -450,9 +459,7 @@ impl From<JournalRecord> for LogEntry {
// Journald uses syslogd's concept of priority. No idea if this is
// always present, but it's optional in the Stackdriver API, so we just
// omit it if we can't find or parse it.
let severity = record
.remove("PRIORITY")
.and_then(priority_to_severity);
let severity = record.remove("PRIORITY").and_then(priority_to_severity);
LogEntry {
payload,
@ -468,8 +475,7 @@ impl From<JournalRecord> for LogEntry {
/// Attempt to read from the journal. If no new entry is present,
/// await the next one up to the specified timeout.
fn receive_next_record(timeout: Duration, journal: &mut Journal)
-> Result<Option<JournalRecord>> {
fn receive_next_record(timeout: Duration, journal: &mut Journal) -> Result<Option<JournalRecord>> {
let next_record = journal.next_record()?;
if next_record.is_some() {
return Ok(next_record);
@ -525,11 +531,10 @@ fn persist_cursor(cursor: String) -> Result<()> {
if cursor.is_empty() {
error!("Received empty journald cursor position, refusing to persist!");
error!("Please report this message at https://github.com/tazjin/journaldriver/issues/2");
return Ok(())
return Ok(());
}
let mut file = File::create(&*CURSOR_TMP_FILE)
.context("Failed to create cursor file")?;
let mut file = File::create(&*CURSOR_TMP_FILE).context("Failed to create cursor file")?;
write!(file, "{}", cursor).context("Failed to write cursor file")?;
@ -547,9 +552,7 @@ fn persist_cursor(cursor: String) -> Result<()> {
///
/// If flushing is successful the last cursor position will be
/// persisted to disk.
fn flush(token: &mut Token,
entries: Vec<LogEntry>,
cursor: String) -> Result<()> {
fn flush(token: &mut Token, entries: Vec<LogEntry>, cursor: String) -> Result<()> {
if token.is_expired() {
debug!("Refreshing Google metadata access token");
let new_token = get_token()?;
@ -598,7 +601,8 @@ fn write_entries(token: &Token, request: Value) -> Result<()> {
Ok(())
} else {
let status = response.status_line().to_string();
let body = response.into_string()
let body = response
.into_string()
.unwrap_or_else(|_| "no response body".into());
bail!("Write failure: {} ({})", body, status)
}
@ -624,14 +628,12 @@ fn initial_cursor() -> Result<JournalSeek> {
Err(ref err) if err.kind() == ErrorKind::NotFound => {
info!("No previous cursor position, reading from journal tail");
Ok(JournalSeek::Tail)
},
Err(err) => {
(Err(err).context("Could not read cursor position"))?
}
Err(err) => (Err(err).context("Could not read cursor position"))?,
}
}
fn main () {
fn main() {
env_logger::init();
// The directory in which cursor positions are persisted should
@ -641,17 +643,17 @@ fn main () {
process::exit(1);
}
let cursor_position_dir = CURSOR_FILE.parent()
let cursor_position_dir = CURSOR_FILE
.parent()
.expect("Invalid cursor position file path");
fs::create_dir_all(cursor_position_dir)
.expect("Could not create directory to store cursor position in");
let mut journal = Journal::open(JournalFiles::All, false, true)
.expect("Failed to open systemd journal");
let mut journal =
Journal::open(JournalFiles::All, false, true).expect("Failed to open systemd journal");
let seek_position = initial_cursor()
.expect("Failed to determine initial cursor position");
let seek_position = initial_cursor().expect("Failed to determine initial cursor position");
match journal.seek(seek_position) {
Ok(cursor) => info!("Opened journal at cursor '{}'", cursor),

View file

@ -15,7 +15,10 @@ fn test_text_entry_serialization() {
let expected = "{\"labels\":null,\"textPayload\":\"test entry\"}";
let result = to_string(&entry).expect("serialization failed");
assert_eq!(expected, result, "Plain text payload should serialize correctly")
assert_eq!(
expected, result,
"Plain text payload should serialize correctly"
)
}
#[test]
@ -26,7 +29,7 @@ fn test_json_entry_serialization() {
payload: Payload::JsonPayload {
json_payload: json!({
"message": "JSON test"
})
}),
},
severity: None,
};
@ -45,7 +48,10 @@ fn test_plain_text_payload() {
text_payload: "plain text payload".into(),
};
assert_eq!(expected, payload, "Plain text payload should be detected correctly");
assert_eq!(
expected, payload,
"Plain text payload should be detected correctly"
);
}
#[test]
@ -55,7 +61,10 @@ fn test_empty_payload() {
text_payload: "empty log entry".into(),
};
assert_eq!(expected, payload, "Empty payload should be handled correctly");
assert_eq!(
expected, payload,
"Empty payload should be handled correctly"
);
}
#[test]
@ -66,10 +75,13 @@ fn test_json_payload() {
json_payload: json!({
"someKey": "someValue",
"otherKey": 42
})
}),
};
assert_eq!(expected, payload, "JSON payload should be detected correctly");
assert_eq!(
expected, payload,
"JSON payload should be detected correctly"
);
}
#[test]
@ -82,14 +94,16 @@ fn test_json_no_object() {
text_payload: "42".into(),
};
assert_eq!(expected, payload, "Non-object JSON payload should be plain text");
assert_eq!(
expected, payload,
"Non-object JSON payload should be plain text"
);
}
#[test]
fn test_parse_microseconds() {
let input: String = "1529175149291187".into();
let expected: DateTime<Utc> = "2018-06-16T18:52:29.291187Z"
.to_string().parse().unwrap();
let expected: DateTime<Utc> = "2018-06-16T18:52:29.291187Z".to_string().parse().unwrap();
assert_eq!(Some(expected), parse_microseconds(input));
}

View file

@ -1,36 +1,38 @@
extern crate clap;
extern crate posix_mq;
extern crate libc;
extern crate nix;
extern crate posix_mq;
use clap::{App, SubCommand, Arg, ArgMatches, AppSettings};
use posix_mq::{Name, Queue, Message};
use clap::{App, AppSettings, Arg, ArgMatches, SubCommand};
use posix_mq::{Message, Name, Queue};
use std::fs::{read_dir, File};
use std::io::{self, Read, Write};
use std::process::exit;
fn run_ls() {
let mqueues = read_dir("/dev/mqueue")
.expect("Could not read message queues");
let mqueues = read_dir("/dev/mqueue").expect("Could not read message queues");
for queue in mqueues {
let path = queue.unwrap().path();
let status = {
let mut file = File::open(&path)
.expect("Could not open queue file");
let mut file = File::open(&path).expect("Could not open queue file");
let mut content = String::new();
file.read_to_string(&mut content).expect("Could not read queue file");
file.read_to_string(&mut content)
.expect("Could not read queue file");
content
};
let queue_name = path.components().last().unwrap()
let queue_name = path
.components()
.last()
.unwrap()
.as_os_str()
.to_string_lossy();
println!("/{}: {}", queue_name, status)
};
}
}
fn run_inspect(queue_name: &str) {
@ -47,8 +49,7 @@ fn run_create(cmd: &ArgMatches) {
set_rlimit(rlimit.parse().expect("Invalid rlimit value"));
}
let name = Name::new(cmd.value_of("queue").unwrap())
.expect("Invalid queue name");
let name = Name::new(cmd.value_of("queue").unwrap()).expect("Invalid queue name");
let max_pending: i64 = cmd.value_of("max-pending").unwrap().parse().unwrap();
let max_size: i64 = cmd.value_of("max-size").unwrap().parse().unwrap();
@ -56,11 +57,11 @@ fn run_create(cmd: &ArgMatches) {
let queue = Queue::create(name, max_pending, max_size * 1024);
match queue {
Ok(_) => println!("Queue created successfully"),
Ok(_) => println!("Queue created successfully"),
Err(e) => {
writeln!(io::stderr(), "Could not create queue: {}", e).ok();
exit(1);
},
}
};
}
@ -120,7 +121,12 @@ fn run_rlimit() {
};
if errno != 0 {
writeln!(io::stderr(), "Could not get message queue rlimit: {}", errno).ok();
writeln!(
io::stderr(),
"Could not get message queue rlimit: {}",
errno
)
.ok();
} else {
println!("Message queue rlimit:");
println!("Current limit: {}", rlimit.rlim_cur);
@ -170,16 +176,20 @@ fn main() {
.about("Create a new queue")
.arg(&queue_arg)
.arg(&rlimit_arg)
.arg(Arg::with_name("max-size")
.help("maximum message size (in kB)")
.long("max-size")
.required(true)
.takes_value(true))
.arg(Arg::with_name("max-pending")
.help("maximum # of pending messages")
.long("max-pending")
.required(true)
.takes_value(true));
.arg(
Arg::with_name("max-size")
.help("maximum message size (in kB)")
.long("max-size")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("max-pending")
.help("maximum # of pending messages")
.long("max-pending")
.required(true)
.takes_value(true),
);
let receive = SubCommand::with_name("receive")
.about("Receive a message from a queue")
@ -188,9 +198,11 @@ fn main() {
let send = SubCommand::with_name("send")
.about("Send a message to a queue")
.arg(&queue_arg)
.arg(Arg::with_name("message")
.help("the message to send")
.required(true));
.arg(
Arg::with_name("message")
.help("the message to send")
.required(true),
);
let rlimit = SubCommand::with_name("rlimit")
.about("Get the message queue rlimit")
@ -211,13 +223,13 @@ fn main() {
match matches.subcommand() {
("ls", _) => run_ls(),
("inspect", Some(cmd)) => run_inspect(cmd.value_of("queue").unwrap()),
("create", Some(cmd)) => run_create(cmd),
("create", Some(cmd)) => run_create(cmd),
("receive", Some(cmd)) => run_receive(cmd.value_of("queue").unwrap()),
("send", Some(cmd)) => run_send(
("send", Some(cmd)) => run_send(
cmd.value_of("queue").unwrap(),
cmd.value_of("message").unwrap()
cmd.value_of("message").unwrap(),
),
("rlimit", _) => run_rlimit(),
("rlimit", _) => run_rlimit(),
_ => unimplemented!(),
}
}

View file

@ -1,8 +1,5 @@
use nix;
use std::error;
use std::fmt;
use std::io;
use std::num;
use std::{error, fmt, io, num};
/// This module implements a simple error type to match the errors that can be thrown from the C
/// functions as well as some extra errors resulting from internal validations.

View file

@ -4,8 +4,7 @@ use super::*;
fn test_open_delete() {
// Simple test with default queue settings
let name = Name::new("/test-queue").unwrap();
let queue = Queue::open_or_create(name)
.expect("Opening queue failed");
let queue = Queue::open_or_create(name).expect("Opening queue failed");
let message = Message {
data: "test-message".as_bytes().to_vec(),