diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs index a99cbfb8e..ba2495390 100644 --- a/tvix/store/src/pathinfoservice/bigtable.rs +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; use std::sync::Arc; use tonic::async_trait; -use tracing::{instrument, trace}; +use tracing::{instrument, trace, warn, Span}; use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; @@ -318,22 +318,22 @@ impl PathInfoService for BigtablePathInfoService { }; let stream = try_stream! { - // TODO: add pagination, we don't want to hold all of this in memory. - let response = client - .read_rows(request) + let mut rows = client + .stream_rows(request) .await - .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?.enumerate(); + + use futures::stream::StreamExt; + + while let Some((i, elem)) = rows.next().await { + let (row_key, mut cells) = elem.map_err(|e| Error::StorageError(format!("unable to stream row {}: {}", i, e)))?; + let span = Span::current(); + span.record("row.key", bstr::BStr::new(&row_key).to_string()); - for (row_key, mut cells) in response { let cell = cells .pop() .ok_or_else(|| Error::StorageError("found no cells".into()))?; - // The cell must have the same qualifier as the row key - if row_key != cell.qualifier { - Err(Error::StorageError("unexpected cell qualifier".into()))?; - } - // Ensure there's only one cell (so no more left after the pop()) // This shouldn't happen, We filter out other cells in our query. if !cells.is_empty() { @@ -343,6 +343,12 @@ impl PathInfoService for BigtablePathInfoService { ))? } + // The cell must have the same qualifier as the row key + if row_key != cell.qualifier { + warn!("unexpected cell qualifier"); + Err(Error::StorageError("unexpected cell qualifier".into()))?; + } + // Try to parse the value into a PathInfo message. let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value)) .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;