chore(users): grfn -> aspen

Change-Id: I6c6847fac56f0a9a1a2209792e00a3aec5e672b9
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10809
Autosubmit: aspen <root@gws.fyi>
Reviewed-by: sterni <sternenseemann@systemli.org>
Tested-by: BuildkiteCI
Reviewed-by: lukegb <lukegb@tvl.fyi>
This commit is contained in:
Aspen Smith 2024-02-11 22:00:40 -05:00 committed by clbot
parent 0ba476a426
commit 82ecd61f5c
478 changed files with 75 additions and 77 deletions

View file

@ -0,0 +1 @@
eval "$(lorri direnv)"

View file

@ -0,0 +1 @@
target/

1874
users/aspen/xanthous/server/Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,29 @@
[package]
name = "xanthous-server"
version = "0.1.0"
edition = "2018"
[dependencies]
clap = { version = "3.0", features = [ "derive", "env" ] }
color-eyre = "0.5.11"
eyre = "0.6.5"
thrussh = "0.33.5"
thrussh-keys = "0.21.0"
tracing = "0.1.29"
tracing-subscriber = "0.2.25"
metrics = "0.17.0"
metrics-exporter-prometheus = "0.6.1"
futures = "0.3.17"
libc = "0.2.103"
nix = "0.23.0"
# Pins for rust 1.55 (2018 edition) until we have 1.56 in nixpkgs-unstable
pbkdf2 = "<0.9"
base64ct = "<1.2"
[dependencies.tokio]
version = "1.13"
features = ["rt", "rt-multi-thread", "macros", "net", "process", "fs", "signal"]
[dev-dependencies]
tempfile = "3.2.0"

View file

@ -0,0 +1,24 @@
args@{ depot ? import ../../../.. { }
, pkgs ? depot.third_party.nixpkgs
, ...
}:
depot.third_party.naersk.buildPackage {
name = "xanthous-server";
version = "0.0.1";
src = depot.third_party.gitignoreSource ./.;
# Workaround for a potential Nix bug related to restricted eval.
# See https://github.com/nix-community/naersk/issues/169
root = depot.nix.sparseTree {
root = ./.;
paths = [
./Cargo.toml
./Cargo.lock
];
};
passthru = {
docker = import ./docker.nix args;
};
}

View file

@ -0,0 +1,21 @@
{ depot ? import ../../../.. { }
, pkgs ? depot.third_party.nixpkgs
, ...
}:
let
inherit (depot.users.aspen) xanthous;
xanthous-server = xanthous.server;
in
pkgs.dockerTools.buildLayeredImage {
name = "xanthous-server";
tag = "latest";
contents = [ xanthous xanthous-server ];
config = {
Cmd = [
"${xanthous-server}/bin/xanthous-server"
"--xanthous-binary-path"
"${xanthous}/bin/xanthous"
];
};
}

View file

@ -0,0 +1,49 @@
{ config, lib, pkgs, depot, ... }:
let
cfg = config.services.xanthous-server;
in
{
options = with lib; {
services.xanthous-server = {
enable = mkEnableOption "xanthous server";
port = mkOption {
type = types.int;
default = 2222;
description = "Port to listen to for SSH connections";
};
metricsPort = mkOption {
type = types.int;
default = 9000;
description = "Port to listen to for prometheus metrics";
};
image = mkOption {
type = types.package;
default = depot.users.aspen.xanthous.server.docker;
description = "OCI image file to run";
};
ed25519SecretKeyFile = mkOption {
type = with types; uniq string;
description = "Path to the ed25519 secret key for the server";
};
};
};
config = lib.mkIf cfg.enable {
virtualisation.oci-containers.containers."xanthous-server" = {
autoStart = true;
image = "${cfg.image.imageName}:${cfg.image.imageTag}";
imageFile = cfg.image;
ports = [
"${toString cfg.port}:22"
"${toString cfg.metricsPort}:9000"
];
environment.SECRET_KEY_FILE = "/secret-key";
volumes = [ "/etc/secrets/xanthous-server-secret-key:/secret-key" ];
};
};
}

View file

@ -0,0 +1,11 @@
let
depot = import ../../../.. { };
pkgs = depot.third_party.nixpkgs;
in
pkgs.mkShell {
buildInputs = with pkgs; [
rustup
rust-analyzer
];
}

View file

@ -0,0 +1,385 @@
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::process::Command;
use std::str;
use std::sync::Arc;
use clap::Parser;
use color_eyre::eyre::Result;
use eyre::{bail, Context};
use futures::future::{ready, Ready};
use futures::Future;
use metrics_exporter_prometheus::PrometheusBuilder;
use nix::pty::Winsize;
use pty::ChildHandle;
use thrussh::server::{self, Auth, Session};
use thrussh::{ChannelId, CryptoVec};
use thrussh_keys::decode_secret_key;
use thrussh_keys::key::KeyPair;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::select;
use tokio::time::Instant;
use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use tracing_subscriber::EnvFilter;
use crate::pty::WaitPid;
mod metrics;
mod pty;
use crate::metrics::reported::*;
use crate::metrics::{decrement_gauge, histogram, increment_counter, increment_gauge};
/// SSH-compatible server for playing Xanthous
#[derive(Parser, Debug)]
struct Opts {
/// Address to bind to
#[clap(long, short = 'a', default_value = "0.0.0.0:22")]
address: String,
/// Address to listen to for metrics
#[clap(long, default_value = "0.0.0.0:9000")]
metrics_address: SocketAddr,
/// Format to use when emitting log events
#[clap(
long,
env = "LOG_FORMAT",
default_value = "full",
possible_values = &["compact", "full", "pretty", "json"]
)]
log_format: String,
/// Full path to the xanthous binary
#[clap(long, env = "XANTHOUS_BINARY_PATH")]
xanthous_binary_path: String,
/// Path to a file containing the ed25519 secret key for the server
#[clap(long, env = "SECRET_KEY_FILE")]
secret_key_file: PathBuf,
/// Level to log at
#[clap(long, env = "LOG_LEVEL", default_value = "info")]
log_level: String,
}
impl Opts {
async fn read_secret_key(&self) -> Result<KeyPair> {
let mut file = File::open(&self.secret_key_file)
.await
.context("Reading secret key file")?;
let mut secret_key = Vec::with_capacity(464);
file.read_to_end(&mut secret_key).await?;
Ok(decode_secret_key(str::from_utf8(&secret_key)?, None)?)
}
async fn ssh_server_config(&self) -> Result<server::Config> {
let key_pair = self.read_secret_key().await?;
Ok(server::Config {
server_id: "SSH-2.0-xanthous".to_owned(),
keys: vec![key_pair],
..Default::default()
})
}
fn init_logging(&self) -> Result<()> {
let filter = EnvFilter::try_new(&self.log_level)?;
let s = tracing_subscriber::fmt().with_env_filter(filter);
match self.log_format.as_str() {
"compact" => s.compact().init(),
"full" => s.init(),
"pretty" => s.pretty().init(),
"json" => s.json().with_current_span(true).init(),
f => bail!("Invalid log format `{}`", f),
}
Ok(())
}
}
struct Handler {
address: SocketAddr,
xanthous_binary_path: &'static str,
username: Option<String>,
child: Option<ChildHandle>,
}
async fn run_child(
mut child: pty::Child,
mut server_handle: server::Handle,
channel_id: ChannelId,
) -> Result<()> {
let mut buf = [0; 2048];
loop {
select! {
r = child.tty.read(&mut buf) => {
let read_bytes = r?;
if read_bytes == 0 {
info!("EOF received from process");
let _ = server_handle.close(channel_id).await;
return Ok(())
} else {
trace!(?read_bytes, "read bytes from child");
let _ = server_handle.data(channel_id, CryptoVec::from_slice(&buf[..read_bytes])).await;
}
}
status = WaitPid::new(child.pid) => {
match status {
Ok(_status) => info!("Child exited"),
Err(error) => error!(%error, "Child failed"),
}
let _ = server_handle.close(channel_id).await;
return Ok(())
}
}
}
}
impl Handler {
async fn spawn_shell(
&mut self,
mut handle: server::Handle,
channel_id: ChannelId,
term: String,
winsize: Winsize,
) -> Result<()> {
let mut cmd = Command::new(self.xanthous_binary_path);
cmd.env("TERM", term);
if let Some(username) = &self.username {
cmd.args(["--name", username]);
}
cmd.arg("--disable-saving");
let child = pty::spawn(cmd, Some(winsize), None).await?;
info!(pid = %child.pid, "Spawned child");
increment_gauge!(RUNNING_PROCESSES, 1.0);
self.child = Some(child.handle().await?);
tokio::spawn(
async move {
let span = info_span!("child", pid = %child.pid);
if let Err(error) = run_child(child, handle.clone(), channel_id)
.instrument(span.clone())
.await
{
span.in_scope(|| error!(%error, "Error running child"));
let _ = handle.close(channel_id).await;
}
decrement_gauge!(RUNNING_PROCESSES, 1.0);
}
.in_current_span(),
);
Ok(())
}
}
#[allow(clippy::type_complexity)]
impl server::Handler for Handler {
type Error = eyre::Error;
type FutureAuth = Ready<Result<(Self, Auth)>>;
type FutureUnit = Pin<Box<dyn Future<Output = Result<(Self, Session)>> + Send + 'static>>;
type FutureBool = Ready<Result<(Self, Session, bool)>>;
fn finished_auth(self, auth: Auth) -> Self::FutureAuth {
ready(Ok((self, auth)))
}
fn finished_bool(self, b: bool, session: Session) -> Self::FutureBool {
ready(Ok((self, session, b)))
}
fn finished(self, session: Session) -> Self::FutureUnit {
Box::pin(ready(Ok((self, session))))
}
fn auth_none(mut self, username: &str) -> Self::FutureAuth {
info!(%username, "Accepted new connection");
self.username = Some(username.to_owned());
self.finished_auth(Auth::Accept)
}
fn auth_password(mut self, username: &str, _password: &str) -> Self::FutureAuth {
info!(%username, "Accepted new connection");
self.username = Some(username.to_owned());
self.finished_auth(Auth::Accept)
}
fn auth_publickey(
mut self,
username: &str,
_: &thrussh_keys::key::PublicKey,
) -> Self::FutureAuth {
info!(%username, "Accepted new connection");
self.username = Some(username.to_owned());
self.finished_auth(Auth::Accept)
}
fn pty_request(
mut self,
channel: thrussh::ChannelId,
term: &str,
col_width: u32,
row_height: u32,
pix_width: u32,
pix_height: u32,
modes: &[(thrussh::Pty, u32)],
session: Session,
) -> Self::FutureUnit {
let term = term.to_owned();
let modes = modes.to_vec();
Box::pin(async move {
debug!(
%term,
%col_width,
%row_height,
%pix_width,
%pix_height,
?modes,
"PTY Requested"
);
self.spawn_shell(
session.handle(),
channel,
term,
Winsize {
ws_row: row_height as _,
ws_col: col_width as _,
ws_xpixel: pix_width as _,
ws_ypixel: pix_height as _,
},
)
.await?;
Ok((self, session))
})
}
fn window_change_request(
mut self,
_channel: ChannelId,
col_width: u32,
row_height: u32,
pix_width: u32,
pix_height: u32,
session: Session,
) -> Self::FutureUnit {
Box::pin(async move {
if let Some(child) = self.child.as_mut() {
trace!(%row_height, %col_width, "Window resize request received");
child
.resize_window(Winsize {
ws_row: row_height as _,
ws_col: col_width as _,
ws_xpixel: pix_width as _,
ws_ypixel: pix_height as _,
})
.await?;
} else {
warn!("Resize request received without child process; ignoring");
}
Ok((self, session))
})
}
fn data(
mut self,
_channel: thrussh::ChannelId,
data: &[u8],
session: Session,
) -> Self::FutureUnit {
trace!(data = %String::from_utf8_lossy(data), raw_data = ?data);
let data = data.to_owned();
Box::pin(async move {
if let Some(child) = self.child.as_mut() {
child.write_all(&data).await?;
} else {
warn!("Data received without child process; ignoring");
}
Ok((self, session))
})
}
}
#[tokio::main]
async fn main() -> Result<()> {
color_eyre::install()?;
let opts = Box::leak::<'static>(Box::new(Opts::parse()));
opts.init_logging()?;
PrometheusBuilder::new()
.listen_address(opts.metrics_address)
.install()?;
metrics::register();
let config = Arc::new(opts.ssh_server_config().await?);
info!(address = %opts.address, "Listening for new SSH connections");
let listener = TcpListener::bind(&opts.address).await?;
loop {
let (stream, address) = listener.accept().await?;
increment_counter!(CONNECTIONS_ACCEPTED);
increment_gauge!(ACTIVE_CONNECTIONS, 1.0);
let config = config.clone();
let handler = Handler {
xanthous_binary_path: &opts.xanthous_binary_path,
address,
username: None,
child: None,
};
tokio::spawn(async move {
let span = info_span!("client", address = %handler.address);
let start = Instant::now();
if let Err(error) = server::run_stream(config, stream, handler)
.instrument(span.clone())
.await
{
span.in_scope(|| error!(%error));
}
let duration = start.elapsed();
span.in_scope(|| info!(duration_ms = %duration.as_millis(), "Client disconnected"));
histogram!(CONNECTION_DURATION, duration);
decrement_gauge!(ACTIVE_CONNECTIONS, 1.0);
});
}
}
#[cfg(test)]
mod tests {
use tempfile::NamedTempFile;
use super::*;
#[tokio::test]
async fn read_secret_key() {
use std::io::Write;
let mut file = NamedTempFile::new().unwrap();
file.write_all(
b"
-----BEGIN OPENSSH PRIVATE KEY-----
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
QyNTUxOQAAACAYz80xcK7jYxZMAl6apIHKRtB0Z2U78gG39c1QaIhgMwAAAJB9vxK9fb8S
vQAAAAtzc2gtZWQyNTUxOQAAACAYz80xcK7jYxZMAl6apIHKRtB0Z2U78gG39c1QaIhgMw
AAAEDNZ0d3lLNBGU6Im4JOpr490TOjm+cB7kMVXjVg3iCowBjPzTFwruNjFkwCXpqkgcpG
0HRnZTvyAbf1zVBoiGAzAAAACHRlc3Qta2V5AQIDBAU=
-----END OPENSSH PRIVATE KEY-----
",
)
.unwrap();
let opts: Opts = Opts::parse_from(&[
"xanthous-server".as_ref(),
"--xanthous-binary-path".as_ref(),
"/bin/xanthous".as_ref(),
"--secret-key-file".as_ref(),
file.path().as_os_str(),
]);
opts.read_secret_key().await.unwrap();
}
}

View file

@ -0,0 +1,24 @@
pub use ::metrics::*;
pub mod reported {
/// Counter: Connections accepted on the TCP listener
pub const CONNECTIONS_ACCEPTED: &str = "ssh.connections.accepted";
/// Histogram: Connection duration
pub const CONNECTION_DURATION: &str = "ssh.connections.duration";
/// Gauge: Currently active connections
pub const ACTIVE_CONNECTIONS: &str = "ssh.connections.active";
/// Gauge: Currently running xanthous processes
pub const RUNNING_PROCESSES: &str = "ssh.child.processes";
}
pub fn register() {
use reported::*;
register_counter!(CONNECTIONS_ACCEPTED);
register_histogram!(CONNECTION_DURATION);
register_gauge!(ACTIVE_CONNECTIONS);
register_gauge!(RUNNING_PROCESSES);
}

View file

@ -0,0 +1,172 @@
use std::io::{self};
use std::os::unix::prelude::{AsRawFd, CommandExt, FromRawFd};
use std::pin::Pin;
use std::process::{abort, Command};
use std::task::{Context, Poll};
use eyre::{bail, Result};
use futures::Future;
use nix::pty::{forkpty, Winsize};
use nix::sys::termios::Termios;
use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
use nix::unistd::{ForkResult, Pid};
use tokio::fs::File;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::signal::unix::{signal, Signal, SignalKind};
use tokio::task::spawn_blocking;
mod ioctl {
use super::Winsize;
use libc::TIOCSWINSZ;
use nix::ioctl_write_ptr_bad;
ioctl_write_ptr_bad!(tiocswinsz, TIOCSWINSZ, Winsize);
}
async fn asyncify<F, T>(f: F) -> Result<T>
where
F: FnOnce() -> Result<T> + Send + 'static,
T: Send + 'static,
{
match spawn_blocking(f).await {
Ok(res) => res,
Err(_) => bail!("background task failed",),
}
}
pub struct Child {
pub tty: File,
pub pid: Pid,
}
pub struct ChildHandle {
pub tty: File,
}
pub struct WaitPid {
pid: Pid,
signal: Signal,
}
impl WaitPid {
pub fn new(pid: Pid) -> Self {
Self {
pid,
signal: signal(SignalKind::child()).unwrap(),
}
}
}
impl Future for WaitPid {
type Output = nix::Result<WaitStatus>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let _ = self.signal.poll_recv(cx);
match waitpid(self.pid, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::StillAlive) => Poll::Pending,
result => Poll::Ready(result),
}
}
}
impl Child {
pub async fn handle(&self) -> io::Result<ChildHandle> {
Ok(ChildHandle {
tty: self.tty.try_clone().await?,
})
}
}
impl ChildHandle {
pub async fn resize_window(&mut self, winsize: Winsize) -> Result<()> {
let fd = self.tty.as_raw_fd();
asyncify(move || unsafe {
ioctl::tiocswinsz(fd, &winsize as *const Winsize)?;
Ok(())
})
.await
}
}
pub async fn spawn(
mut cmd: Command,
winsize: Option<Winsize>,
termios: Option<Termios>,
) -> Result<Child> {
asyncify(move || unsafe {
let res = forkpty(winsize.as_ref(), termios.as_ref())?;
match res.fork_result {
ForkResult::Parent { child } => Ok(Child {
pid: child,
tty: File::from_raw_fd(res.master),
}),
ForkResult::Child => {
cmd.exec();
abort();
}
}
})
.await
}
impl AsyncRead for Child {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.tty).poll_read(cx, buf)
}
}
impl AsyncWrite for Child {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.tty).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.tty).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.tty).poll_shutdown(cx)
}
}
impl AsyncRead for ChildHandle {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.tty).poll_read(cx, buf)
}
}
impl AsyncWrite for ChildHandle {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.tty).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.tty).poll_flush(cx)
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.tty).poll_shutdown(cx)
}
}