* Fix NIX-23: quadratic complexity in maintaining the referers

mapping.  The referer table is replaced by a referrer table (note
  spelling fix) that stores each referrer separately.  That is,
  instead of having

    referer[P] = {Q_1, Q_2, Q_3, ...}

  we store

    referer[(P, Q_1)] = ""
    referer[(P, Q_2)] = ""
    referer[(P, Q_3)] = ""
    ...

  To find the referrers of P, we enumerate over the keys with a value
  lexicographically greater than P.  This requires the referrer table
  to be stored as a B-Tree rather than a hash table.

  (The tuples (P, Q) are stored as P + null-byte + Q.)

  Old Nix databases are upgraded automatically to the new schema.
This commit is contained in:
Eelco Dolstra 2005-12-12 18:24:42 +00:00
parent 18bbcb1214
commit 8463f27d8c
4 changed files with 107 additions and 40 deletions

View file

@ -257,9 +257,8 @@ void Database::open(const string & path)
if (e.get_errno() == DB_VERSION_MISMATCH) {
/* Remove the environment while we are holding the global
lock. If things go wrong there, we bail out. !!!
there is some leakage here op DbEnv and lock
handles. */
lock. If things go wrong there, we bail out.
!!! argh, we abolished the global lock :-( */
open2(path, true);
/* Try again. */
@ -311,7 +310,7 @@ void Database::close()
}
TableId Database::openTable(const string & tableName)
TableId Database::openTable(const string & tableName, bool sorted)
{
requireEnv();
TableId table = nextId++;
@ -322,7 +321,8 @@ TableId Database::openTable(const string & tableName)
try {
db->open(0, tableName.c_str(), 0,
DB_HASH, DB_CREATE | DB_AUTO_COMMIT, 0666);
sorted ? DB_BTREE : DB_HASH,
DB_CREATE | DB_AUTO_COMMIT, 0666);
} catch (...) {
delete db;
throw;
@ -410,7 +410,7 @@ void Database::delPair(const Transaction & txn, TableId table,
void Database::enumTable(const Transaction & txn, TableId table,
Strings & keys)
Strings & keys, const string & keyPrefix)
{
try {
Db * db = getDb(table);
@ -420,10 +420,21 @@ void Database::enumTable(const Transaction & txn, TableId table,
DestroyDbc destroyDbc(dbc);
Dbt kt, dt;
while (dbc->get(&kt, &dt, DB_NEXT) != DB_NOTFOUND) {
u_int32_t flags = DB_NEXT;
if (!keyPrefix.empty()) {
flags = DB_SET_RANGE;
kt = Dbt((void *) keyPrefix.c_str(), keyPrefix.size());
}
while (dbc->get(&kt, &dt, flags) != DB_NOTFOUND) {
checkInterrupt();
keys.push_back(
string((char *) kt.get_data(), kt.get_size()));
string data((char *) kt.get_data(), kt.get_size());
if (!keyPrefix.empty() &&
data.compare(0, keyPrefix.size(), keyPrefix) != 0)
break;
keys.push_back(data);
flags = DB_NEXT;
}
} catch (DbException e) { rethrow(e); }