feat(tvix/castore/blob/chunked_reader): add some more traces
Change-Id: I2408707a7bc0e1c0cd8bd2933f8d68805b9e12c9 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11444 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
This commit is contained in:
parent
28e98af9bc
commit
9d9c731147
1 changed files with 8 additions and 2 deletions
|
|
@ -3,7 +3,7 @@ use pin_project_lite::pin_project;
|
||||||
use tokio::io::{AsyncRead, AsyncSeekExt};
|
use tokio::io::{AsyncRead, AsyncSeekExt};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tokio_util::io::{ReaderStream, StreamReader};
|
use tokio_util::io::{ReaderStream, StreamReader};
|
||||||
use tracing::{instrument, warn};
|
use tracing::{instrument, trace, warn};
|
||||||
|
|
||||||
use crate::B3Digest;
|
use crate::B3Digest;
|
||||||
use std::{cmp::Ordering, pin::Pin};
|
use std::{cmp::Ordering, pin::Pin};
|
||||||
|
|
@ -114,6 +114,9 @@ where
|
||||||
|
|
||||||
// Update the position and the internal reader.
|
// Update the position and the internal reader.
|
||||||
*this.pos = absolute_offset;
|
*this.pos = absolute_offset;
|
||||||
|
|
||||||
|
// FUTUREWORK: if we can seek forward, avoid re-assembling.
|
||||||
|
// At least if it's still in the same chunk?
|
||||||
*this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
|
*this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -174,6 +177,7 @@ where
|
||||||
|
|
||||||
/// For a given position pos, return the chunk containing the data.
|
/// For a given position pos, return the chunk containing the data.
|
||||||
/// In case this would range outside the blob, None is returned.
|
/// In case this would range outside the blob, None is returned.
|
||||||
|
#[instrument(level = "trace", skip(self), ret)]
|
||||||
fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
|
fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
|
||||||
// FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
|
// FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
|
||||||
self.chunks
|
self.chunks
|
||||||
|
|
@ -195,6 +199,7 @@ where
|
||||||
/// From the first relevant chunk, the irrelevant bytes are skipped too.
|
/// From the first relevant chunk, the irrelevant bytes are skipped too.
|
||||||
/// The returned boxed thing does not implement AsyncSeek on its own, but
|
/// The returned boxed thing does not implement AsyncSeek on its own, but
|
||||||
/// ChunkedReader does.
|
/// ChunkedReader does.
|
||||||
|
#[instrument(level = "trace", skip(self))]
|
||||||
fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
|
fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
|
||||||
if offset == self.blob_length() {
|
if offset == self.blob_length() {
|
||||||
return Box::new(std::io::Cursor::new(vec![]));
|
return Box::new(std::io::Cursor::new(vec![]));
|
||||||
|
|
@ -210,10 +215,11 @@ where
|
||||||
let blob_service = self.blob_service.clone();
|
let blob_service = self.blob_service.clone();
|
||||||
let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
|
let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
|
||||||
let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
|
let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
|
||||||
move |(nth_chunk, (_chunk_start_offset, _chunk_size, chunk_digest))| {
|
move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
|
||||||
let chunk_digest = chunk_digest.to_owned();
|
let chunk_digest = chunk_digest.to_owned();
|
||||||
let blob_service = blob_service.clone();
|
let blob_service = blob_service.clone();
|
||||||
async move {
|
async move {
|
||||||
|
trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
|
||||||
let mut blob_reader = blob_service
|
let mut blob_reader = blob_service
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.open_read(&chunk_digest.to_owned())
|
.open_read(&chunk_digest.to_owned())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue