use anyhow::Context;
use bstr::BStr;
use oci_spec::runtime::{LinuxIdMapping, LinuxIdMappingBuilder};
use tokio::process::{Child, Command};
use tonic::async_trait;
use tracing::{debug, instrument, warn, Span};
use tvix_castore::{
    blobservice::BlobService,
    directoryservice::DirectoryService,
    fs::fuse::FuseDaemon,
    import::fs::ingest_path,
    refscan::{ReferencePattern, ReferenceScanner},
};
use uuid::Uuid;

use crate::buildservice::BuildRequest;
use crate::{
    oci::{get_host_output_paths, make_bundle, make_spec},
    proto::{self, build::OutputNeedles},
};
use std::{ffi::OsStr, path::PathBuf, process::Stdio};

use super::BuildService;

const SANDBOX_SHELL: &str = env!("TVIX_BUILD_SANDBOX_SHELL");
const MAX_CONCURRENT_BUILDS: usize = 2; // TODO: make configurable

pub struct OCIBuildService<BS, DS> {
    /// Root path in which all bundles are created.
    bundle_root: PathBuf,

    /// uid mappings to set up for the workloads
    uid_mappings: Vec<LinuxIdMapping>,
    /// gid mappings to set up for the workloads
    gid_mappings: Vec<LinuxIdMapping>,

    /// Handle to a [BlobService], used by filesystems spawned during builds.
    blob_service: BS,
    /// Handle to a [DirectoryService], used by filesystems spawned during builds.
    directory_service: DS,

    // Semaphore to track the number of concurrently running builds.
    // This is necessary, as otherwise we very quickly run out of open file handles.
    concurrent_builds: tokio::sync::Semaphore,
}

impl<BS, DS> OCIBuildService<BS, DS> {
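    /// Constructs a new [OCIBuildService]. A minimal wiring sketch; the
    /// bundle root path and the service values are illustrative, not part of
    /// this crate:
    /// ```ignore
    /// let svc = OCIBuildService::new(
    ///     "/var/lib/tvix-build/bundles".into(), // bundle_root (assumed path)
    ///     blob_service,                         // impl BlobService + Clone
    ///     directory_service,                    // impl DirectoryService + Clone
    /// );
    /// ```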
    pub fn new(bundle_root: PathBuf, blob_service: BS, directory_service: DS) -> Self {
        // We map root inside the container to the uid/gid this process is
        // running as, and allocate one more id inside the container from the
        // range we got in /etc/sub{u,g}id.
        // TODO: actually read uid, and /etc/subuid. Maybe only when we try to build?
        // FUTUREWORK: use different uids?
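        //
        // The mappings below result in (container id -> host id, size 1 each):
        //   uid 0 -> 1000,  uid 1000 -> 100000
        //   gid 0 -> 100,   gid 100  -> 100000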
        Self {
            bundle_root,
            blob_service,
            directory_service,
            uid_mappings: vec![
                LinuxIdMappingBuilder::default()
                    .host_id(1000_u32)
                    .container_id(0_u32)
                    .size(1_u32)
                    .build()
                    .unwrap(),
                LinuxIdMappingBuilder::default()
                    .host_id(100000_u32)
                    .container_id(1000_u32)
                    .size(1_u32)
                    .build()
                    .unwrap(),
            ],
            gid_mappings: vec![
                LinuxIdMappingBuilder::default()
                    .host_id(100_u32)
                    .container_id(0_u32)
                    .size(1_u32)
                    .build()
                    .unwrap(),
                LinuxIdMappingBuilder::default()
                    .host_id(100000_u32)
                    .container_id(100_u32)
                    .size(1_u32)
                    .build()
                    .unwrap(),
            ],
            concurrent_builds: tokio::sync::Semaphore::new(MAX_CONCURRENT_BUILDS),
        }
    }
}

#[async_trait]
impl<BS, DS> BuildService for OCIBuildService<BS, DS>
where
    BS: BlobService + Clone + 'static,
    DS: DirectoryService + Clone + 'static,
{
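    /// Runs a single build: creates an OCI bundle, mounts the inputs via
    /// FUSE, spawns the bundle with runc, and ingests the outputs into the
    /// castore. A minimal call sketch, assuming an already populated
    /// [BuildRequest]:
    /// ```ignore
    /// let build = service.do_build(request).await?;
    /// println!("produced {} output(s)", build.outputs.len());
    /// ```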
    #[instrument(skip_all, err)]
    async fn do_build(&self, request: BuildRequest) -> std::io::Result<proto::Build> {
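        // Acquire a permit first, limiting the number of concurrent builds.
        // The permit is released when `_permit` is dropped at the end of
        // this function.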
        let _permit = self.concurrent_builds.acquire().await.unwrap();

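        // Create a fresh bundle directory underneath bundle_root, named by a
        // random UUIDv4, so concurrent builds can't collide.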
        let bundle_name = Uuid::new_v4();
        let bundle_path = self.bundle_root.join(bundle_name.to_string());

        let span = Span::current();
        span.record("bundle_name", bundle_name.to_string());

        let mut runtime_spec = make_spec(&request, true, SANDBOX_SHELL)
            .context("failed to create spec")
            .map_err(std::io::Error::other)?;

        let mut linux = runtime_spec.linux().clone().unwrap();

        // edit the spec, we need to set up uid/gid mappings.
        linux.set_uid_mappings(Some(self.uid_mappings.clone()));
        linux.set_gid_mappings(Some(self.gid_mappings.clone()));

        runtime_spec.set_linux(Some(linux));

        make_bundle(&request, &runtime_spec, &bundle_path)
            .context("failed to produce bundle")
            .map_err(std::io::Error::other)?;

        // pre-calculate the locations we want to later ingest, in the order
        // of the original outputs.
        // If we can't calculate that path, don't start the build in the
        // first place.
        let host_output_paths = get_host_output_paths(&request, &bundle_path)
            .context("failed to calculate host output paths")
            .map_err(std::io::Error::other)?;

        let patterns = ReferencePattern::new(request.refscan_needles.clone());
        // NOTE: impl Drop for FuseDaemon unmounts, so if this call is
        // cancelled, the filesystem is unmounted.
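        // Mounting the FUSE filesystem is blocking I/O, hence the
        // spawn_blocking onto the blocking threadpool.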
        let _fuse_daemon = tokio::task::spawn_blocking({
            let blob_service = self.blob_service.clone();
            let directory_service = self.directory_service.clone();

            let dest = bundle_path.join("inputs");

            // assemble a BTreeMap of Nodes to pass into TvixStoreFs.
            let root_nodes = Box::new(request.inputs.clone());
            move || {
                let fs = tvix_castore::fs::TvixStoreFs::new(
                    blob_service,
                    directory_service,
                    root_nodes,
                    true,
                    false,
                );
                // mount the filesystem and wait for it to be unmounted.
                // FUTUREWORK: make fuse daemon threads configurable?
                FuseDaemon::new(fs, dest, 4, true).context("failed to start fuse daemon")
            }
        })
        .await?
        .context("mounting")
        .map_err(std::io::Error::other)?;

        debug!(bundle.path=?bundle_path, bundle.name=%bundle_name, "about to spawn bundle");

        // start the bundle as another process.
        let child = spawn_bundle(bundle_path, &bundle_name.to_string())?;

        // wait for the process to exit
        // FUTUREWORK: change the trait to allow reporting progress / logs…
        let child_output = child
            .wait_with_output()
            .await
            .context("failed to run process")
            .map_err(std::io::Error::other)?;

        // Check the exit code
        if !child_output.status.success() {
            let stdout = BStr::new(&child_output.stdout);
            let stderr = BStr::new(&child_output.stderr);

            warn!(stdout=%stdout, stderr=%stderr, exit_code=%child_output.status, "build failed");

            return Err(std::io::Error::other("nonzero exit code"));
        }

        // Ingest build outputs into the castore.
        // We use try_join_all here. No need to spawn new tasks, as this is
        // mostly IO bound.
        let (outputs, outputs_needles) = futures::future::try_join_all(
            host_output_paths.into_iter().enumerate().map(|(i, p)| {
                let output_path = request.outputs[i].clone();
                let patterns = patterns.clone();
                async move {
                    debug!(host.path=?p, output.path=?output_path, "ingesting path");

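                    // Scan the output contents for the refscan needles while
                    // they are ingested.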
                    let scanner = ReferenceScanner::new(patterns);
                    let output_node = ingest_path(
                        self.blob_service.clone(),
                        &self.directory_service,
                        p,
                        Some(&scanner),
                    )
                    .await
                    .map_err(|e| {
                        std::io::Error::new(
                            std::io::ErrorKind::InvalidData,
                            format!("Unable to ingest output: {}", e),
                        )
                    })?;

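                    // Convert the per-needle match flags into the list of
                    // indices of the needles found in this output.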
                    let needles = OutputNeedles {
                        needles: scanner
                            .matches()
                            .into_iter()
                            .enumerate()
                            .filter(|(_, val)| *val)
                            .map(|(idx, _)| idx as u64)
                            .collect(),
                    };

                    Ok::<_, std::io::Error>((
                        tvix_castore::proto::Node::from_name_and_node(
                            output_path
                                .file_name()
                                .and_then(|s| s.to_str())
                                .map(|s| s.to_string())
                                .unwrap_or("".into())
                                .into(),
                            output_node,
                        ),
                        needles,
                    ))
                }
            }),
        )
        .await?
        .into_iter()
        .unzip();

        Ok(proto::Build {
            build_request: Some(request.into()),
            outputs,
            outputs_needles,
        })
    }
}

/// Spawns runc with the bundle at bundle_path.
/// On success, returns the child.
#[instrument(err)]
fn spawn_bundle(
    bundle_path: impl AsRef<OsStr> + std::fmt::Debug,
    bundle_name: &str,
) -> std::io::Result<Child> {
    let mut command = Command::new("runc");

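    // Equivalent to `runc run --bundle <bundle_path> <bundle_name>`, with
    // stdout/stderr captured for logging and stdin closed.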
    command
        .args(&[
            "run".into(),
            "--bundle".into(),
            bundle_path.as_ref().to_os_string(),
            bundle_name.into(),
        ])
        .stderr(Stdio::piped())
        .stdout(Stdio::piped())
        .stdin(Stdio::null());

    command.spawn()
}