feat(tvix/castore/directory): add bigtable backend

This adds a Directory service using
https://cloud.google.com/bigtable/docs/ as a K/V store.

Directories (and their closures) are stored in individual keys.

We don't do any bucketed upload of directory closures (yet), as castore/fs
queries directories individually rather than requesting them recursively
(and buffering). This will be addressed by store composition at some point.
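Such a backend is constructed from a bigtable:// URL. A usage sketch (URL
shape as in the from_addr test cases below; assumes tvix-castore is built
with the "cloud" feature):

    use tvix_castore::directoryservice;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // The instance name comes from the URL host, everything else from
        // the query string; see the from_addr hunk below.
        let _svc = directoryservice::from_addr(
            "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
        )
        .await?;
        Ok(())
    }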

Change-Id: I7fada45bf386a78b7ec93be38c5f03879a2a6e22
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11212
Tested-by: BuildkiteCI
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Author: Florian Klink <flokli@flokli.de>, 2024-03-19 12:12:03 +02:00 (committed by clbot)
Commit: 17849c5c00 (parent: 84ad8a0bbd)
12 changed files with 2767 additions and 22 deletions

tvix/castore/Cargo.toml

@@ -29,6 +29,15 @@ tracing = "0.1.37"
url = "2.4.0"
walkdir = "2.4.0"
zstd = "0.13.0"
serde = { version = "1.0.197", features = [ "derive" ] }
serde_with = "3.7.0"
serde_qs = "0.12.0"
[dependencies.bigtable_rs]
optional = true
# https://github.com/liufuyang/bigtable_rs/pull/72
git = "https://github.com/flokli/bigtable_rs"
rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7"
[dependencies.fuse-backend-rs]
optional = true
@@ -71,6 +80,7 @@ prost-build = "0.12.1"
tonic-build = "0.11.0"
[dev-dependencies]
async-process = "2.1.0"
rstest = "0.18.2"
tempfile = "3.3.0"
tokio-retry = "0.3.0"
@@ -80,6 +90,7 @@ rstest_reuse = "0.6.0"
[features]
default = []
cloud = [
"dep:bigtable_rs",
"object_store/aws",
"object_store/azure",
"object_store/gcp",

tvix/castore/default.nix

@@ -4,5 +4,9 @@ depot.tvix.crates.workspaceMembers.tvix-castore.build.override {
runTests = true;
testPreRun = ''
export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt;
export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}"
'';
# enable some optional features.
features = [ "default" "cloud" ];
}

tvix/castore/src/directoryservice/bigtable.rs (new file)

@@ -0,0 +1,355 @@
use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2};
use bytes::Bytes;
use data_encoding::HEXLOWER;
use futures::stream::BoxStream;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
use tonic::async_trait;
use tracing::{instrument, trace, warn};
use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter};
use crate::{proto, B3Digest, Error};
/// There should not be more than 10 MiB in a single cell.
/// https://cloud.google.com/bigtable/docs/schema-design#cells
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
/// Provides a [DirectoryService] implementation using
/// [Bigtable](https://cloud.google.com/bigtable/docs/)
/// as an underlying K/V store.
///
/// # Data format
/// We use Bigtable as a plain K/V store.
/// The row key is the digest of the directory, in hexlower.
/// Inside the row, we currently have a single column/cell, again using the
/// hexlower directory digest.
/// Its value is the Directory message, serialized in canonical protobuf.
/// We currently only populate this column.
///
/// In the future, we might want to introduce "bucketing", essentially storing
/// all directories inserted via `put_multiple_start` in a batched form.
/// This would prevent looking up intermediate Directories that are not
/// directly at the root, relying on store composition instead.
#[derive(Clone)]
pub struct BigtableDirectoryService {
client: bigtable::BigTable,
params: BigtableParameters,
#[cfg(test)]
#[allow(dead_code)]
/// Holds the temporary directory containing the unix socket, and the
/// spawned emulator process.
emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
/// Represents configuration of [BigtableDirectoryService].
/// This currently conflates connection parameters with data model/client
/// behaviour parameters.
#[serde_as]
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct BigtableParameters {
project_id: String,
instance_name: String,
#[serde(default)]
is_read_only: bool,
#[serde(default = "default_channel_size")]
channel_size: usize,
#[serde_as(as = "Option<DurationSeconds<String>>")]
#[serde(default = "default_timeout")]
timeout: Option<std::time::Duration>,
table_name: String,
family_name: String,
#[serde(default = "default_app_profile_id")]
app_profile_id: String,
}
fn default_app_profile_id() -> String {
"default".to_owned()
}
fn default_channel_size() -> usize {
4
}
fn default_timeout() -> Option<std::time::Duration> {
Some(std::time::Duration::from_secs(4))
}
impl BigtableDirectoryService {
#[cfg(not(test))]
pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
let connection = bigtable::BigTableConnection::new(
&params.project_id,
&params.instance_name,
params.is_read_only,
params.channel_size,
params.timeout,
)
.await?;
Ok(Self {
client: connection.client(),
params,
})
}
#[cfg(test)]
pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
use std::time::Duration;
use async_process::{Command, Stdio};
use tempfile::TempDir;
use tokio_retry::{strategy::ExponentialBackoff, Retry};
let tmpdir = TempDir::new().unwrap();
let socket_path = tmpdir.path().join("cbtemulator.sock");
let emulator_process = Command::new("cbtemulator")
.arg("-address")
.arg(socket_path.clone())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.kill_on_drop(true)
.spawn()
.expect("failed to spwan emulator");
Retry::spawn(
ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(1)),
|| async {
if socket_path.exists() {
Ok(())
} else {
Err(())
}
},
)
.await
.expect("failed to wait for socket");
// populate the emulator
for cmd in &[
vec!["createtable", &params.table_name],
vec!["createfamily", &params.table_name, &params.family_name],
] {
Command::new("cbt")
.args({
let mut args = vec![
"-instance",
&params.instance_name,
"-project",
&params.project_id,
];
args.extend_from_slice(cmd);
args
})
.env(
"BIGTABLE_EMULATOR_HOST",
format!("unix://{}", socket_path.to_string_lossy()),
)
.output()
.await
.expect("failed to run cbt setup command");
}
let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator(
&format!("unix://{}", socket_path.to_string_lossy()),
&params.project_id,
&params.instance_name,
params.is_read_only,
params.timeout,
)?;
Ok(Self {
client: connection.client(),
params,
emulator: (tmpdir, emulator_process).into(),
})
}
}
/// Derives the row/column key for a given blake3 digest.
/// We use hexlower encoding, also because it can't be misinterpreted as an RE2 regex (Bigtable row filters use RE2 syntax).
fn derive_directory_key(digest: &B3Digest) -> String {
HEXLOWER.encode(digest.as_slice())
}
#[async_trait]
impl DirectoryService for BigtableDirectoryService {
#[instrument(skip(self, digest), err, fields(directory.digest = %digest))]
async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
let mut client = self.client.clone();
let directory_key = derive_directory_key(digest);
let request = bigtable_v2::ReadRowsRequest {
app_profile_id: self.params.app_profile_id.to_string(),
table_name: client.get_full_table_name(&self.params.table_name),
rows_limit: 1,
rows: Some(bigtable_v2::RowSet {
row_keys: vec![directory_key.clone().into()],
row_ranges: vec![],
}),
// Filter selected family name, and column qualifier matching our digest.
// This is to ensure we don't fail once we start bucketing.
filter: Some(bigtable_v2::RowFilter {
filter: Some(bigtable_v2::row_filter::Filter::Chain(
bigtable_v2::row_filter::Chain {
filters: vec![
bigtable_v2::RowFilter {
filter: Some(
bigtable_v2::row_filter::Filter::FamilyNameRegexFilter(
self.params.family_name.to_string(),
),
),
},
bigtable_v2::RowFilter {
filter: Some(
bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
directory_key.clone().into(),
),
),
},
],
},
)),
}),
..Default::default()
};
let mut response = client
.read_rows(request)
.await
.map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?;
if response.len() != 1 {
if response.len() > 1 {
// This shouldn't happen, as we limit the number of rows to 1
return Err(Error::StorageError(
"got more than one row from bigtable".into(),
));
}
// else, this is simply a "not found".
return Ok(None);
}
let (row_key, mut row_cells) = response.pop().unwrap();
if row_key != directory_key.as_bytes() {
// This shouldn't happen, we requested this row key.
return Err(Error::StorageError(
"got wrong row key from bigtable".into(),
));
}
let row_cell = row_cells
.pop()
.ok_or_else(|| Error::StorageError("found no cells".into()))?;
// Ensure there's only one cell (so no more left after the pop())
// This shouldn't happen, as we filter out other cells in our query.
if !row_cells.is_empty() {
return Err(Error::StorageError(
"more than one cell returned from bigtable".into(),
));
}
// We also require the qualifier to be correct in the filter above,
// so this shouldn't happen.
if directory_key.as_bytes() != row_cell.qualifier {
return Err(Error::StorageError("unexpected cell qualifier".into()));
}
// For the data in that cell, ensure the digest matches what's requested, before parsing.
let got_digest = B3Digest::from(blake3::hash(&row_cell.value).as_bytes());
if got_digest != *digest {
return Err(Error::StorageError(format!(
"invalid digest: {}",
got_digest
)));
}
// Try to parse the value into a Directory message.
let directory = proto::Directory::decode(Bytes::from(row_cell.value))
.map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?;
// validate the Directory.
directory
.validate()
.map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?;
Ok(Some(directory))
}
#[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))]
async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
let directory_digest = directory.digest();
let mut client = self.client.clone();
let directory_key = derive_directory_key(&directory_digest);
// Ensure the directory we're trying to upload passes validation
directory
.validate()
.map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?;
let data = directory.encode_to_vec();
if data.len() as u64 > CELL_SIZE_LIMIT {
return Err(Error::StorageError(
"Directory exceeds cell limit on Bigtable".into(),
));
}
let resp = client
.check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest {
table_name: client.get_full_table_name(&self.params.table_name),
app_profile_id: self.params.app_profile_id.to_string(),
row_key: directory_key.clone().into(),
predicate_filter: Some(bigtable_v2::RowFilter {
filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter(
directory_key.clone().into(),
)),
}),
// If the column was already found, do nothing.
true_mutations: vec![],
// Else, do the insert.
false_mutations: vec![
// https://cloud.google.com/bigtable/docs/writes
bigtable_v2::Mutation {
mutation: Some(bigtable_v2::mutation::Mutation::SetCell(
bigtable_v2::mutation::SetCell {
family_name: self.params.family_name.to_string(),
column_qualifier: directory_key.clone().into(),
timestamp_micros: -1, // use server time to fill timestamp
value: data,
},
)),
},
],
})
.await
.map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?;
if resp.predicate_matched {
trace!("already existed")
}
Ok(directory_digest)
}
#[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
fn get_recursive(
&self,
root_directory_digest: &B3Digest,
) -> BoxStream<Result<proto::Directory, Error>> {
traverse_directory(self.clone(), root_directory_digest)
}
#[instrument(skip_all)]
fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
where
Self: Clone,
{
Box::new(SimplePutter::new(self.clone()))
}
}
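Taken together, put is an idempotent insert keyed by content digest, and get
re-verifies the digest before decoding. A minimal round-trip sketch (the
helper and its setup are hypothetical; assumes an already-connected service):

    use tvix_castore::{directoryservice::DirectoryService, proto, Error};

    // Sketch: store a Directory, then fetch it back by its blake3 digest.
    async fn roundtrip(svc: impl DirectoryService) -> Result<(), Error> {
        let dir = proto::Directory::default(); // the empty directory
        // put() validates the message; the CheckAndMutateRow above makes it
        // a no-op if the row already exists.
        let digest = svc.put(dir.clone()).await?;
        // get() re-hashes the cell value and errors on digest mismatch.
        let fetched = svc.get(&digest).await?;
        assert_eq!(Some(dir), fetched);
        Ok(())
    }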

tvix/castore/src/directoryservice/from_addr.rs

@@ -19,7 +19,8 @@ use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, Sled
/// - `grpc+http://host:port`, `grpc+https://host:port`
/// Connects to a (remote) tvix-store gRPC service.
pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Error> {
- let url = Url::parse(uri)
+ #[allow(unused_mut)]
+ let mut url = Url::parse(uri)
.map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?;
let directory_service: Box<dyn DirectoryService> = match url.scheme() {
@@ -62,6 +63,30 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Er
let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?);
Box::new(GRPCDirectoryService::from_client(client))
}
#[cfg(feature = "cloud")]
"bigtable" => {
use super::bigtable::BigtableParameters;
use super::BigtableDirectoryService;
// parse the instance name from the hostname.
let instance_name = url
.host_str()
.ok_or_else(|| Error::StorageError("instance name missing".into()))?
.to_string();
// … but add it to the query string now, so we just need to parse that.
url.query_pairs_mut()
.append_pair("instance_name", &instance_name);
let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
.map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
Box::new(
BigtableDirectoryService::connect(params)
.await
.map_err(|e| Error::StorageError(e.to_string()))?,
)
}
_ => {
return Err(crate::Error::StorageError(format!(
"unknown scheme: {}",
@@ -117,6 +142,27 @@ mod tests {
#[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
/// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
#[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
/// A valid example for Bigtable
#[cfg_attr(
feature = "cloud",
case::bigtable_valid_url(
"bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
true
)
)]
/// A valid example for Bigtable, specifying a custom channel size and timeout
#[cfg_attr(
feature = "cloud",
case::bigtable_valid_url_with_custom_params(
"bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1&channel_size=10&timeout=10",
true
)
)]
/// An invalid Bigtable example (missing fields)
#[cfg_attr(
feature = "cloud",
case::bigtable_invalid_url("bigtable://instance-1", false)
)]
#[tokio::test]
async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
if exp_succeed {
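The bigtable arm above moves the URL host into the query string and lets
serde_qs produce a BigtableParameters. A module-internal sketch of that
decoding (function name hypothetical; remaining fields filled in by serde
defaults):

    // What from_addr effectively hands to serde_qs after appending
    // instance_name taken from the URL host.
    fn parse_example() -> Result<BigtableParameters, serde_qs::Error> {
        serde_qs::from_str(
            "project_id=project-1&table_name=table-1&family_name=cf1&instance_name=instance-1",
        )
    }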

tvix/castore/src/directoryservice/mod.rs

@@ -22,6 +22,12 @@ pub use self::sled::SledDirectoryService;
pub use self::traverse::descend_to;
pub use self::utils::traverse_directory;
#[cfg(feature = "cloud")]
mod bigtable;
#[cfg(feature = "cloud")]
pub use self::bigtable::BigtableDirectoryService;
/// The base trait all Directory services need to implement.
/// This is a simple get and put of [crate::proto::Directory], returning their
/// digest.
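For orientation, the trait surface the new backend implements, reconstructed
from the impl in bigtable.rs above (a sketch, not the verbatim definition):

    #[async_trait]
    pub trait DirectoryService: Send + Sync {
        /// Looks up a single Directory by its digest.
        async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
        /// Uploads a single Directory, returning its digest.
        async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
        /// Streams the closure rooted at the given directory digest.
        fn get_recursive(
            &self,
            root_directory_digest: &B3Digest,
        ) -> BoxStream<Result<proto::Directory, Error>>;
        /// Returns a handle for uploading multiple Directories at once.
        fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
    }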

tvix/castore/src/directoryservice/tests/mod.rs

@@ -26,6 +26,7 @@ use self::utils::make_grpc_directory_service_client;
#[case::grpc(make_grpc_directory_service_client().await)]
#[case::memory(directoryservice::from_addr("memory://").await.unwrap())]
#[case::sled(directoryservice::from_addr("sled://").await.unwrap())]
#[cfg_attr(feature = "cloud", case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))]
pub fn directory_services(#[case] directory_service: impl DirectoryService) {}
/// Ensures asking for a directory that doesn't exist returns Ok(None).
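That test body is not part of this hunk; a sketch of what it plausibly looks
like, reusing the rstest_reuse template above (name and exact shape assumed):

    #[apply(directory_services)]
    #[tokio::test]
    async fn get_nonexistent(directory_service: impl DirectoryService) {
        // Digest of the empty Directory, which was never uploaded here.
        let digest = proto::Directory::default().digest();
        assert!(directory_service.get(&digest).await.unwrap().is_none());
    }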

tvix/castore/src/directoryservice/tests/utils.rs

@@ -5,6 +5,7 @@ use crate::{
directoryservice::MemoryDirectoryService,
proto::directory_service_server::DirectoryServiceServer,
};
use tonic::transport::{Endpoint, Server, Uri};
/// Constructs and returns a gRPC DirectoryService.