This pushes generating spans with pb_show up to the caller. They usually have more context on how to present things, if at all. Change-Id: Icfcaa64a8a57dce50c0261f2d06e7c051e3946c2 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12657 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
203 lines
6.1 KiB
Rust
203 lines
6.1 KiB
Rust
use crate::utils::AsyncIoBridge;
|
|
|
|
use super::{NarCalculationService, RenderError};
|
|
use count_write::CountWrite;
|
|
use nix_compat::nar::writer::r#async as nar_writer;
|
|
use sha2::{Digest, Sha256};
|
|
use tokio::io::{self, AsyncWrite, BufReader};
|
|
use tonic::async_trait;
|
|
use tracing::instrument;
|
|
use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Node};
|
|
|
|
pub struct SimpleRenderer<BS, DS> {
|
|
blob_service: BS,
|
|
directory_service: DS,
|
|
}
|
|
|
|
impl<BS, DS> SimpleRenderer<BS, DS> {
|
|
pub fn new(blob_service: BS, directory_service: DS) -> Self {
|
|
Self {
|
|
blob_service,
|
|
directory_service,
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
|
|
where
|
|
BS: BlobService + Clone,
|
|
DS: DirectoryService + Clone,
|
|
{
|
|
async fn calculate_nar(
|
|
&self,
|
|
root_node: &Node,
|
|
) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
|
|
calculate_size_and_sha256(
|
|
root_node,
|
|
self.blob_service.clone(),
|
|
self.directory_service.clone(),
|
|
)
|
|
.await
|
|
.map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
|
|
}
|
|
}
|
|
|
|
/// Invoke [write_nar], and return the size and sha256 digest of the produced
|
|
/// NAR output.
|
|
#[instrument(skip_all)]
|
|
pub async fn calculate_size_and_sha256<BS, DS>(
|
|
root_node: &Node,
|
|
blob_service: BS,
|
|
directory_service: DS,
|
|
) -> Result<(u64, [u8; 32]), RenderError>
|
|
where
|
|
BS: BlobService + Send,
|
|
DS: DirectoryService + Send,
|
|
{
|
|
let mut h = Sha256::new();
|
|
let mut cw = CountWrite::from(&mut h);
|
|
|
|
write_nar(
|
|
// The hasher doesn't speak async. It doesn't
|
|
// actually do any I/O, so it's fine to wrap.
|
|
AsyncIoBridge(&mut cw),
|
|
root_node,
|
|
blob_service,
|
|
directory_service,
|
|
)
|
|
.await?;
|
|
|
|
Ok((cw.count(), h.finalize().into()))
|
|
}
|
|
|
|
/// Accepts a [Node] pointing to the root of a (store) path,
|
|
/// and uses the passed blob_service and directory_service to perform the
|
|
/// necessary lookups as it traverses the structure.
|
|
/// The contents in NAR serialization are writen to the passed [AsyncWrite].
|
|
pub async fn write_nar<W, BS, DS>(
|
|
mut w: W,
|
|
root_node: &Node,
|
|
blob_service: BS,
|
|
directory_service: DS,
|
|
) -> Result<(), RenderError>
|
|
where
|
|
W: AsyncWrite + Unpin + Send,
|
|
BS: BlobService + Send,
|
|
DS: DirectoryService + Send,
|
|
{
|
|
// Initialize NAR writer
|
|
let nar_root_node = nar_writer::open(&mut w)
|
|
.await
|
|
.map_err(RenderError::NARWriterError)?;
|
|
|
|
walk_node(
|
|
nar_root_node,
|
|
root_node,
|
|
b"",
|
|
blob_service,
|
|
directory_service,
|
|
)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Process an intermediate node in the structure.
|
|
/// This consumes the node.
|
|
async fn walk_node<BS, DS>(
|
|
nar_node: nar_writer::Node<'_, '_>,
|
|
castore_node: &Node,
|
|
name: &[u8],
|
|
blob_service: BS,
|
|
directory_service: DS,
|
|
) -> Result<(BS, DS), RenderError>
|
|
where
|
|
BS: BlobService + Send,
|
|
DS: DirectoryService + Send,
|
|
{
|
|
match castore_node {
|
|
Node::Symlink { target, .. } => {
|
|
nar_node
|
|
.symlink(target.as_ref())
|
|
.await
|
|
.map_err(RenderError::NARWriterError)?;
|
|
}
|
|
Node::File {
|
|
digest,
|
|
size,
|
|
executable,
|
|
} => {
|
|
let mut blob_reader = match blob_service
|
|
.open_read(digest)
|
|
.await
|
|
.map_err(RenderError::StoreError)?
|
|
{
|
|
Some(blob_reader) => Ok(BufReader::new(blob_reader)),
|
|
None => Err(RenderError::NARWriterError(io::Error::new(
|
|
io::ErrorKind::NotFound,
|
|
format!("blob with digest {} not found", &digest),
|
|
))),
|
|
}?;
|
|
|
|
nar_node
|
|
.file(*executable, *size, &mut blob_reader)
|
|
.await
|
|
.map_err(RenderError::NARWriterError)?;
|
|
}
|
|
Node::Directory { digest, .. } => {
|
|
// look it up with the directory service
|
|
match directory_service
|
|
.get(digest)
|
|
.await
|
|
.map_err(|e| RenderError::StoreError(e.into()))?
|
|
{
|
|
// if it's None, that's an error!
|
|
None => Err(RenderError::DirectoryNotFound(
|
|
digest.clone(),
|
|
bytes::Bytes::copy_from_slice(name),
|
|
))?,
|
|
Some(directory) => {
|
|
// start a directory node
|
|
let mut nar_node_directory = nar_node
|
|
.directory()
|
|
.await
|
|
.map_err(RenderError::NARWriterError)?;
|
|
|
|
// We put blob_service, directory_service back here whenever we come up from
|
|
// the recursion.
|
|
let mut blob_service = blob_service;
|
|
let mut directory_service = directory_service;
|
|
|
|
// for each node in the directory, create a new entry with its name,
|
|
// and then recurse on that entry.
|
|
for (name, node) in directory.nodes() {
|
|
let child_node = nar_node_directory
|
|
.entry(name.as_ref())
|
|
.await
|
|
.map_err(RenderError::NARWriterError)?;
|
|
|
|
(blob_service, directory_service) = Box::pin(walk_node(
|
|
child_node,
|
|
node,
|
|
name.as_ref(),
|
|
blob_service,
|
|
directory_service,
|
|
))
|
|
.await?;
|
|
}
|
|
|
|
// close the directory
|
|
nar_node_directory
|
|
.close()
|
|
.await
|
|
.map_err(RenderError::NARWriterError)?;
|
|
|
|
return Ok((blob_service, directory_service));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok((blob_service, directory_service))
|
|
}
|