feat(tvix): add instance_name to instrumentation of *Services

Currently it is not possible to distinguish between tracing of the same
*Service type whenever there are multiple of them. Now the instance_name
of ServiceBuilder is passed into the *Service and used in the existing
instrument as the `instance_name` field.

Places that did not already have an instance_name in their context use
`"default"`. In tests, `"test"` is used.

Change-Id: Ia20bf2a7bb849a781e370d087ba7ddb3be79f654
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12739
Tested-by: BuildkiteCI
Autosubmit: Bob van der Linden <bobvanderlinden@gmail.com>
Reviewed-by: flokli <flokli@flokli.de>
This commit is contained in:
Bob van der Linden 2024-11-06 23:13:33 +01:00 committed by clbot
parent 951d25676b
commit cfa4154131
23 changed files with 270 additions and 137 deletions

View file

@ -16,6 +16,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
/// blobservice again, before falling back to the remote one.
/// The remote BlobService is never written to.
pub struct CombinedBlobService<BL, BR> {
instance_name: String,
local: BL,
remote: BR,
}
@ -27,6 +28,7 @@ where
{
fn clone(&self) -> Self {
Self {
instance_name: self.instance_name.clone(),
local: self.local.clone(),
remote: self.remote.clone(),
}
@ -39,12 +41,12 @@ where
BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
{
#[instrument(skip(self, digest), fields(blob.digest=%digest))]
#[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?)
}
#[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
#[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
if self.local.as_ref().has(digest).await? {
// local store has the blob, so we can assume it also has all chunks.
@ -84,7 +86,7 @@ where
}
}
#[instrument(skip_all)]
#[instrument(skip_all, fields(instance_name=%self.instance_name))]
async fn open_write(&self) -> Box<dyn BlobWriter> {
// direct writes to the local one.
self.local.as_ref().open_write().await
@ -113,7 +115,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig {
type Output = dyn BlobService;
async fn build<'a>(
&'a self,
_instance_name: &str,
instance_name: &str,
context: &CompositionContext,
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
let (local, remote) = futures::join!(
@ -121,6 +123,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig {
context.resolve(self.remote.clone())
);
Ok(Arc::new(CombinedBlobService {
instance_name: instance_name.to_string(),
local: local?,
remote: remote?,
}))

View file

@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _};
/// Connects to a (remote) tvix-store BlobService over gRPC.
#[derive(Clone)]
pub struct GRPCBlobService<T> {
instance_name: String,
/// The internal reference to a gRPC client.
/// Cloning it is cheap, and it internally handles concurrent requests.
grpc_client: proto::blob_service_client::BlobServiceClient<T>,
@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> {
impl<T> GRPCBlobService<T> {
/// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
Self { grpc_client }
pub fn from_client(
instance_name: String,
grpc_client: proto::blob_service_client::BlobServiceClient<T>,
) -> Self {
Self {
instance_name,
grpc_client,
}
}
}
@ -44,7 +51,7 @@ where
<T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
T::Future: Send,
{
#[instrument(skip(self, digest), fields(blob.digest=%digest))]
#[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
match self
.grpc_client
@ -61,7 +68,7 @@ where
}
}
#[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
#[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
// First try to get a list of chunks. In case there's only one chunk returned,
// buffer its data into a Vec, otherwise use a ChunkedReader.
@ -124,7 +131,7 @@ where
/// Returns a BlobWriter, that'll internally wrap each write in a
/// [proto::BlobChunk], which is send to the gRPC server.
#[instrument(skip_all)]
#[instrument(skip_all, fields(instance_name=%self.instance_name))]
async fn open_write(&self) -> Box<dyn BlobWriter> {
// set up an mpsc channel passing around Bytes.
let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
@ -154,7 +161,7 @@ where
})
}
#[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
#[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
let resp = self
.grpc_client
@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig {
type Output = dyn BlobService;
async fn build<'a>(
&'a self,
_instance_name: &str,
instance_name: &str,
_context: &CompositionContext,
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let client = proto::blob_service_client::BlobServiceClient::new(
crate::tonic::channel_from_url(&self.url.parse()?).await?,
);
Ok(Arc::new(GRPCBlobService::from_client(client)))
Ok(Arc::new(GRPCBlobService::from_client(
instance_name.to_string(),
client,
)))
}
}
@ -375,7 +385,7 @@ mod tests {
.await
.expect("must succeed"),
);
GRPCBlobService::from_client(client)
GRPCBlobService::from_client("default".into(), client)
};
let has = grpc_client

View file

@ -11,18 +11,19 @@ use crate::{B3Digest, Error};
#[derive(Clone, Default)]
pub struct MemoryBlobService {
instance_name: String,
db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
}
#[async_trait]
impl BlobService for MemoryBlobService {
#[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
#[instrument(skip_all, ret, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
let db = self.db.read();
Ok(db.contains_key(digest))
}
#[instrument(skip_all, err, fields(blob.digest=%digest))]
#[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
let db = self.db.read();
@ -32,7 +33,7 @@ impl BlobService for MemoryBlobService {
}
}
#[instrument(skip_all)]
#[instrument(skip_all, fields(instance_name=%self.instance_name))]
async fn open_write(&self) -> Box<dyn BlobWriter> {
Box::new(MemoryBlobWriter::new(self.db.clone()))
}
@ -58,10 +59,13 @@ impl ServiceBuilder for MemoryBlobServiceConfig {
type Output = dyn BlobService;
async fn build<'a>(
&'a self,
_instance_name: &str,
instance_name: &str,
_context: &CompositionContext,
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
Ok(Arc::new(MemoryBlobService::default()))
Ok(Arc::new(MemoryBlobService {
instance_name: instance_name.to_string(),
db: Default::default(),
}))
}
}

View file

@ -64,6 +64,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
/// all keys stored so far, but no promises ;-)
#[derive(Clone)]
pub struct ObjectStoreBlobService {
instance_name: String,
object_store: Arc<dyn ObjectStore>,
base_path: Path,
@ -92,7 +93,7 @@ fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
#[async_trait]
impl BlobService for ObjectStoreBlobService {
#[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest))]
#[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
// TODO: clarify if this should work for chunks or not, and explicitly
// document in the proto docs.
@ -112,7 +113,7 @@ impl BlobService for ObjectStoreBlobService {
}
}
#[instrument(skip_all, err, fields(blob.digest=%digest))]
#[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
// handle reading the empty blob.
if digest.as_slice() == blake3::hash(b"").as_bytes() {
@ -169,7 +170,7 @@ impl BlobService for ObjectStoreBlobService {
}
}
#[instrument(skip_all)]
#[instrument(skip_all, fields(instance_name=%self.instance_name))]
async fn open_write(&self) -> Box<dyn BlobWriter> {
// ObjectStoreBlobWriter implements AsyncWrite, but all the chunking
// needs an AsyncRead, so we create a pipe here.
@ -192,7 +193,7 @@ impl BlobService for ObjectStoreBlobService {
})
}
#[instrument(skip_all, err, fields(blob.digest=%digest))]
#[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
match self
.object_store
@ -294,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig {
type Output = dyn BlobService;
async fn build<'a>(
&'a self,
_instance_name: &str,
instance_name: &str,
_context: &CompositionContext,
) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
let (object_store, path) = object_store::parse_url_opts(
@ -302,6 +303,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig {
&self.object_store_options,
)?;
Ok(Arc::new(ObjectStoreBlobService {
instance_name: instance_name.to_string(),
object_store: Arc::new(object_store),
base_path: path,
avg_chunk_size: self.avg_chunk_size,
@ -582,6 +584,7 @@ mod test {
object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
let blobsvc = Arc::new(ObjectStoreBlobService {
instance_name: "test".into(),
object_store: object_store.clone(),
avg_chunk_size: default_avg_chunk_size(),
base_path,

View file

@ -29,14 +29,17 @@ pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> {
// Create a client, connecting to the right side. The URI is unused.
let mut maybe_right = Some(right);
Box::new(GRPCBlobService::from_client(BlobServiceClient::new(
Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(tower::service_fn(move |_: Uri| {
let right = maybe_right.take().unwrap();
async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
}))
.await
.unwrap(),
)))
Box::new(GRPCBlobService::from_client(
"default".into(),
BlobServiceClient::new(
Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(tower::service_fn(move |_: Uri| {
let right = maybe_right.take().unwrap();
async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
}))
.await
.unwrap(),
),
))
}