refactor(tvix/castore/blobservice/memory): use parking_lot RwLock
This one doesn't require us to deal with poisoning, is upgradeable and the right thing to use when locking access to data, not IO resources. Change-Id: I78634953a73404500d28f51f1d93a87e215c8149 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11612 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: Connor Brewster <cbrewster@hey.com>
This commit is contained in:
parent
96b8b1a205
commit
84114cf02c
5 changed files with 27 additions and 31 deletions
|
|
@ -1,9 +1,7 @@
|
|||
use parking_lot::RwLock;
|
||||
use std::io::{self, Cursor, Write};
|
||||
use std::task::Poll;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use tonic::async_trait;
|
||||
use tracing::instrument;
|
||||
|
||||
|
|
@ -19,13 +17,13 @@ pub struct MemoryBlobService {
|
|||
impl BlobService for MemoryBlobService {
|
||||
#[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
|
||||
async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
|
||||
let db = self.db.read().unwrap();
|
||||
let db = self.db.read();
|
||||
Ok(db.contains_key(digest))
|
||||
}
|
||||
|
||||
#[instrument(skip_all, err, fields(blob.digest=%digest))]
|
||||
async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
|
||||
let db = self.db.read().unwrap();
|
||||
let db = self.db.read();
|
||||
|
||||
match db.get(digest).map(|x| Cursor::new(x.clone())) {
|
||||
Some(result) => Ok(Some(Box::new(result))),
|
||||
|
|
@ -109,24 +107,16 @@ impl BlobWriter for MemoryBlobWriter {
|
|||
} else {
|
||||
let (buf, hasher) = self.writers.take().unwrap();
|
||||
|
||||
// We know self.hasher is doing blake3 hashing, so this won't fail.
|
||||
let digest: B3Digest = hasher.finalize().as_bytes().into();
|
||||
|
||||
// Only insert if the blob doesn't already exist.
|
||||
let db = self.db.read().map_err(|e| {
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
|
||||
})?;
|
||||
let mut db = self.db.upgradable_read();
|
||||
if !db.contains_key(&digest) {
|
||||
// drop the read lock, so we can open for writing.
|
||||
drop(db);
|
||||
|
||||
// open the database for writing.
|
||||
let mut db = self.db.write().map_err(|e| {
|
||||
io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
|
||||
})?;
|
||||
|
||||
// and put buf in there. This will move buf out.
|
||||
db.insert(digest.clone(), buf);
|
||||
db.with_upgraded(|db| {
|
||||
// and put buf in there. This will move buf out.
|
||||
db.insert(digest.clone(), buf);
|
||||
});
|
||||
}
|
||||
|
||||
self.digest = Some(digest.clone());
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue