refactor(tvix/store/nar/renderer): don't require Arc, Clone or Sync
To render NARs, we're fine with a simple AsRef to a BlobService and
DirectoryService. We just need to have the function pass back the
references, so we can reuse them after the recursion.

Change-Id: I8a1b899134ddda26cf14aa829a08383986101850
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10605
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
parent 9cdb5e17a4
commit 7d51193f7d

1 changed file with 45 additions and 28 deletions
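To illustrate the pattern outside of tvix, here is a minimal standalone sketch (the BlobService trait, MemoryBlobService, Dir and walk below are simplified stand-ins, not the actual tvix types or signatures): the walker only needs AsRef to borrow the service for its own lookups, and it hands ownership of the handle back in its Ok value, so a single non-Clone, non-Sync handle can be threaded through the whole recursion and reused afterwards.

// Hypothetical stand-ins for the real service and node types.
trait BlobService {
    fn name(&self) -> &str;
}

struct MemoryBlobService;

impl BlobService for MemoryBlobService {
    fn name(&self) -> &str {
        "memory"
    }
}

struct Dir {
    children: Vec<Dir>,
}

// The walker borrows the service via AsRef for its own work, then returns
// ownership to the caller so the next child can reuse the same handle.
fn walk<BS>(dir: &Dir, blob_service: BS) -> Result<BS, String>
where
    BS: AsRef<dyn BlobService>,
{
    // "Look something up" for the current node.
    let _backend = blob_service.as_ref().name();

    // Thread the handle through the recursion and take it back each time.
    let mut blob_service = blob_service;
    for child in &dir.children {
        blob_service = walk(child, blob_service)?;
    }
    Ok(blob_service)
}

fn main() {
    let root = Dir {
        children: vec![Dir { children: vec![] }, Dir { children: vec![] }],
    };
    // A plain Box works here; no Arc, Clone or Sync required.
    let svc: Box<dyn BlobService> = Box::new(MemoryBlobService);
    let svc = walk(&root, svc).expect("walk failed");
    assert_eq!(svc.as_ref().name(), "memory");
}

The real functions are async and additionally require Send on BS and DS, but the ownership flow is the same one visible in the diff below: walk_node now returns Result<(BS, DS), RenderError>, and the directory loop reassigns (blob_service, directory_service) from each recursive call.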
@@ -5,7 +5,6 @@ use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
 use std::{
     pin::Pin,
-    sync::Arc,
     task::{self, Poll},
 };
 use tokio::io::{self, AsyncWrite, BufReader};
@@ -18,11 +17,15 @@ use tvix_castore::{
 
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
-pub async fn calculate_size_and_sha256(
+pub async fn calculate_size_and_sha256<BS, DS>(
     root_node: &castorepb::node::Node,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<(u64, [u8; 32]), RenderError> {
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(u64, [u8; 32]), RenderError>
+where
+    BS: AsRef<dyn BlobService> + Send,
+    DS: AsRef<dyn DirectoryService> + Send,
+{
     let mut h = Sha256::new();
     let mut cw = CountWrite::from(&mut h);
 
@@ -68,12 +71,17 @@ impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
 /// and uses the passed blob_service and directory_service to perform the
 /// necessary lookups as it traverses the structure.
 /// The contents in NAR serialization are writen to the passed [AsyncWrite].
-pub async fn write_nar<W: AsyncWrite + Unpin + Send>(
+pub async fn write_nar<W, BS, DS>(
     w: W,
     proto_root_node: &castorepb::node::Node,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<(), RenderError> {
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(), RenderError>
+where
+    W: AsyncWrite + Unpin + Send,
+    BS: AsRef<dyn BlobService> + Send,
+    DS: AsRef<dyn DirectoryService> + Send,
+{
     // Initialize NAR writer
     let mut w = w.compat_write();
     let nar_root_node = nar_writer::open(&mut w)
@@ -94,12 +102,16 @@ pub async fn write_nar<W: AsyncWrite + Unpin + Send>(
 /// Process an intermediate node in the structure.
 /// This consumes the node.
 #[async_recursion]
-async fn walk_node(
+async fn walk_node<BS, DS>(
     nar_node: nar_writer::Node<'async_recursion, '_>,
     proto_node: &castorepb::node::Node,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<(), RenderError> {
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(BS, DS), RenderError>
+where
+    BS: AsRef<dyn BlobService> + Send,
+    DS: AsRef<dyn DirectoryService> + Send,
+{
     match proto_node {
         castorepb::node::Node::Symlink(proto_symlink_node) => {
             nar_node
@@ -117,6 +129,7 @@ async fn walk_node(
                 })?;
 
             let blob_reader = match blob_service
+                .as_ref()
                 .open_read(&digest)
                 .await
                 .map_err(RenderError::StoreError)?
@@ -152,17 +165,16 @@ async fn walk_node(
 
             // look it up with the directory service
             match directory_service
+                .as_ref()
                 .get(&digest)
                 .await
                 .map_err(|e| RenderError::StoreError(e.into()))?
             {
                 // if it's None, that's an error!
-                None => {
-                    return Err(RenderError::DirectoryNotFound(
-                        digest,
-                        proto_directory_node.name.clone(),
-                    ))
-                }
+                None => Err(RenderError::DirectoryNotFound(
+                    digest,
+                    proto_directory_node.name.clone(),
+                ))?,
                 Some(proto_directory) => {
                     // start a directory node
                     let mut nar_node_directory = nar_node
@@ -170,19 +182,21 @@ async fn walk_node(
                         .await
                         .map_err(RenderError::NARWriterError)?;
 
+                    // We put blob_service, directory_service back here whenever we come up from
+                    // the recursion.
+                    let mut blob_service = blob_service;
+                    let mut directory_service = directory_service;
+
                     // for each node in the directory, create a new entry with its name,
-                    // and then invoke walk_node on that entry.
+                    // and then recurse on that entry.
                     for proto_node in proto_directory.nodes() {
                         let child_node = nar_node_directory
                             .entry(proto_node.get_name())
                             .await
                             .map_err(RenderError::NARWriterError)?;
-                        walk_node(
-                            child_node,
-                            &proto_node,
-                            blob_service.clone(),
-                            directory_service.clone(),
-                        )
-                        .await?;
+
+                        (blob_service, directory_service) =
+                            walk_node(child_node, &proto_node, blob_service, directory_service)
+                                .await?;
                     }
 
@@ -191,9 +205,12 @@ async fn walk_node(
                         .close()
                         .await
                         .map_err(RenderError::NARWriterError)?;
+
+                    return Ok((blob_service, directory_service));
                 }
             }
         }
     }
-    Ok(())
+
+    Ok((blob_service, directory_service))
 }