feat(tvix/store/pathinfoservice/bigtable): stream rows
Instead of collecting all rows into a Vec, use the stream_rows function, and massage Ok values into the right shape. This avoids collecting everything into memory. Change-Id: I2f03434c7e1e25ee8395c69685b16100aec36f89 Reviewed-on: https://cl.tvl.fyi/c/depot/+/13163 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
parent
299d957df7
commit
22d68b3100
1 changed files with 17 additions and 11 deletions
|
|
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
||||||
use serde_with::{serde_as, DurationSeconds};
|
use serde_with::{serde_as, DurationSeconds};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tonic::async_trait;
|
use tonic::async_trait;
|
||||||
use tracing::{instrument, trace};
|
use tracing::{instrument, trace, warn, Span};
|
||||||
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
|
use tvix_castore::composition::{CompositionContext, ServiceBuilder};
|
||||||
use tvix_castore::Error;
|
use tvix_castore::Error;
|
||||||
|
|
||||||
|
|
@ -318,22 +318,22 @@ impl PathInfoService for BigtablePathInfoService {
|
||||||
};
|
};
|
||||||
|
|
||||||
let stream = try_stream! {
|
let stream = try_stream! {
|
||||||
// TODO: add pagination, we don't want to hold all of this in memory.
|
let mut rows = client
|
||||||
let response = client
|
.stream_rows(request)
|
||||||
.read_rows(request)
|
|
||||||
.await
|
.await
|
||||||
.map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
|
.map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?.enumerate();
|
||||||
|
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
|
||||||
|
while let Some((i, elem)) = rows.next().await {
|
||||||
|
let (row_key, mut cells) = elem.map_err(|e| Error::StorageError(format!("unable to stream row {}: {}", i, e)))?;
|
||||||
|
let span = Span::current();
|
||||||
|
span.record("row.key", bstr::BStr::new(&row_key).to_string());
|
||||||
|
|
||||||
for (row_key, mut cells) in response {
|
|
||||||
let cell = cells
|
let cell = cells
|
||||||
.pop()
|
.pop()
|
||||||
.ok_or_else(|| Error::StorageError("found no cells".into()))?;
|
.ok_or_else(|| Error::StorageError("found no cells".into()))?;
|
||||||
|
|
||||||
// The cell must have the same qualifier as the row key
|
|
||||||
if row_key != cell.qualifier {
|
|
||||||
Err(Error::StorageError("unexpected cell qualifier".into()))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure there's only one cell (so no more left after the pop())
|
// Ensure there's only one cell (so no more left after the pop())
|
||||||
// This shouldn't happen, We filter out other cells in our query.
|
// This shouldn't happen, We filter out other cells in our query.
|
||||||
if !cells.is_empty() {
|
if !cells.is_empty() {
|
||||||
|
|
@ -343,6 +343,12 @@ impl PathInfoService for BigtablePathInfoService {
|
||||||
))?
|
))?
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The cell must have the same qualifier as the row key
|
||||||
|
if row_key != cell.qualifier {
|
||||||
|
warn!("unexpected cell qualifier");
|
||||||
|
Err(Error::StorageError("unexpected cell qualifier".into()))?;
|
||||||
|
}
|
||||||
|
|
||||||
// Try to parse the value into a PathInfo message.
|
// Try to parse the value into a PathInfo message.
|
||||||
let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
|
let path_info_proto = proto::PathInfo::decode(Bytes::from(cell.value))
|
||||||
.map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
|
.map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue