Add 'third_party/abseil_cpp/' from commit '768eb2ca28'

git-subtree-dir: third_party/abseil_cpp
git-subtree-mainline: ffb2ae54be
git-subtree-split: 768eb2ca28
Vincent Ambo 2020-05-20 02:32:24 +01:00
commit fc8dc48020
1276 changed files with 208196 additions and 0 deletions

@@ -0,0 +1,285 @@ absl/synchronization/BUILD.bazel
#
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
load("@rules_cc//cc:defs.bzl", "cc_binary", "cc_library", "cc_test")
load(
"//absl:copts/configure_copts.bzl",
"ABSL_DEFAULT_COPTS",
"ABSL_DEFAULT_LINKOPTS",
"ABSL_TEST_COPTS",
)
package(default_visibility = ["//visibility:public"])
licenses(["notice"]) # Apache 2.0
# Internal data structure for efficiently detecting mutex dependency cycles
cc_library(
name = "graphcycles_internal",
srcs = [
"internal/graphcycles.cc",
],
hdrs = [
"internal/graphcycles.h",
],
copts = ABSL_DEFAULT_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
visibility = [
"//absl:__subpackages__",
],
deps = [
"//absl/base",
"//absl/base:base_internal",
"//absl/base:config",
"//absl/base:core_headers",
"//absl/base:malloc_internal",
"//absl/base:raw_logging_internal",
],
)
cc_library(
name = "kernel_timeout_internal",
hdrs = ["internal/kernel_timeout.h"],
copts = ABSL_DEFAULT_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
visibility = [
"//absl/synchronization:__pkg__",
],
deps = [
"//absl/base:core_headers",
"//absl/base:raw_logging_internal",
"//absl/time",
],
)
cc_library(
name = "synchronization",
srcs = [
"barrier.cc",
"blocking_counter.cc",
"internal/create_thread_identity.cc",
"internal/per_thread_sem.cc",
"internal/waiter.cc",
"notification.cc",
] + select({
"//conditions:default": ["mutex.cc"],
}),
hdrs = [
"barrier.h",
"blocking_counter.h",
"internal/create_thread_identity.h",
"internal/mutex_nonprod.inc",
"internal/per_thread_sem.h",
"internal/waiter.h",
"mutex.h",
"notification.h",
],
copts = ABSL_DEFAULT_COPTS,
linkopts = select({
"//absl:windows": [],
"//conditions:default": ["-pthread"],
}) + ABSL_DEFAULT_LINKOPTS,
deps = [
":graphcycles_internal",
":kernel_timeout_internal",
"//absl/base",
"//absl/base:atomic_hook",
"//absl/base:base_internal",
"//absl/base:config",
"//absl/base:core_headers",
"//absl/base:dynamic_annotations",
"//absl/base:malloc_internal",
"//absl/base:raw_logging_internal",
"//absl/debugging:stacktrace",
"//absl/debugging:symbolize",
"//absl/time",
],
)
cc_test(
name = "barrier_test",
size = "small",
srcs = ["barrier_test.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
deps = [
":synchronization",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "blocking_counter_test",
size = "small",
srcs = ["blocking_counter_test.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
deps = [
":synchronization",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "graphcycles_test",
size = "medium",
srcs = ["internal/graphcycles_test.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
deps = [
":graphcycles_internal",
"//absl/base:core_headers",
"//absl/base:raw_logging_internal",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "graphcycles_benchmark",
srcs = ["internal/graphcycles_benchmark.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
tags = [
"benchmark",
],
deps = [
":graphcycles_internal",
"//absl/base:raw_logging_internal",
"@com_github_google_benchmark//:benchmark_main",
],
)
cc_library(
name = "thread_pool",
testonly = 1,
hdrs = ["internal/thread_pool.h"],
linkopts = ABSL_DEFAULT_LINKOPTS,
visibility = [
"//absl:__subpackages__",
],
deps = [
":synchronization",
"//absl/base:core_headers",
],
)
cc_test(
name = "mutex_test",
size = "large",
srcs = ["mutex_test.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
shard_count = 25,
deps = [
":synchronization",
":thread_pool",
"//absl/base",
"//absl/base:core_headers",
"//absl/base:raw_logging_internal",
"//absl/memory",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "mutex_benchmark_common",
testonly = 1,
srcs = ["mutex_benchmark.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
visibility = [
"//absl/synchronization:__pkg__",
],
deps = [
":synchronization",
":thread_pool",
"//absl/base",
"@com_github_google_benchmark//:benchmark_main",
],
alwayslink = 1,
)
cc_binary(
name = "mutex_benchmark",
testonly = 1,
copts = ABSL_DEFAULT_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
visibility = ["//visibility:private"],
deps = [
":mutex_benchmark_common",
],
)
cc_test(
name = "notification_test",
size = "small",
srcs = ["notification_test.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
deps = [
":synchronization",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_library(
name = "per_thread_sem_test_common",
testonly = 1,
srcs = ["internal/per_thread_sem_test.cc"],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
deps = [
":synchronization",
"//absl/base",
"//absl/strings",
"//absl/time",
"@com_google_googletest//:gtest",
],
alwayslink = 1,
)
cc_test(
name = "per_thread_sem_test",
size = "medium",
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
deps = [
":per_thread_sem_test_common",
":synchronization",
"//absl/strings",
"//absl/time",
"@com_google_googletest//:gtest_main",
],
)
cc_test(
name = "lifetime_test",
srcs = [
"lifetime_test.cc",
],
copts = ABSL_TEST_COPTS,
linkopts = ABSL_DEFAULT_LINKOPTS,
tags = ["no_test_ios_x86_64"],
deps = [
":synchronization",
"//absl/base:core_headers",
"//absl/base:raw_logging_internal",
],
)

@@ -0,0 +1,214 @@ absl/synchronization/CMakeLists.txt
#
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
absl_cc_library(
NAME
graphcycles_internal
HDRS
"internal/graphcycles.h"
SRCS
"internal/graphcycles.cc"
COPTS
${ABSL_DEFAULT_COPTS}
DEPS
absl::base
absl::base_internal
absl::config
absl::core_headers
absl::malloc_internal
absl::raw_logging_internal
)
absl_cc_library(
NAME
kernel_timeout_internal
HDRS
"internal/kernel_timeout.h"
COPTS
${ABSL_DEFAULT_COPTS}
DEPS
absl::core_headers
absl::raw_logging_internal
absl::time
)
absl_cc_library(
NAME
synchronization
HDRS
"barrier.h"
"blocking_counter.h"
"internal/create_thread_identity.h"
"internal/mutex_nonprod.inc"
"internal/per_thread_sem.h"
"internal/waiter.h"
"mutex.h"
"notification.h"
SRCS
"barrier.cc"
"blocking_counter.cc"
"internal/create_thread_identity.cc"
"internal/per_thread_sem.cc"
"internal/waiter.cc"
"notification.cc"
"mutex.cc"
COPTS
${ABSL_DEFAULT_COPTS}
DEPS
absl::graphcycles_internal
absl::kernel_timeout_internal
absl::atomic_hook
absl::base
absl::base_internal
absl::config
absl::core_headers
absl::dynamic_annotations
absl::malloc_internal
absl::raw_logging_internal
absl::stacktrace
absl::symbolize
absl::time
Threads::Threads
PUBLIC
)
absl_cc_test(
NAME
barrier_test
SRCS
"barrier_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::synchronization
absl::time
gmock_main
)
absl_cc_test(
NAME
blocking_counter_test
SRCS
"blocking_counter_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::synchronization
absl::time
gmock_main
)
absl_cc_test(
NAME
graphcycles_test
SRCS
"internal/graphcycles_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::graphcycles_internal
absl::core_headers
absl::raw_logging_internal
gmock_main
)
absl_cc_library(
NAME
thread_pool
HDRS
"internal/thread_pool.h"
COPTS
${ABSL_DEFAULT_COPTS}
DEPS
absl::synchronization
absl::core_headers
TESTONLY
)
absl_cc_test(
NAME
mutex_test
SRCS
"mutex_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::synchronization
absl::thread_pool
absl::base
absl::core_headers
absl::memory
absl::raw_logging_internal
absl::time
gmock_main
)
absl_cc_test(
NAME
notification_test
SRCS
"notification_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::synchronization
absl::time
gmock_main
)
absl_cc_library(
NAME
per_thread_sem_test_common
SRCS
"internal/per_thread_sem_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::synchronization
absl::base
absl::strings
absl::time
gmock
TESTONLY
)
absl_cc_test(
NAME
per_thread_sem_test
SRCS
"internal/per_thread_sem_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::per_thread_sem_test_common
absl::synchronization
absl::strings
absl::time
gmock_main
)
absl_cc_test(
NAME
lifetime_test
SRCS
"lifetime_test.cc"
COPTS
${ABSL_TEST_COPTS}
DEPS
absl::synchronization
absl::core_headers
absl::raw_logging_internal
)

@@ -0,0 +1,52 @@ absl/synchronization/barrier.cc
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/barrier.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/synchronization/mutex.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
// Return whether int *arg is zero.
static bool IsZero(void *arg) {
return 0 == *reinterpret_cast<int *>(arg);
}
bool Barrier::Block() {
MutexLock l(&this->lock_);
this->num_to_block_--;
if (this->num_to_block_ < 0) {
ABSL_RAW_LOG(
FATAL,
"Block() called too many times. num_to_block_=%d out of total=%d",
this->num_to_block_, this->num_to_exit_);
}
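// Await() atomically unlocks lock_ while this thread is blocked and
// reacquires it once num_to_block_ reaches zero, so the code below runs
// with the lock held again.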
this->lock_.Await(Condition(IsZero, &this->num_to_block_));
// Determine which thread can safely delete this Barrier object
this->num_to_exit_--;
ABSL_RAW_CHECK(this->num_to_exit_ >= 0, "barrier underflow");
// If num_to_exit_ == 0 then all other threads in the barrier have
// exited the Wait() and have released the Mutex so this thread is
// free to delete the barrier.
return this->num_to_exit_ == 0;
}
ABSL_NAMESPACE_END
} // namespace absl

@@ -0,0 +1,79 @@ absl/synchronization/barrier.h
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// -----------------------------------------------------------------------------
// barrier.h
// -----------------------------------------------------------------------------
#ifndef ABSL_SYNCHRONIZATION_BARRIER_H_
#define ABSL_SYNCHRONIZATION_BARRIER_H_
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
// Barrier
//
// This class creates a barrier which blocks threads until a prespecified
// threshold of threads (`num_threads`) utilizes the barrier. A thread utilizes
// the `Barrier` by calling `Block()` on the barrier, which will block that
// thread; no call to `Block()` will return until `num_threads` threads have
// called it.
//
// Exactly one call to `Block()` will return `true`; the thread receiving that
// `true` is responsible for destroying the barrier. Because stack allocation
// would cause the barrier to be deleted when it goes out of scope, barriers
// should not be stack allocated.
//
// Example:
//
// // Main thread creates a `Barrier`:
// barrier = new Barrier(num_threads);
//
// // Each participating thread could then call:
// if (barrier->Block()) delete barrier; // Exactly one call to `Block()`
// // returns `true`; that call
// // deletes the barrier.
class Barrier {
public:
// `num_threads` is the number of threads that will participate in the barrier.
explicit Barrier(int num_threads)
: num_to_block_(num_threads), num_to_exit_(num_threads) {}
Barrier(const Barrier&) = delete;
Barrier& operator=(const Barrier&) = delete;
// Barrier::Block()
//
// Blocks the current thread, and returns only when the `num_threads`
// threshold of threads utilizing this barrier has been reached. `Block()`
// returns `true` for precisely one caller, which may then destroy the
// barrier.
//
// Memory ordering: For any threads X and Y, any action taken by X
// before X calls `Block()` will be visible to Y after Y returns from
// `Block()`.
bool Block();
private:
Mutex lock_;
int num_to_block_ ABSL_GUARDED_BY(lock_);
int num_to_exit_ ABSL_GUARDED_BY(lock_);
};
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_BARRIER_H_
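A minimal usage sketch (editorial addition, not part of this commit): worker
threads rendezvous at a heap-allocated Barrier, and the one Block() call that
returns true deletes it, as the class comment prescribes.

#include <thread>
#include <vector>

#include "absl/synchronization/barrier.h"

int main() {
  constexpr int kNumThreads = 4;
  auto* barrier = new absl::Barrier(kNumThreads);
  std::vector<std::thread> threads;
  for (int i = 0; i < kNumThreads; ++i) {
    threads.emplace_back([barrier] {
      // ... per-thread setup work happens here ...
      if (barrier->Block()) delete barrier;  // exactly one thread cleans up
      // ... work that requires all threads to have finished setup ...
    });
  }
  for (auto& t : threads) t.join();
}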

@@ -0,0 +1,75 @@ absl/synchronization/barrier_test.cc
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/barrier.h"
#include <thread> // NOLINT(build/c++11)
#include <vector>
#include "gtest/gtest.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
TEST(Barrier, SanityTest) {
constexpr int kNumThreads = 10;
absl::Barrier* barrier = new absl::Barrier(kNumThreads);
absl::Mutex mutex;
int counter = 0; // Guarded by mutex.
auto thread_func = [&] {
if (barrier->Block()) {
// This thread is the last thread to reach the barrier so it is
// responsible for deleting it.
delete barrier;
}
// Increment the counter.
absl::MutexLock lock(&mutex);
++counter;
};
// Start (kNumThreads - 1) threads running thread_func.
std::vector<std::thread> threads;
for (int i = 0; i < kNumThreads - 1; ++i) {
threads.push_back(std::thread(thread_func));
}
// Give (kNumThreads - 1) threads a chance to reach the barrier.
// This test assumes at least one thread will have run after the
// sleep has elapsed. Sleeping in a test is usually bad form, but we
// need to make sure that we are testing the barrier instead of some
// other synchronization method.
absl::SleepFor(absl::Seconds(1));
// The counter should still be zero since no thread should have
// been able to pass the barrier yet.
{
absl::MutexLock lock(&mutex);
EXPECT_EQ(counter, 0);
}
// Start 1 more thread. This should make all threads pass the barrier.
threads.push_back(std::thread(thread_func));
// All threads should now be able to proceed and finish.
for (auto& thread : threads) {
thread.join();
}
// All threads should now have incremented the counter.
absl::MutexLock lock(&mutex);
EXPECT_EQ(counter, kNumThreads);
}

@@ -0,0 +1,57 @@ absl/synchronization/blocking_counter.cc
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/blocking_counter.h"
#include "absl/base/internal/raw_logging.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
// Return whether int *arg is zero.
static bool IsZero(void *arg) {
return 0 == *reinterpret_cast<int *>(arg);
}
bool BlockingCounter::DecrementCount() {
MutexLock l(&lock_);
count_--;
if (count_ < 0) {
ABSL_RAW_LOG(
FATAL,
"BlockingCounter::DecrementCount() called too many times. count=%d",
count_);
}
return count_ == 0;
}
void BlockingCounter::Wait() {
MutexLock l(&this->lock_);
ABSL_RAW_CHECK(count_ >= 0, "BlockingCounter underflow");
// only one thread may call Wait(). To support more than one thread,
// implement a counter num_to_exit, like in the Barrier class.
ABSL_RAW_CHECK(num_waiting_ == 0, "multiple threads called Wait()");
num_waiting_++;
this->lock_.Await(Condition(IsZero, &this->count_));
// At this point, we know that all threads executing DecrementCount() have
// released the lock, and so will not touch this object again.
// Therefore, the thread calling this method is free to delete the object
// after we return from this method.
}
ABSL_NAMESPACE_END
} // namespace absl

@@ -0,0 +1,99 @@ absl/synchronization/blocking_counter.h
//
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// -----------------------------------------------------------------------------
// blocking_counter.h
// -----------------------------------------------------------------------------
#ifndef ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_
#define ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
// BlockingCounter
//
// This class allows a thread to block for a pre-specified number of actions.
// `BlockingCounter` maintains a single non-negative abstract integer "count"
// with an initial value `initial_count`. A thread can then call `Wait()` on
// this blocking counter to block until the specified number of events occur;
// worker threads then call 'DecrementCount()` on the counter upon completion of
// their work. Once the counter's internal "count" reaches zero, the blocked
// thread unblocks.
//
// A `BlockingCounter` requires the following:
// - its `initial_count` is non-negative.
// - the number of calls to `DecrementCount()` on it is at most
// `initial_count`.
// - `Wait()` is called at most once on it.
//
// Given the above requirements, a `BlockingCounter` provides the following
// guarantees:
// - Once its internal "count" reaches zero, no legal action on the object
// can further change the value of "count".
// - When `Wait()` returns, it is legal to destroy the `BlockingCounter`.
// - When `Wait()` returns, the number of calls to `DecrementCount()` on
// this blocking counter exactly equals `initial_count`.
//
// Example:
// BlockingCounter bcount(N); // there are N items of work
// ... Allow worker threads to start.
// ... On completing each work item, workers do:
// ... bcount.DecrementCount(); // an item of work has been completed
//
// bcount.Wait(); // wait for all work to be complete
//
class BlockingCounter {
public:
explicit BlockingCounter(int initial_count)
: count_(initial_count), num_waiting_(0) {}
BlockingCounter(const BlockingCounter&) = delete;
BlockingCounter& operator=(const BlockingCounter&) = delete;
// BlockingCounter::DecrementCount()
//
// Decrements the counter's "count" by one, and returns "count == 0". This
// function requires that "count != 0" when it is called.
//
// Memory ordering: For any threads X and Y, any action taken by X
// before it calls `DecrementCount()` is visible to thread Y after
// Y's call to `DecrementCount()`, provided Y's call returns `true`.
bool DecrementCount();
// BlockingCounter::Wait()
//
// Blocks until the counter reaches zero. This function may be called at most
// once. On return, `DecrementCount()` will have been called "initial_count"
// times and the blocking counter may be destroyed.
//
// Memory ordering: For any threads X and Y, any action taken by X
// before X calls `DecrementCount()` is visible to Y after Y returns
// from `Wait()`.
void Wait();
private:
Mutex lock_;
int count_ ABSL_GUARDED_BY(lock_);
int num_waiting_ ABSL_GUARDED_BY(lock_);
};
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_BLOCKING_COUNTER_H_
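A minimal usage sketch (editorial addition, not part of this commit): the main
thread blocks in Wait() until every worker has called DecrementCount() once.

#include <thread>
#include <vector>

#include "absl/synchronization/blocking_counter.h"

int main() {
  constexpr int kNumWorkItems = 8;
  absl::BlockingCounter done(kNumWorkItems);
  std::vector<std::thread> workers;
  for (int i = 0; i < kNumWorkItems; ++i) {
    workers.emplace_back([&done] {
      // ... perform one item of work ...
      done.DecrementCount();  // signal completion of this item
    });
  }
  done.Wait();  // returns once "count" has reached zero
  for (auto& w : workers) w.join();
}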

@@ -0,0 +1,68 @@ absl/synchronization/blocking_counter_test.cc
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/blocking_counter.h"
#include <thread> // NOLINT(build/c++11)
#include <vector>
#include "gtest/gtest.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace {
void PauseAndDecreaseCounter(BlockingCounter* counter, int* done) {
absl::SleepFor(absl::Seconds(1));
*done = 1;
counter->DecrementCount();
}
TEST(BlockingCounterTest, BasicFunctionality) {
// This test verifies that BlockingCounter functions correctly. It starts a
// number of threads that just sleep for a second and decrement a counter.
// Initialize the counter.
const int num_workers = 10;
BlockingCounter counter(num_workers);
std::vector<std::thread> workers;
std::vector<int> done(num_workers, 0);
// Start a number of parallel tasks that will just wait for a second and
// then decrement the count.
workers.reserve(num_workers);
for (int k = 0; k < num_workers; k++) {
workers.emplace_back(
[&counter, &done, k] { PauseAndDecreaseCounter(&counter, &done[k]); });
}
// Wait for the threads to have all finished.
counter.Wait();
// Check that all the workers have completed.
for (int k = 0; k < num_workers; k++) {
EXPECT_EQ(1, done[k]);
}
for (std::thread& w : workers) {
w.join();
}
}
} // namespace
ABSL_NAMESPACE_END
} // namespace absl

@@ -0,0 +1,140 @@ absl/synchronization/internal/create_thread_identity.cc
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <stdint.h>
#include <new>
// This file is a no-op if the required LowLevelAlloc support is missing.
#include "absl/base/internal/low_level_alloc.h"
#ifndef ABSL_LOW_LEVEL_ALLOC_MISSING
#include <string.h>
#include "absl/base/attributes.h"
#include "absl/base/internal/spinlock.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/per_thread_sem.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
// ThreadIdentity storage is persistent; we maintain a free-list of previously
// released ThreadIdentity objects.
ABSL_CONST_INIT static base_internal::SpinLock freelist_lock(
absl::kConstInit, base_internal::SCHEDULE_KERNEL_ONLY);
ABSL_CONST_INIT static base_internal::ThreadIdentity* thread_identity_freelist;
// A per-thread destructor for reclaiming associated ThreadIdentity objects.
// Since we must preserve their storage, we cache them for re-use.
void ReclaimThreadIdentity(void* v) {
base_internal::ThreadIdentity* identity =
static_cast<base_internal::ThreadIdentity*>(v);
// all_locks might have been allocated by the Mutex implementation.
// We free it here when we are notified that our thread is dying.
if (identity->per_thread_synch.all_locks != nullptr) {
base_internal::LowLevelAlloc::Free(identity->per_thread_synch.all_locks);
}
PerThreadSem::Destroy(identity);
// We must explicitly clear the current thread's identity:
// (a) Subsequent (unrelated) per-thread destructors may require an identity.
// We must guarantee a new identity is used in this case (this destructor
// will be reinvoked up to PTHREAD_DESTRUCTOR_ITERATIONS times in this case).
// (b) ThreadIdentity implementations may depend on memory that is not
// reinitialized before reuse. We must allow explicit clearing of the
// association state in this case.
base_internal::ClearCurrentThreadIdentity();
{
base_internal::SpinLockHolder l(&freelist_lock);
identity->next = thread_identity_freelist;
thread_identity_freelist = identity;
}
}
// Return value rounded up to next multiple of align.
// Align must be a power of two.
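// For example, RoundUp(13, 8) == 16 and RoundUp(16, 8) == 16: adding
// align - 1 carries addr past the next boundary unless it is already
// aligned, and masking with ~(align - 1) clears the low bits back down
// to that boundary.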
static intptr_t RoundUp(intptr_t addr, intptr_t align) {
return (addr + align - 1) & ~(align - 1);
}
static void ResetThreadIdentity(base_internal::ThreadIdentity* identity) {
base_internal::PerThreadSynch* pts = &identity->per_thread_synch;
pts->next = nullptr;
pts->skip = nullptr;
pts->may_skip = false;
pts->waitp = nullptr;
pts->suppress_fatal_errors = false;
pts->readers = 0;
pts->priority = 0;
pts->next_priority_read_cycles = 0;
pts->state.store(base_internal::PerThreadSynch::State::kAvailable,
std::memory_order_relaxed);
pts->maybe_unlocking = false;
pts->wake = false;
pts->cond_waiter = false;
pts->all_locks = nullptr;
identity->blocked_count_ptr = nullptr;
identity->ticker.store(0, std::memory_order_relaxed);
identity->wait_start.store(0, std::memory_order_relaxed);
identity->is_idle.store(false, std::memory_order_relaxed);
identity->next = nullptr;
}
static base_internal::ThreadIdentity* NewThreadIdentity() {
base_internal::ThreadIdentity* identity = nullptr;
{
// Re-use a previously released object if possible.
base_internal::SpinLockHolder l(&freelist_lock);
if (thread_identity_freelist) {
identity = thread_identity_freelist; // Take list-head.
thread_identity_freelist = thread_identity_freelist->next;
}
}
if (identity == nullptr) {
// Allocate enough space to align ThreadIdentity to a multiple of
// PerThreadSynch::kAlignment. This space is never released (it is
// added to a freelist by ReclaimThreadIdentity instead).
void* allocation = base_internal::LowLevelAlloc::Alloc(
sizeof(*identity) + base_internal::PerThreadSynch::kAlignment - 1);
// Round up the address to the required alignment.
identity = reinterpret_cast<base_internal::ThreadIdentity*>(
RoundUp(reinterpret_cast<intptr_t>(allocation),
base_internal::PerThreadSynch::kAlignment));
}
ResetThreadIdentity(identity);
return identity;
}
// Allocates and attaches ThreadIdentity object for the calling thread. Returns
// the new identity.
// REQUIRES: CurrentThreadIdentity(false) == nullptr
base_internal::ThreadIdentity* CreateThreadIdentity() {
base_internal::ThreadIdentity* identity = NewThreadIdentity();
PerThreadSem::Init(identity);
// Associate the value with the current thread, and attach our destructor.
base_internal::SetCurrentThreadIdentity(identity, ReclaimThreadIdentity);
return identity;
}
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_LOW_LEVEL_ALLOC_MISSING

@@ -0,0 +1,60 @@ absl/synchronization/internal/create_thread_identity.h
/*
* Copyright 2017 The Abseil Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Interface for getting the current ThreadIdentity, creating one if necessary.
// See thread_identity.h.
//
// This file is separate from thread_identity.h because creating a new
// ThreadIdentity requires slightly higher level libraries (per_thread_sem
// and low_level_alloc) than accessing an existing one. This separation allows
// us to have a smaller //absl/base:base.
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_CREATE_THREAD_IDENTITY_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_CREATE_THREAD_IDENTITY_H_
#include "absl/base/internal/thread_identity.h"
#include "absl/base/port.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
// Allocates and attaches a ThreadIdentity object for the calling thread.
// For private use only.
base_internal::ThreadIdentity* CreateThreadIdentity();
// A per-thread destructor for reclaiming associated ThreadIdentity objects.
// For private use only.
void ReclaimThreadIdentity(void* v);
// Returns the ThreadIdentity object representing the calling thread,
// guaranteed to be unique for the thread's lifetime. The returned object
// remains valid for the program's lifetime, although it may be re-assigned
// to a subsequent thread. If one does not exist for the calling thread,
// it is allocated now.
inline base_internal::ThreadIdentity* GetOrCreateCurrentThreadIdentity() {
base_internal::ThreadIdentity* identity =
base_internal::CurrentThreadIdentityIfPresent();
if (ABSL_PREDICT_FALSE(identity == nullptr)) {
return CreateThreadIdentity();
}
return identity;
}
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_CREATE_THREAD_IDENTITY_H_

@@ -0,0 +1,697 @@ absl/synchronization/internal/graphcycles.cc
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// GraphCycles provides incremental cycle detection on a dynamic
// graph using the following algorithm:
//
// A dynamic topological sort algorithm for directed acyclic graphs
// David J. Pearce, Paul H. J. Kelly
// Journal of Experimental Algorithmics (JEA),
// Volume 11, 2006, Article No. 1.7
//
// Brief summary of the algorithm:
//
// (1) Maintain a rank for each node that is consistent
// with the topological sort of the graph. I.e., path from x to y
// implies rank[x] < rank[y].
// (2) When a new edge (x->y) is inserted, do nothing if rank[x] < rank[y].
// (3) Otherwise: adjust ranks in the neighborhood of x and y.
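//
// Worked example: suppose rank[a] = 0 and rank[b] = 1, and the edge b->a is
// inserted. The ranks are now inconsistent, so a forward search from a
// (bounded by rank[b]) and a backward search from b (bounded by rank[a])
// collect the affected nodes; their ranks are then merged and reassigned so
// that rank[b] < rank[a], restoring invariant (1).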
#include "absl/base/attributes.h"
// This file is a no-op if the required LowLevelAlloc support is missing.
#include "absl/base/internal/low_level_alloc.h"
#ifndef ABSL_LOW_LEVEL_ALLOC_MISSING
#include "absl/synchronization/internal/graphcycles.h"
#include <algorithm>
#include <array>
#include "absl/base/internal/hide_ptr.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/base/internal/spinlock.h"
// Do not use STL. This module does not use standard memory allocation.
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
namespace {
// Avoid LowLevelAlloc's default arena since it calls malloc hooks in
// which people are doing things like acquiring Mutexes.
ABSL_CONST_INIT static absl::base_internal::SpinLock arena_mu(
absl::kConstInit, base_internal::SCHEDULE_KERNEL_ONLY);
ABSL_CONST_INIT static base_internal::LowLevelAlloc::Arena* arena;
static void InitArenaIfNecessary() {
arena_mu.Lock();
if (arena == nullptr) {
arena = base_internal::LowLevelAlloc::NewArena(0);
}
arena_mu.Unlock();
}
// Number of inlined elements in Vec. Hash table implementation
// relies on this being a power of two.
static const uint32_t kInline = 8;
// A simple LowLevelAlloc based resizable vector with inlined storage
// for a few elements. T must be a plain type since constructor
// and destructor are not run on elements of type T managed by Vec.
template <typename T>
class Vec {
public:
Vec() { Init(); }
~Vec() { Discard(); }
void clear() {
Discard();
Init();
}
bool empty() const { return size_ == 0; }
uint32_t size() const { return size_; }
T* begin() { return ptr_; }
T* end() { return ptr_ + size_; }
const T& operator[](uint32_t i) const { return ptr_[i]; }
T& operator[](uint32_t i) { return ptr_[i]; }
const T& back() const { return ptr_[size_-1]; }
void pop_back() { size_--; }
void push_back(const T& v) {
if (size_ == capacity_) Grow(size_ + 1);
ptr_[size_] = v;
size_++;
}
void resize(uint32_t n) {
if (n > capacity_) Grow(n);
size_ = n;
}
void fill(const T& val) {
for (uint32_t i = 0; i < size(); i++) {
ptr_[i] = val;
}
}
// Guarantees src is empty at end.
// Provided for the hash table resizing code below.
void MoveFrom(Vec<T>* src) {
if (src->ptr_ == src->space_) {
// Need to actually copy
resize(src->size_);
std::copy(src->ptr_, src->ptr_ + src->size_, ptr_);
src->size_ = 0;
} else {
Discard();
ptr_ = src->ptr_;
size_ = src->size_;
capacity_ = src->capacity_;
src->Init();
}
}
private:
T* ptr_;
T space_[kInline];
uint32_t size_;
uint32_t capacity_;
void Init() {
ptr_ = space_;
size_ = 0;
capacity_ = kInline;
}
void Discard() {
if (ptr_ != space_) base_internal::LowLevelAlloc::Free(ptr_);
}
void Grow(uint32_t n) {
while (capacity_ < n) {
capacity_ *= 2;
}
size_t request = static_cast<size_t>(capacity_) * sizeof(T);
T* copy = static_cast<T*>(
base_internal::LowLevelAlloc::AllocWithArena(request, arena));
std::copy(ptr_, ptr_ + size_, copy);
Discard();
ptr_ = copy;
}
Vec(const Vec&) = delete;
Vec& operator=(const Vec&) = delete;
};
// A hash set of non-negative int32_t that uses Vec for its underlying storage.
class NodeSet {
public:
NodeSet() { Init(); }
void clear() { Init(); }
bool contains(int32_t v) const { return table_[FindIndex(v)] == v; }
bool insert(int32_t v) {
uint32_t i = FindIndex(v);
if (table_[i] == v) {
return false;
}
if (table_[i] == kEmpty) {
// Only inserting over an empty cell increases the number of occupied
// slots.
occupied_++;
}
table_[i] = v;
// Double when 75% full.
if (occupied_ >= table_.size() - table_.size()/4) Grow();
return true;
}
void erase(uint32_t v) {
uint32_t i = FindIndex(v);
if (static_cast<uint32_t>(table_[i]) == v) {
table_[i] = kDel;
}
}
// Iteration is done via the HASH_FOR_EACH macro.
// Example:
// HASH_FOR_EACH(elem, node->out) { ... }
#define HASH_FOR_EACH(elem, eset) \
for (int32_t elem, _cursor = 0; (eset).Next(&_cursor, &elem); )
bool Next(int32_t* cursor, int32_t* elem) {
while (static_cast<uint32_t>(*cursor) < table_.size()) {
int32_t v = table_[*cursor];
(*cursor)++;
if (v >= 0) {
*elem = v;
return true;
}
}
return false;
}
private:
enum : int32_t { kEmpty = -1, kDel = -2 };
Vec<int32_t> table_;
uint32_t occupied_; // Count of non-empty slots (includes deleted slots)
static uint32_t Hash(uint32_t a) { return a * 41; }
// Returns the index for storing v. May return an empty or a deleted index.
int FindIndex(int32_t v) const {
// Search starting at hash index.
const uint32_t mask = table_.size() - 1;
uint32_t i = Hash(v) & mask;
int deleted_index = -1; // If >= 0, index of first deleted element we see
while (true) {
int32_t e = table_[i];
if (v == e) {
return i;
} else if (e == kEmpty) {
// Return any previously encountered deleted slot.
return (deleted_index >= 0) ? deleted_index : i;
} else if (e == kDel && deleted_index < 0) {
// Keep searching since v might be present later.
deleted_index = i;
}
i = (i + 1) & mask; // Linear probing; quadratic is slightly slower.
}
}
void Init() {
table_.clear();
table_.resize(kInline);
table_.fill(kEmpty);
occupied_ = 0;
}
void Grow() {
Vec<int32_t> copy;
copy.MoveFrom(&table_);
occupied_ = 0;
table_.resize(copy.size() * 2);
table_.fill(kEmpty);
for (const auto& e : copy) {
if (e >= 0) insert(e);
}
}
NodeSet(const NodeSet&) = delete;
NodeSet& operator=(const NodeSet&) = delete;
};
// We encode a node index and a node version in GraphId. The version
// number is incremented when the GraphId is freed, which automatically
// invalidates all copies of the GraphId.
inline GraphId MakeId(int32_t index, uint32_t version) {
GraphId g;
g.handle =
(static_cast<uint64_t>(version) << 32) | static_cast<uint32_t>(index);
return g;
}
inline int32_t NodeIndex(GraphId id) {
return static_cast<uint32_t>(id.handle & 0xfffffffful);
}
inline uint32_t NodeVersion(GraphId id) {
return static_cast<uint32_t>(id.handle >> 32);
}
struct Node {
int32_t rank; // rank number assigned by Pearce-Kelly algorithm
uint32_t version; // Current version number
int32_t next_hash; // Next entry in hash table
bool visited; // Temporary marker used by depth-first-search
uintptr_t masked_ptr; // User-supplied pointer
NodeSet in; // List of immediate predecessor nodes in graph
NodeSet out; // List of immediate successor nodes in graph
int priority; // Priority of recorded stack trace.
int nstack; // Depth of recorded stack trace.
void* stack[40]; // stack[0,nstack-1] holds stack trace for node.
};
// Hash table for pointer to node index lookups.
class PointerMap {
public:
explicit PointerMap(const Vec<Node*>* nodes) : nodes_(nodes) {
table_.fill(-1);
}
int32_t Find(void* ptr) {
auto masked = base_internal::HidePtr(ptr);
for (int32_t i = table_[Hash(ptr)]; i != -1;) {
Node* n = (*nodes_)[i];
if (n->masked_ptr == masked) return i;
i = n->next_hash;
}
return -1;
}
void Add(void* ptr, int32_t i) {
int32_t* head = &table_[Hash(ptr)];
(*nodes_)[i]->next_hash = *head;
*head = i;
}
int32_t Remove(void* ptr) {
// Advance through linked list while keeping track of the
// predecessor slot that points to the current entry.
auto masked = base_internal::HidePtr(ptr);
for (int32_t* slot = &table_[Hash(ptr)]; *slot != -1; ) {
int32_t index = *slot;
Node* n = (*nodes_)[index];
if (n->masked_ptr == masked) {
*slot = n->next_hash; // Remove n from linked list
n->next_hash = -1;
return index;
}
slot = &n->next_hash;
}
return -1;
}
private:
// Number of buckets in hash table for pointer lookups.
static constexpr uint32_t kHashTableSize = 8171; // should be prime
const Vec<Node*>* nodes_;
std::array<int32_t, kHashTableSize> table_;
static uint32_t Hash(void* ptr) {
return reinterpret_cast<uintptr_t>(ptr) % kHashTableSize;
}
};
} // namespace
struct GraphCycles::Rep {
Vec<Node*> nodes_;
Vec<int32_t> free_nodes_; // Indices for unused entries in nodes_
PointerMap ptrmap_;
// Temporary state.
Vec<int32_t> deltaf_; // Results of forward DFS
Vec<int32_t> deltab_; // Results of backward DFS
Vec<int32_t> list_; // All nodes to reprocess
Vec<int32_t> merged_; // Rank values to assign to list_ entries
Vec<int32_t> stack_; // Emulates recursion stack for depth-first searches
Rep() : ptrmap_(&nodes_) {}
};
static Node* FindNode(GraphCycles::Rep* rep, GraphId id) {
Node* n = rep->nodes_[NodeIndex(id)];
return (n->version == NodeVersion(id)) ? n : nullptr;
}
GraphCycles::GraphCycles() {
InitArenaIfNecessary();
rep_ = new (base_internal::LowLevelAlloc::AllocWithArena(sizeof(Rep), arena))
Rep;
}
GraphCycles::~GraphCycles() {
for (auto* node : rep_->nodes_) {
node->Node::~Node();
base_internal::LowLevelAlloc::Free(node);
}
rep_->Rep::~Rep();
base_internal::LowLevelAlloc::Free(rep_);
}
bool GraphCycles::CheckInvariants() const {
Rep* r = rep_;
NodeSet ranks; // Set of ranks seen so far.
for (uint32_t x = 0; x < r->nodes_.size(); x++) {
Node* nx = r->nodes_[x];
void* ptr = base_internal::UnhidePtr<void>(nx->masked_ptr);
if (ptr != nullptr && static_cast<uint32_t>(r->ptrmap_.Find(ptr)) != x) {
ABSL_RAW_LOG(FATAL, "Did not find live node in hash table %u %p", x, ptr);
}
if (nx->visited) {
ABSL_RAW_LOG(FATAL, "Did not clear visited marker on node %u", x);
}
if (!ranks.insert(nx->rank)) {
ABSL_RAW_LOG(FATAL, "Duplicate occurrence of rank %d", nx->rank);
}
HASH_FOR_EACH(y, nx->out) {
Node* ny = r->nodes_[y];
if (nx->rank >= ny->rank) {
ABSL_RAW_LOG(FATAL, "Edge %u->%d has bad rank assignment %d->%d", x, y,
nx->rank, ny->rank);
}
}
}
return true;
}
GraphId GraphCycles::GetId(void* ptr) {
int32_t i = rep_->ptrmap_.Find(ptr);
if (i != -1) {
return MakeId(i, rep_->nodes_[i]->version);
} else if (rep_->free_nodes_.empty()) {
Node* n =
new (base_internal::LowLevelAlloc::AllocWithArena(sizeof(Node), arena))
Node;
n->version = 1; // Avoid 0 since it is used by InvalidGraphId()
n->visited = false;
n->rank = rep_->nodes_.size();
n->masked_ptr = base_internal::HidePtr(ptr);
n->nstack = 0;
n->priority = 0;
rep_->nodes_.push_back(n);
rep_->ptrmap_.Add(ptr, n->rank);
return MakeId(n->rank, n->version);
} else {
// Preserve preceding rank since the set of ranks in use must be
// a permutation of [0,rep_->nodes_.size()-1].
int32_t r = rep_->free_nodes_.back();
rep_->free_nodes_.pop_back();
Node* n = rep_->nodes_[r];
n->masked_ptr = base_internal::HidePtr(ptr);
n->nstack = 0;
n->priority = 0;
rep_->ptrmap_.Add(ptr, r);
return MakeId(r, n->version);
}
}
void GraphCycles::RemoveNode(void* ptr) {
int32_t i = rep_->ptrmap_.Remove(ptr);
if (i == -1) {
return;
}
Node* x = rep_->nodes_[i];
HASH_FOR_EACH(y, x->out) {
rep_->nodes_[y]->in.erase(i);
}
HASH_FOR_EACH(y, x->in) {
rep_->nodes_[y]->out.erase(i);
}
x->in.clear();
x->out.clear();
x->masked_ptr = base_internal::HidePtr<void>(nullptr);
if (x->version == std::numeric_limits<uint32_t>::max()) {
// Cannot use x any more
} else {
x->version++; // Invalidates all copies of node.
rep_->free_nodes_.push_back(i);
}
}
void* GraphCycles::Ptr(GraphId id) {
Node* n = FindNode(rep_, id);
return n == nullptr ? nullptr
: base_internal::UnhidePtr<void>(n->masked_ptr);
}
bool GraphCycles::HasNode(GraphId node) {
return FindNode(rep_, node) != nullptr;
}
bool GraphCycles::HasEdge(GraphId x, GraphId y) const {
Node* xn = FindNode(rep_, x);
return xn && FindNode(rep_, y) && xn->out.contains(NodeIndex(y));
}
void GraphCycles::RemoveEdge(GraphId x, GraphId y) {
Node* xn = FindNode(rep_, x);
Node* yn = FindNode(rep_, y);
if (xn && yn) {
xn->out.erase(NodeIndex(y));
yn->in.erase(NodeIndex(x));
// No need to update the rank assignment since a previous valid
// rank assignment remains valid after an edge deletion.
}
}
static bool ForwardDFS(GraphCycles::Rep* r, int32_t n, int32_t upper_bound);
static void BackwardDFS(GraphCycles::Rep* r, int32_t n, int32_t lower_bound);
static void Reorder(GraphCycles::Rep* r);
static void Sort(const Vec<Node*>&, Vec<int32_t>* delta);
static void MoveToList(
GraphCycles::Rep* r, Vec<int32_t>* src, Vec<int32_t>* dst);
bool GraphCycles::InsertEdge(GraphId idx, GraphId idy) {
Rep* r = rep_;
const int32_t x = NodeIndex(idx);
const int32_t y = NodeIndex(idy);
Node* nx = FindNode(r, idx);
Node* ny = FindNode(r, idy);
if (nx == nullptr || ny == nullptr) return true; // Expired ids
if (nx == ny) return false; // Self edge
if (!nx->out.insert(y)) {
// Edge already exists.
return true;
}
ny->in.insert(x);
if (nx->rank <= ny->rank) {
// New edge is consistent with existing rank assignment.
return true;
}
// Current rank assignments are incompatible with the new edge. Recompute.
// We only need to consider nodes that fall in the range [ny->rank,nx->rank].
if (!ForwardDFS(r, y, nx->rank)) {
// Found a cycle. Undo the insertion and tell caller.
nx->out.erase(y);
ny->in.erase(x);
// Since we do not call Reorder() on this path, clear any visited
// markers left by ForwardDFS.
for (const auto& d : r->deltaf_) {
r->nodes_[d]->visited = false;
}
return false;
}
BackwardDFS(r, x, ny->rank);
Reorder(r);
return true;
}
static bool ForwardDFS(GraphCycles::Rep* r, int32_t n, int32_t upper_bound) {
// Avoid recursion since stack space might be limited.
// We instead keep a stack of nodes to visit.
r->deltaf_.clear();
r->stack_.clear();
r->stack_.push_back(n);
while (!r->stack_.empty()) {
n = r->stack_.back();
r->stack_.pop_back();
Node* nn = r->nodes_[n];
if (nn->visited) continue;
nn->visited = true;
r->deltaf_.push_back(n);
HASH_FOR_EACH(w, nn->out) {
Node* nw = r->nodes_[w];
if (nw->rank == upper_bound) {
return false; // Cycle
}
if (!nw->visited && nw->rank < upper_bound) {
r->stack_.push_back(w);
}
}
}
return true;
}
static void BackwardDFS(GraphCycles::Rep* r, int32_t n, int32_t lower_bound) {
r->deltab_.clear();
r->stack_.clear();
r->stack_.push_back(n);
while (!r->stack_.empty()) {
n = r->stack_.back();
r->stack_.pop_back();
Node* nn = r->nodes_[n];
if (nn->visited) continue;
nn->visited = true;
r->deltab_.push_back(n);
HASH_FOR_EACH(w, nn->in) {
Node* nw = r->nodes_[w];
if (!nw->visited && lower_bound < nw->rank) {
r->stack_.push_back(w);
}
}
}
}
static void Reorder(GraphCycles::Rep* r) {
Sort(r->nodes_, &r->deltab_);
Sort(r->nodes_, &r->deltaf_);
// Adds contents of delta lists to list_ (backwards deltas first).
r->list_.clear();
MoveToList(r, &r->deltab_, &r->list_);
MoveToList(r, &r->deltaf_, &r->list_);
// Produce sorted list of all ranks that will be reassigned.
r->merged_.resize(r->deltab_.size() + r->deltaf_.size());
std::merge(r->deltab_.begin(), r->deltab_.end(),
r->deltaf_.begin(), r->deltaf_.end(),
r->merged_.begin());
// Assign the ranks in order to the collected list.
for (uint32_t i = 0; i < r->list_.size(); i++) {
r->nodes_[r->list_[i]]->rank = r->merged_[i];
}
}
static void Sort(const Vec<Node*>& nodes, Vec<int32_t>* delta) {
struct ByRank {
const Vec<Node*>* nodes;
bool operator()(int32_t a, int32_t b) const {
return (*nodes)[a]->rank < (*nodes)[b]->rank;
}
};
ByRank cmp;
cmp.nodes = &nodes;
std::sort(delta->begin(), delta->end(), cmp);
}
static void MoveToList(
GraphCycles::Rep* r, Vec<int32_t>* src, Vec<int32_t>* dst) {
for (auto& v : *src) {
int32_t w = v;
v = r->nodes_[w]->rank; // Replace v entry with its rank
r->nodes_[w]->visited = false; // Prepare for future DFS calls
dst->push_back(w);
}
}
int GraphCycles::FindPath(GraphId idx, GraphId idy, int max_path_len,
GraphId path[]) const {
Rep* r = rep_;
if (FindNode(r, idx) == nullptr || FindNode(r, idy) == nullptr) return 0;
const int32_t x = NodeIndex(idx);
const int32_t y = NodeIndex(idy);
// Forward depth first search starting at x until we hit y.
// As we descend into a node, we push it onto the path.
// As we leave a node, we remove it from the path.
int path_len = 0;
NodeSet seen;
r->stack_.clear();
r->stack_.push_back(x);
while (!r->stack_.empty()) {
int32_t n = r->stack_.back();
r->stack_.pop_back();
if (n < 0) {
// Marker to indicate that we are leaving a node
path_len--;
continue;
}
if (path_len < max_path_len) {
path[path_len] = MakeId(n, rep_->nodes_[n]->version);
}
path_len++;
r->stack_.push_back(-1); // Will remove tentative path entry
if (n == y) {
return path_len;
}
HASH_FOR_EACH(w, r->nodes_[n]->out) {
if (seen.insert(w)) {
r->stack_.push_back(w);
}
}
}
return 0;
}
bool GraphCycles::IsReachable(GraphId x, GraphId y) const {
return FindPath(x, y, 0, nullptr) > 0;
}
void GraphCycles::UpdateStackTrace(GraphId id, int priority,
int (*get_stack_trace)(void** stack, int)) {
Node* n = FindNode(rep_, id);
if (n == nullptr || n->priority >= priority) {
return;
}
n->nstack = (*get_stack_trace)(n->stack, ABSL_ARRAYSIZE(n->stack));
n->priority = priority;
}
int GraphCycles::GetStackTrace(GraphId id, void*** ptr) {
Node* n = FindNode(rep_, id);
if (n == nullptr) {
*ptr = nullptr;
return 0;
} else {
*ptr = n->stack;
return n->nstack;
}
}
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_LOW_LEVEL_ALLOC_MISSING

@@ -0,0 +1,141 @@ absl/synchronization/internal/graphcycles.h
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_GRAPHCYCLES_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_GRAPHCYCLES_H_
// GraphCycles detects the introduction of a cycle into a directed
// graph that is being built up incrementally.
//
// Nodes are identified by small integers. It is not possible to
// record multiple edges with the same (source, destination) pair;
// requests to add an edge where one already exists are silently
// ignored.
//
// It is also not possible to introduce a cycle; an attempt to insert
// an edge that would introduce a cycle fails and returns false.
//
// GraphCycles uses no internal locking; calls into it should be
// serialized externally.
// Performance considerations:
// Works well on sparse graphs, poorly on dense graphs.
// Extra information is maintained incrementally to detect cycles quickly.
// InsertEdge() is very fast when the edge already exists, and reasonably fast
// otherwise.
// FindPath() is linear in the size of the graph.
// The current implementation uses O(|V|+|E|) space.
#include <cstdint>
#include "absl/base/config.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
// Opaque identifier for a graph node.
struct GraphId {
uint64_t handle;
bool operator==(const GraphId& x) const { return handle == x.handle; }
bool operator!=(const GraphId& x) const { return handle != x.handle; }
};
// Return an invalid graph id that will never be assigned by GraphCycles.
inline GraphId InvalidGraphId() {
return GraphId{0};
}
class GraphCycles {
public:
GraphCycles();
~GraphCycles();
// Return the id to use for ptr, assigning one if necessary.
// Subsequent calls with the same ptr value will return the same id
// until Remove().
GraphId GetId(void* ptr);
// Remove "ptr" from the graph. Its corresponding node and all
// edges to and from it are removed.
void RemoveNode(void* ptr);
// Return the pointer associated with id, or nullptr if id is not
// currently in the graph.
void* Ptr(GraphId id);
// Attempt to insert an edge from source_node to dest_node. If the
// edge would introduce a cycle, return false without making any
// changes. Otherwise add the edge and return true.
bool InsertEdge(GraphId source_node, GraphId dest_node);
// Remove any edge that exists from source_node to dest_node.
void RemoveEdge(GraphId source_node, GraphId dest_node);
// Return whether node exists in the graph.
bool HasNode(GraphId node);
// Return whether there is an edge directly from source_node to dest_node.
bool HasEdge(GraphId source_node, GraphId dest_node) const;
// Return whether dest_node is reachable from source_node
// by following edges.
bool IsReachable(GraphId source_node, GraphId dest_node) const;
// Find a path from "source" to "dest". If such a path exists,
// place the nodes on the path in the array path[], and return
// the number of nodes on the path. If the path is longer than
// max_path_len nodes, only the first max_path_len nodes are placed
// in path[]. The client should compare the return value with
// max_path_len" to see when this occurs. If no path exists, return
// 0. Any valid path stored in path[] will start with "source" and
// end with "dest". There is no guarantee that the path is the
// shortest, but no node will appear twice in the path, except the
// source and destination node if they are identical; therefore, the
// return value is at most one greater than the number of nodes in
// the graph.
int FindPath(GraphId source, GraphId dest, int max_path_len,
GraphId path[]) const;
// Update the stack trace recorded for id with the current stack
// trace if the last time it was updated had a smaller priority
// than the priority passed on this call.
//
// *get_stack_trace is called to get the stack trace.
void UpdateStackTrace(GraphId id, int priority,
int (*get_stack_trace)(void**, int));
// Set *ptr to the beginning of the array that holds the recorded
// stack trace for id and return the depth of the stack trace.
int GetStackTrace(GraphId id, void*** ptr);
// Check internal invariants. Crashes on failure, returns true on success.
// Expensive: should only be called from graphcycles_test.cc.
bool CheckInvariants() const;
// ----------------------------------------------------
struct Rep;
private:
Rep *rep_; // opaque representation
GraphCycles(const GraphCycles&) = delete;
GraphCycles& operator=(const GraphCycles&) = delete;
};
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
#endif  // ABSL_SYNCHRONIZATION_INTERNAL_GRAPHCYCLES_H_
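A minimal sketch of the cycle-detection contract (editorial addition, not part
of this commit; this is an internal API, so the example is illustrative only):

#include <cassert>

#include "absl/synchronization/internal/graphcycles.h"

int main() {
  using absl::synchronization_internal::GraphCycles;
  using absl::synchronization_internal::GraphId;
  GraphCycles g;
  int a, b, c;  // any distinct addresses serve as node keys
  GraphId ia = g.GetId(&a), ib = g.GetId(&b), ic = g.GetId(&c);
  assert(g.InsertEdge(ia, ib));   // a -> b
  assert(g.InsertEdge(ib, ic));   // b -> c
  assert(!g.InsertEdge(ic, ia));  // c -> a would close a cycle: rejected
  assert(g.IsReachable(ia, ic));  // a -> b -> c
  g.RemoveNode(&b);               // drops b and all edges touching it
  assert(!g.IsReachable(ia, ic));
}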

@@ -0,0 +1,44 @@ absl/synchronization/internal/graphcycles_benchmark.cc
// Copyright 2018 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/internal/graphcycles.h"
#include <algorithm>
#include <cstdint>
#include <vector>
#include "benchmark/benchmark.h"
#include "absl/base/internal/raw_logging.h"
namespace {
void BM_StressTest(benchmark::State& state) {
const int num_nodes = state.range(0);
while (state.KeepRunningBatch(num_nodes)) {
absl::synchronization_internal::GraphCycles g;
std::vector<absl::synchronization_internal::GraphId> nodes(num_nodes);
for (int i = 0; i < num_nodes; i++) {
nodes[i] = g.GetId(reinterpret_cast<void*>(static_cast<uintptr_t>(i)));
}
for (int i = 0; i < num_nodes; i++) {
int end = std::min(num_nodes, i + 5);
for (int j = i + 1; j < end; j++) {
ABSL_RAW_CHECK(g.InsertEdge(nodes[i], nodes[j]), "");
}
}
}
}
BENCHMARK(BM_StressTest)->Range(2048, 1048576);
} // namespace

@@ -0,0 +1,464 @@ absl/synchronization/internal/graphcycles_test.cc
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/internal/graphcycles.h"
#include <map>
#include <random>
#include <unordered_set>
#include <utility>
#include <vector>
#include "gtest/gtest.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/base/macros.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
// We emulate a GraphCycles object with a node vector and an edge vector.
// We then compare the two implementations.
using Nodes = std::vector<int>;
struct Edge {
int from;
int to;
};
using Edges = std::vector<Edge>;
using RandomEngine = std::mt19937_64;
// Mapping from integer index to GraphId.
typedef std::map<int, GraphId> IdMap;
static GraphId Get(const IdMap& id, int num) {
auto iter = id.find(num);
return (iter == id.end()) ? InvalidGraphId() : iter->second;
}
// Return whether "to" is reachable from "from".
static bool IsReachable(Edges *edges, int from, int to,
std::unordered_set<int> *seen) {
seen->insert(from); // we are investigating "from"; don't do it again
if (from == to) return true;
for (const auto &edge : *edges) {
if (edge.from == from) {
if (edge.to == to) { // success via edge directly
return true;
} else if (seen->find(edge.to) == seen->end() && // success via edge
IsReachable(edges, edge.to, to, seen)) {
return true;
}
}
}
return false;
}
static void PrintEdges(Edges *edges) {
ABSL_RAW_LOG(INFO, "EDGES (%zu)", edges->size());
for (const auto &edge : *edges) {
int a = edge.from;
int b = edge.to;
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
ABSL_RAW_LOG(INFO, "---");
}
static void PrintGCEdges(Nodes *nodes, const IdMap &id, GraphCycles *gc) {
ABSL_RAW_LOG(INFO, "GC EDGES");
for (int a : *nodes) {
for (int b : *nodes) {
if (gc->HasEdge(Get(id, a), Get(id, b))) {
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
}
}
ABSL_RAW_LOG(INFO, "---");
}
static void PrintTransitiveClosure(Nodes *nodes, Edges *edges) {
ABSL_RAW_LOG(INFO, "Transitive closure");
for (int a : *nodes) {
for (int b : *nodes) {
std::unordered_set<int> seen;
if (IsReachable(edges, a, b, &seen)) {
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
}
}
ABSL_RAW_LOG(INFO, "---");
}
static void PrintGCTransitiveClosure(Nodes *nodes, const IdMap &id,
GraphCycles *gc) {
ABSL_RAW_LOG(INFO, "GC Transitive closure");
for (int a : *nodes) {
for (int b : *nodes) {
if (gc->IsReachable(Get(id, a), Get(id, b))) {
ABSL_RAW_LOG(INFO, "%d %d", a, b);
}
}
}
ABSL_RAW_LOG(INFO, "---");
}
static void CheckTransitiveClosure(Nodes *nodes, Edges *edges, const IdMap &id,
GraphCycles *gc) {
std::unordered_set<int> seen;
for (const auto &a : *nodes) {
for (const auto &b : *nodes) {
seen.clear();
bool gc_reachable = gc->IsReachable(Get(id, a), Get(id, b));
bool reachable = IsReachable(edges, a, b, &seen);
if (gc_reachable != reachable) {
PrintEdges(edges);
PrintGCEdges(nodes, id, gc);
PrintTransitiveClosure(nodes, edges);
PrintGCTransitiveClosure(nodes, id, gc);
ABSL_RAW_LOG(FATAL, "gc_reachable %s reachable %s a %d b %d",
gc_reachable ? "true" : "false",
reachable ? "true" : "false", a, b);
}
}
}
}
static void CheckEdges(Nodes *nodes, Edges *edges, const IdMap &id,
GraphCycles *gc) {
int count = 0;
for (const auto &edge : *edges) {
int a = edge.from;
int b = edge.to;
if (!gc->HasEdge(Get(id, a), Get(id, b))) {
PrintEdges(edges);
PrintGCEdges(nodes, id, gc);
ABSL_RAW_LOG(FATAL, "!gc->HasEdge(%d, %d)", a, b);
}
}
for (const auto &a : *nodes) {
for (const auto &b : *nodes) {
if (gc->HasEdge(Get(id, a), Get(id, b))) {
count++;
}
}
}
if (count != edges->size()) {
PrintEdges(edges);
PrintGCEdges(nodes, id, gc);
ABSL_RAW_LOG(FATAL, "edges->size() %zu count %d", edges->size(), count);
}
}
static void CheckInvariants(const GraphCycles &gc) {
if (ABSL_PREDICT_FALSE(!gc.CheckInvariants()))
ABSL_RAW_LOG(FATAL, "CheckInvariants");
}
// Returns the index of a randomly chosen node in *nodes.
// Requires *nodes be non-empty.
static int RandomNode(RandomEngine* rng, Nodes *nodes) {
std::uniform_int_distribution<int> uniform(0, nodes->size()-1);
return uniform(*rng);
}
// Returns the index of a randomly chosen edge in *edges.
// Requires *edges be non-empty.
static int RandomEdge(RandomEngine* rng, Edges *edges) {
std::uniform_int_distribution<int> uniform(0, edges->size()-1);
return uniform(*rng);
}
// Returns the index of edge (from, to) in *edges or -1 if it is not in *edges.
static int EdgeIndex(Edges *edges, int from, int to) {
int i = 0;
while (i != edges->size() &&
((*edges)[i].from != from || (*edges)[i].to != to)) {
i++;
}
  return i == edges->size() ? -1 : i;
}
TEST(GraphCycles, RandomizedTest) {
int next_node = 0;
Nodes nodes;
Edges edges; // from, to
IdMap id;
GraphCycles graph_cycles;
static const int kMaxNodes = 7; // use <= 7 nodes to keep test short
static const int kDataOffset = 17; // an offset to the node-specific data
int n = 100000;
int op = 0;
RandomEngine rng(testing::UnitTest::GetInstance()->random_seed());
std::uniform_int_distribution<int> uniform(0, 5);
auto ptr = [](intptr_t i) {
return reinterpret_cast<void*>(i + kDataOffset);
};
for (int iter = 0; iter != n; iter++) {
for (const auto &node : nodes) {
ASSERT_EQ(graph_cycles.Ptr(Get(id, node)), ptr(node)) << " node " << node;
}
CheckEdges(&nodes, &edges, id, &graph_cycles);
CheckTransitiveClosure(&nodes, &edges, id, &graph_cycles);
op = uniform(rng);
switch (op) {
case 0: // Add a node
if (nodes.size() < kMaxNodes) {
int new_node = next_node++;
GraphId new_gnode = graph_cycles.GetId(ptr(new_node));
ASSERT_NE(new_gnode, InvalidGraphId());
id[new_node] = new_gnode;
ASSERT_EQ(ptr(new_node), graph_cycles.Ptr(new_gnode));
nodes.push_back(new_node);
}
break;
case 1: // Remove a node
if (nodes.size() > 0) {
int node_index = RandomNode(&rng, &nodes);
int node = nodes[node_index];
nodes[node_index] = nodes.back();
nodes.pop_back();
graph_cycles.RemoveNode(ptr(node));
ASSERT_EQ(graph_cycles.Ptr(Get(id, node)), nullptr);
id.erase(node);
int i = 0;
while (i != edges.size()) {
if (edges[i].from == node || edges[i].to == node) {
edges[i] = edges.back();
edges.pop_back();
} else {
i++;
}
}
}
break;
case 2: // Add an edge
if (nodes.size() > 0) {
int from = RandomNode(&rng, &nodes);
int to = RandomNode(&rng, &nodes);
if (EdgeIndex(&edges, nodes[from], nodes[to]) == -1) {
if (graph_cycles.InsertEdge(id[nodes[from]], id[nodes[to]])) {
Edge new_edge;
new_edge.from = nodes[from];
new_edge.to = nodes[to];
edges.push_back(new_edge);
} else {
std::unordered_set<int> seen;
ASSERT_TRUE(IsReachable(&edges, nodes[to], nodes[from], &seen))
<< "Edge " << nodes[to] << "->" << nodes[from];
}
}
}
break;
case 3: // Remove an edge
if (edges.size() > 0) {
int i = RandomEdge(&rng, &edges);
int from = edges[i].from;
int to = edges[i].to;
ASSERT_EQ(i, EdgeIndex(&edges, from, to));
edges[i] = edges.back();
edges.pop_back();
ASSERT_EQ(-1, EdgeIndex(&edges, from, to));
graph_cycles.RemoveEdge(id[from], id[to]);
}
break;
case 4: // Check a path
if (nodes.size() > 0) {
int from = RandomNode(&rng, &nodes);
int to = RandomNode(&rng, &nodes);
GraphId path[2*kMaxNodes];
int path_len = graph_cycles.FindPath(id[nodes[from]], id[nodes[to]],
ABSL_ARRAYSIZE(path), path);
std::unordered_set<int> seen;
bool reachable = IsReachable(&edges, nodes[from], nodes[to], &seen);
bool gc_reachable =
graph_cycles.IsReachable(Get(id, nodes[from]), Get(id, nodes[to]));
ASSERT_EQ(path_len != 0, reachable);
ASSERT_EQ(path_len != 0, gc_reachable);
// In the following line, we add one because a node can appear
// twice, if the path is from that node to itself, perhaps via
// every other node.
ASSERT_LE(path_len, kMaxNodes + 1);
if (path_len != 0) {
ASSERT_EQ(id[nodes[from]], path[0]);
ASSERT_EQ(id[nodes[to]], path[path_len-1]);
for (int i = 1; i < path_len; i++) {
ASSERT_TRUE(graph_cycles.HasEdge(path[i-1], path[i]));
}
}
}
break;
case 5: // Check invariants
CheckInvariants(graph_cycles);
break;
default:
ABSL_RAW_LOG(FATAL, "op %d", op);
}
// Very rarely, test graph expansion by adding then removing many nodes.
std::bernoulli_distribution one_in_1024(1.0 / 1024);
if (one_in_1024(rng)) {
CheckEdges(&nodes, &edges, id, &graph_cycles);
CheckTransitiveClosure(&nodes, &edges, id, &graph_cycles);
for (int i = 0; i != 256; i++) {
int new_node = next_node++;
GraphId new_gnode = graph_cycles.GetId(ptr(new_node));
ASSERT_NE(InvalidGraphId(), new_gnode);
id[new_node] = new_gnode;
ASSERT_EQ(ptr(new_node), graph_cycles.Ptr(new_gnode));
for (const auto &node : nodes) {
ASSERT_NE(node, new_node);
}
nodes.push_back(new_node);
}
for (int i = 0; i != 256; i++) {
ASSERT_GT(nodes.size(), 0);
int node_index = RandomNode(&rng, &nodes);
int node = nodes[node_index];
nodes[node_index] = nodes.back();
nodes.pop_back();
graph_cycles.RemoveNode(ptr(node));
id.erase(node);
int j = 0;
while (j != edges.size()) {
if (edges[j].from == node || edges[j].to == node) {
edges[j] = edges.back();
edges.pop_back();
} else {
j++;
}
}
}
CheckInvariants(graph_cycles);
}
}
}
class GraphCyclesTest : public ::testing::Test {
public:
IdMap id_;
GraphCycles g_;
static void* Ptr(int i) {
return reinterpret_cast<void*>(static_cast<uintptr_t>(i));
}
static int Num(void* ptr) {
return static_cast<int>(reinterpret_cast<uintptr_t>(ptr));
}
  // Test relies on the ith GetId() call returning the node numbered i
GraphCyclesTest() {
for (int i = 0; i < 100; i++) {
id_[i] = g_.GetId(Ptr(i));
}
CheckInvariants(g_);
}
bool AddEdge(int x, int y) {
return g_.InsertEdge(Get(id_, x), Get(id_, y));
}
void AddMultiples() {
// For every node x > 0: add edge to 2*x, 3*x
for (int x = 1; x < 25; x++) {
EXPECT_TRUE(AddEdge(x, 2*x)) << x;
EXPECT_TRUE(AddEdge(x, 3*x)) << x;
}
CheckInvariants(g_);
}
std::string Path(int x, int y) {
GraphId path[5];
int np = g_.FindPath(Get(id_, x), Get(id_, y), ABSL_ARRAYSIZE(path), path);
std::string result;
for (int i = 0; i < np; i++) {
if (i >= ABSL_ARRAYSIZE(path)) {
result += " ...";
break;
}
if (!result.empty()) result.push_back(' ');
char buf[20];
snprintf(buf, sizeof(buf), "%d", Num(g_.Ptr(path[i])));
result += buf;
}
return result;
}
};
TEST_F(GraphCyclesTest, NoCycle) {
AddMultiples();
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, SimpleCycle) {
AddMultiples();
EXPECT_FALSE(AddEdge(8, 4));
EXPECT_EQ("4 8", Path(4, 8));
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, IndirectCycle) {
AddMultiples();
EXPECT_TRUE(AddEdge(16, 9));
CheckInvariants(g_);
EXPECT_FALSE(AddEdge(9, 2));
EXPECT_EQ("2 4 8 16 9", Path(2, 9));
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, LongPath) {
ASSERT_TRUE(AddEdge(2, 4));
ASSERT_TRUE(AddEdge(4, 6));
ASSERT_TRUE(AddEdge(6, 8));
ASSERT_TRUE(AddEdge(8, 10));
ASSERT_TRUE(AddEdge(10, 12));
ASSERT_FALSE(AddEdge(12, 2));
EXPECT_EQ("2 4 6 8 10 ...", Path(2, 12));
CheckInvariants(g_);
}
TEST_F(GraphCyclesTest, RemoveNode) {
ASSERT_TRUE(AddEdge(1, 2));
ASSERT_TRUE(AddEdge(2, 3));
ASSERT_TRUE(AddEdge(3, 4));
ASSERT_TRUE(AddEdge(4, 5));
g_.RemoveNode(g_.Ptr(id_[3]));
id_.erase(3);
ASSERT_TRUE(AddEdge(5, 1));
}
TEST_F(GraphCyclesTest, ManyEdges) {
const int N = 50;
for (int i = 0; i < N; i++) {
for (int j = 1; j < N; j++) {
ASSERT_TRUE(AddEdge(i, i+j));
}
}
CheckInvariants(g_);
ASSERT_TRUE(AddEdge(2*N-1, 0));
CheckInvariants(g_);
ASSERT_FALSE(AddEdge(10, 9));
CheckInvariants(g_);
}
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl

View file

@ -0,0 +1,155 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// An optional absolute timeout, with nanosecond granularity,
// compatible with absl::Time. Suitable for in-register
// parameter-passing (e.g. syscalls.)
// Constructible from an absl::Time (for a timeout to be respected) or {}
// (for "no timeout".)
// This is a private low-level API for use by a handful of low-level
// components that are friends of this class. Higher-level components
// should build APIs based on absl::Time and absl::Duration.
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_
#include <time.h>
#include <algorithm>
#include <limits>
#include "absl/base/internal/raw_logging.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
class Futex;
class Waiter;
class KernelTimeout {
public:
// A timeout that should expire at <t>. Any value, in the full
// InfinitePast() to InfiniteFuture() range, is valid here and will be
// respected.
explicit KernelTimeout(absl::Time t) : ns_(MakeNs(t)) {}
// No timeout.
KernelTimeout() : ns_(0) {}
// A more explicit factory for those who prefer it. Equivalent to {}.
static KernelTimeout Never() { return {}; }
// We explicitly do not support other custom formats: timespec, int64_t nanos.
// Unify on this and absl::Time, please.
bool has_timeout() const { return ns_ != 0; }
private:
// internal rep, not user visible: ns after unix epoch.
// zero = no timeout.
  // A negative value is treated as an unlikely (and certainly expired!) but
  // valid timeout.
int64_t ns_;
static int64_t MakeNs(absl::Time t) {
// optimization--InfiniteFuture is common "no timeout" value
// and cheaper to compare than convert.
if (t == absl::InfiniteFuture()) return 0;
int64_t x = ToUnixNanos(t);
// A timeout that lands exactly on the epoch (x=0) needs to be respected,
    // so we alter it unnoticeably to 1. Negative timeouts are in
// theory supported, but handled poorly by the kernel (long
// delays) so push them forward too; since all such times have
// already passed, it's indistinguishable.
if (x <= 0) x = 1;
// A time larger than what can be represented to the kernel is treated
// as no timeout.
if (x == (std::numeric_limits<int64_t>::max)()) x = 0;
return x;
}
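  // For example, under the representation above: MakeNs(absl::InfiniteFuture())
  // yields 0 ("no timeout"), MakeNs(absl::UnixEpoch()) yields 1 (nudged off
  // the "no timeout" value), and any already-expired time also yields 1.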
// Convert to parameter for sem_timedwait/futex/similar. Only for approved
// users. Do not call if !has_timeout.
struct timespec MakeAbsTimespec() {
int64_t n = ns_;
static const int64_t kNanosPerSecond = 1000 * 1000 * 1000;
if (n == 0) {
ABSL_RAW_LOG(
ERROR,
"Tried to create a timespec from a non-timeout; never do this.");
// But we'll try to continue sanely. no-timeout ~= saturated timeout.
n = (std::numeric_limits<int64_t>::max)();
}
// Kernel APIs validate timespecs as being at or after the epoch,
    // despite the kernel time type being signed. However, no one can
    // tell the difference between a timeout at or before the epoch (since
    // all such timeouts have expired!), so we clamp negative values to zero.
if (n < 0) n = 0;
struct timespec abstime;
int64_t seconds = (std::min)(n / kNanosPerSecond,
int64_t{(std::numeric_limits<time_t>::max)()});
abstime.tv_sec = static_cast<time_t>(seconds);
abstime.tv_nsec =
static_cast<decltype(abstime.tv_nsec)>(n % kNanosPerSecond);
return abstime;
}
#ifdef _WIN32
// Converts to milliseconds from now, or INFINITE when
// !has_timeout(). For use by SleepConditionVariableSRW on
// Windows. Callers should recognize that the return value is a
// relative duration (it should be recomputed by calling this method
// in the case of a spurious wakeup).
// This header file may be included transitively by public header files,
// so we define our own DWORD and INFINITE instead of getting them from
// <intsafe.h> and <WinBase.h>.
typedef unsigned long DWord; // NOLINT
DWord InMillisecondsFromNow() const {
constexpr DWord kInfinite = (std::numeric_limits<DWord>::max)();
if (!has_timeout()) {
return kInfinite;
}
// The use of absl::Now() to convert from absolute time to
// relative time means that absl::Now() cannot use anything that
// depends on KernelTimeout (for example, Mutex) on Windows.
int64_t now = ToUnixNanos(absl::Now());
if (ns_ >= now) {
// Round up so that Now() + ms_from_now >= ns_.
constexpr uint64_t max_nanos =
(std::numeric_limits<int64_t>::max)() - 999999u;
uint64_t ms_from_now =
(std::min<uint64_t>(max_nanos, ns_ - now) + 999999u) / 1000000u;
if (ms_from_now > kInfinite) {
return kInfinite;
}
return static_cast<DWord>(ms_from_now);
}
return 0;
}
#endif
friend class Futex;
friend class Waiter;
};
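// Illustrative sketch only (not part of the API surface): a friend class such
// as Waiter can translate a deadline for a kernel primitive roughly like
// this; `sem` is a hypothetical POSIX semaphore.
//
//   KernelTimeout t(deadline);                      // respects `deadline`
//   KernelTimeout never = KernelTimeout::Never();   // no timeout
//   if (t.has_timeout()) {
//     struct timespec ts = t.MakeAbsTimespec();     // absolute, realtime clock
//     sem_timedwait(&sem, &ts);
//   } else {
//     sem_wait(&sem);
//   }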
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_KERNEL_TIMEOUT_H_

View file

@ -0,0 +1,324 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Implementation of a small subset of Mutex and CondVar functionality
// for platforms where the production implementation hasn't been fully
// ported yet.
#include "absl/synchronization/mutex.h"
#if defined(_WIN32)
#include <chrono> // NOLINT(build/c++11)
#else
#include <sys/time.h>
#include <time.h>
#endif
#include <algorithm>
#include "absl/base/internal/raw_logging.h"
#include "absl/time/time.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
void SetMutexDeadlockDetectionMode(OnDeadlockCycle) {}
void EnableMutexInvariantDebugging(bool) {}
namespace synchronization_internal {
namespace {
// Return the current time plus the timeout.
absl::Time DeadlineFromTimeout(absl::Duration timeout) {
return absl::Now() + timeout;
}
// Limit the deadline to a positive, 32-bit time_t value to accommodate
// implementation restrictions. This also deals with InfinitePast and
// InfiniteFuture.
absl::Time LimitedDeadline(absl::Time deadline) {
deadline = std::max(absl::FromTimeT(0), deadline);
deadline = std::min(deadline, absl::FromTimeT(0x7fffffff));
return deadline;
}
} // namespace
#if defined(_WIN32)
MutexImpl::MutexImpl() {}
MutexImpl::~MutexImpl() {
if (locked_) {
std_mutex_.unlock();
}
}
void MutexImpl::Lock() {
std_mutex_.lock();
locked_ = true;
}
bool MutexImpl::TryLock() {
bool locked = std_mutex_.try_lock();
if (locked) locked_ = true;
return locked;
}
void MutexImpl::Unlock() {
locked_ = false;
released_.SignalAll();
std_mutex_.unlock();
}
CondVarImpl::CondVarImpl() {}
CondVarImpl::~CondVarImpl() {}
void CondVarImpl::Signal() { std_cv_.notify_one(); }
void CondVarImpl::SignalAll() { std_cv_.notify_all(); }
void CondVarImpl::Wait(MutexImpl* mu) {
mu->released_.SignalAll();
std_cv_.wait(mu->std_mutex_);
}
bool CondVarImpl::WaitWithDeadline(MutexImpl* mu, absl::Time deadline) {
mu->released_.SignalAll();
time_t when = ToTimeT(deadline);
int64_t nanos = ToInt64Nanoseconds(deadline - absl::FromTimeT(when));
std::chrono::system_clock::time_point deadline_tp =
std::chrono::system_clock::from_time_t(when) +
std::chrono::duration_cast<std::chrono::system_clock::duration>(
std::chrono::nanoseconds(nanos));
return std_cv_.wait_until(mu->std_mutex_, deadline_tp) ==
std::cv_status::timeout;
}
#else // ! _WIN32
MutexImpl::MutexImpl() {
ABSL_RAW_CHECK(pthread_mutex_init(&pthread_mutex_, nullptr) == 0,
"pthread error");
}
MutexImpl::~MutexImpl() {
if (locked_) {
ABSL_RAW_CHECK(pthread_mutex_unlock(&pthread_mutex_) == 0, "pthread error");
}
ABSL_RAW_CHECK(pthread_mutex_destroy(&pthread_mutex_) == 0, "pthread error");
}
void MutexImpl::Lock() {
ABSL_RAW_CHECK(pthread_mutex_lock(&pthread_mutex_) == 0, "pthread error");
locked_ = true;
}
bool MutexImpl::TryLock() {
bool locked = (0 == pthread_mutex_trylock(&pthread_mutex_));
if (locked) locked_ = true;
return locked;
}
void MutexImpl::Unlock() {
locked_ = false;
released_.SignalAll();
ABSL_RAW_CHECK(pthread_mutex_unlock(&pthread_mutex_) == 0, "pthread error");
}
CondVarImpl::CondVarImpl() {
ABSL_RAW_CHECK(pthread_cond_init(&pthread_cv_, nullptr) == 0,
"pthread error");
}
CondVarImpl::~CondVarImpl() {
ABSL_RAW_CHECK(pthread_cond_destroy(&pthread_cv_) == 0, "pthread error");
}
void CondVarImpl::Signal() {
ABSL_RAW_CHECK(pthread_cond_signal(&pthread_cv_) == 0, "pthread error");
}
void CondVarImpl::SignalAll() {
ABSL_RAW_CHECK(pthread_cond_broadcast(&pthread_cv_) == 0, "pthread error");
}
void CondVarImpl::Wait(MutexImpl* mu) {
mu->released_.SignalAll();
ABSL_RAW_CHECK(pthread_cond_wait(&pthread_cv_, &mu->pthread_mutex_) == 0,
"pthread error");
}
bool CondVarImpl::WaitWithDeadline(MutexImpl* mu, absl::Time deadline) {
mu->released_.SignalAll();
struct timespec ts = ToTimespec(deadline);
int rc = pthread_cond_timedwait(&pthread_cv_, &mu->pthread_mutex_, &ts);
if (rc == ETIMEDOUT) return true;
ABSL_RAW_CHECK(rc == 0, "pthread error");
return false;
}
#endif // ! _WIN32
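// Await() and AwaitWithDeadline() below implement Mutex::Await() on top of
// the released_ condition variable: released_ is broadcast whenever the
// underlying mutex is released (by Unlock() or by entering a wait), so a
// thread blocked in Await() re-evaluates its Condition each time another
// thread gives up the lock.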
void MutexImpl::Await(const Condition& cond) {
if (cond.Eval()) return;
released_.SignalAll();
do {
released_.Wait(this);
} while (!cond.Eval());
}
bool MutexImpl::AwaitWithDeadline(const Condition& cond, absl::Time deadline) {
if (cond.Eval()) return true;
released_.SignalAll();
while (true) {
if (released_.WaitWithDeadline(this, deadline)) return false;
if (cond.Eval()) return true;
}
}
} // namespace synchronization_internal
Mutex::Mutex() {}
Mutex::~Mutex() {}
void Mutex::Lock() { impl()->Lock(); }
void Mutex::Unlock() { impl()->Unlock(); }
bool Mutex::TryLock() { return impl()->TryLock(); }
void Mutex::ReaderLock() { Lock(); }
void Mutex::ReaderUnlock() { Unlock(); }
void Mutex::Await(const Condition& cond) { impl()->Await(cond); }
void Mutex::LockWhen(const Condition& cond) {
Lock();
Await(cond);
}
bool Mutex::AwaitWithDeadline(const Condition& cond, absl::Time deadline) {
return impl()->AwaitWithDeadline(
cond, synchronization_internal::LimitedDeadline(deadline));
}
bool Mutex::AwaitWithTimeout(const Condition& cond, absl::Duration timeout) {
return AwaitWithDeadline(
cond, synchronization_internal::DeadlineFromTimeout(timeout));
}
bool Mutex::LockWhenWithDeadline(const Condition& cond, absl::Time deadline) {
Lock();
return AwaitWithDeadline(cond, deadline);
}
bool Mutex::LockWhenWithTimeout(const Condition& cond, absl::Duration timeout) {
return LockWhenWithDeadline(
cond, synchronization_internal::DeadlineFromTimeout(timeout));
}
void Mutex::ReaderLockWhen(const Condition& cond) {
ReaderLock();
Await(cond);
}
bool Mutex::ReaderLockWhenWithTimeout(const Condition& cond,
absl::Duration timeout) {
return LockWhenWithTimeout(cond, timeout);
}
bool Mutex::ReaderLockWhenWithDeadline(const Condition& cond,
absl::Time deadline) {
return LockWhenWithDeadline(cond, deadline);
}
void Mutex::EnableDebugLog(const char*) {}
void Mutex::EnableInvariantDebugging(void (*)(void*), void*) {}
void Mutex::ForgetDeadlockInfo() {}
void Mutex::AssertHeld() const {}
void Mutex::AssertReaderHeld() const {}
void Mutex::AssertNotHeld() const {}
CondVar::CondVar() {}
CondVar::~CondVar() {}
void CondVar::Signal() { impl()->Signal(); }
void CondVar::SignalAll() { impl()->SignalAll(); }
void CondVar::Wait(Mutex* mu) { return impl()->Wait(mu->impl()); }
bool CondVar::WaitWithDeadline(Mutex* mu, absl::Time deadline) {
return impl()->WaitWithDeadline(
mu->impl(), synchronization_internal::LimitedDeadline(deadline));
}
bool CondVar::WaitWithTimeout(Mutex* mu, absl::Duration timeout) {
return WaitWithDeadline(mu, absl::Now() + timeout);
}
void CondVar::EnableDebugLog(const char*) {}
#ifdef THREAD_SANITIZER
extern "C" void __tsan_read1(void *addr);
#else
#define __tsan_read1(addr) // do nothing if TSan not enabled
#endif
// A function that just returns its argument, dereferenced
static bool Dereference(void *arg) {
// ThreadSanitizer does not instrument this file for memory accesses.
// This function dereferences a user variable that can participate
// in a data race, so we need to manually tell TSan about this memory access.
__tsan_read1(arg);
return *(static_cast<bool *>(arg));
}
Condition::Condition() {} // null constructor, used for kTrue only
const Condition Condition::kTrue;
Condition::Condition(bool (*func)(void *), void *arg)
: eval_(&CallVoidPtrFunction),
function_(func),
method_(nullptr),
arg_(arg) {}
bool Condition::CallVoidPtrFunction(const Condition *c) {
return (*c->function_)(c->arg_);
}
Condition::Condition(const bool *cond)
: eval_(CallVoidPtrFunction),
function_(Dereference),
method_(nullptr),
// const_cast is safe since Dereference does not modify arg
arg_(const_cast<bool *>(cond)) {}
bool Condition::Eval() const {
// eval_ == null for kTrue
return (this->eval_ == nullptr) || (*this->eval_)(this);
}
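// Illustrative sketch only (usage, not part of this file): the two Condition
// constructors defined above can be exercised like this; `mu` is a
// hypothetical absl::Mutex held by the caller.
//
//   bool done = false;
//   absl::Condition cond_ptr(&done);  // true once `done` becomes true
//   absl::Condition cond_fn(
//       +[](void *arg) { return *static_cast<bool *>(arg); }, &done);
//   mu.Await(cond_ptr);               // blocks until `done` is set under mu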
void RegisterSymbolizer(bool (*)(const void*, char*, int)) {}
ABSL_NAMESPACE_END
} // namespace absl

View file

@ -0,0 +1,249 @@
// Do not include. This is an implementation detail of
// absl/synchronization/mutex.h.
//
// Declares three classes:
//
// absl::synchronization_internal::MutexImpl - implementation helper for Mutex
// absl::synchronization_internal::CondVarImpl - implementation helper for
//     CondVar
// absl::synchronization_internal::SynchronizationStorage<T> - implementation
//     helper for Mutex, CondVar
#include <type_traits>
#if defined(_WIN32)
#include <condition_variable>
#include <mutex>
#else
#include <pthread.h>
#endif
#include "absl/base/call_once.h"
#include "absl/time/time.h"
// Declare that Mutex::ReaderLock is actually Lock(). Intended primarily
// for tests, and even then as a last resort.
#ifdef ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE
#error ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE cannot be directly set
#else
#define ABSL_MUTEX_READER_LOCK_IS_EXCLUSIVE 1
#endif
// Declare that Mutex::EnableInvariantDebugging is not implemented.
// Intended primarily for tests, and even then as a last resort.
#ifdef ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED
#error ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED cannot be directly set
#else
#define ABSL_MUTEX_ENABLE_INVARIANT_DEBUGGING_NOT_IMPLEMENTED 1
#endif
namespace absl {
ABSL_NAMESPACE_BEGIN
class Condition;
namespace synchronization_internal {
class MutexImpl;
// Do not use this implementation detail of CondVar. Provides most of the
// implementation, but should not be placed directly in static storage
// because it will not linker initialize properly. See
// SynchronizationStorage<T> below for what we mean by linker
// initialization.
class CondVarImpl {
public:
CondVarImpl();
CondVarImpl(const CondVarImpl&) = delete;
CondVarImpl& operator=(const CondVarImpl&) = delete;
~CondVarImpl();
void Signal();
void SignalAll();
void Wait(MutexImpl* mutex);
bool WaitWithDeadline(MutexImpl* mutex, absl::Time deadline);
private:
#if defined(_WIN32)
std::condition_variable_any std_cv_;
#else
pthread_cond_t pthread_cv_;
#endif
};
// Do not use this implementation detail of Mutex. Provides most of the
// implementation, but should not be placed directly in static storage
// because it will not linker initialize properly. See
// SynchronizationStorage<T> below for what we mean by linker
// initialization.
class MutexImpl {
public:
MutexImpl();
MutexImpl(const MutexImpl&) = delete;
MutexImpl& operator=(const MutexImpl&) = delete;
~MutexImpl();
void Lock();
bool TryLock();
void Unlock();
void Await(const Condition& cond);
bool AwaitWithDeadline(const Condition& cond, absl::Time deadline);
private:
friend class CondVarImpl;
#if defined(_WIN32)
std::mutex std_mutex_;
#else
pthread_mutex_t pthread_mutex_;
#endif
// True if the underlying mutex is locked. If the destructor is entered
// while locked_, the underlying mutex is unlocked. Mutex supports
// destruction while locked, but the same is undefined behavior for both
// pthread_mutex_t and std::mutex.
bool locked_ = false;
// Signaled before releasing the lock, in support of Await.
CondVarImpl released_;
};
// Do not use this implementation detail of CondVar and Mutex. A storage
// space for T that supports a LinkerInitialized constructor. T must
// have a default constructor, which is called by the first call to
// get(). T's destructor is never called if the LinkerInitialized
// constructor is called.
//
// Objects constructed with the default constructor are constructed and
// destructed like any other object, and should never be allocated in
// static storage.
//
// Objects constructed with the LinkerInitialized constructor should
// always be in static storage. For such objects, calls to get() are always
// valid, except from signal handlers.
//
// Note that this implementation relies on undefined language behaviors that
// are known to hold for the set of supported compilers. An analysis
// follows.
//
// From the C++11 standard:
//
// [basic.life] says an object has non-trivial initialization if it is of
// class type and it is initialized by a constructor other than a trivial
// default constructor. (the LinkerInitialized constructor is
// non-trivial)
//
// [basic.life] says the lifetime of an object with a non-trivial
// constructor begins when the call to the constructor is complete.
//
// [basic.life] says the lifetime of an object with non-trivial destructor
// ends when the call to the destructor begins.
//
// [basic.life] p5 specifies undefined behavior when accessing non-static
// members of an instance outside its
// lifetime. (SynchronizationStorage::get() accesses non-static members.)
//
// So, a LinkerInitialized object of SynchronizationStorage uses a
// non-trivial constructor, which is called at some point during dynamic
// initialization, and is therefore subject to order of dynamic
// initialization bugs, where get() is called before the object's
// constructor is, resulting in undefined behavior.
//
// Similarly, a LinkerInitialized SynchronizationStorage object has a
// non-trivial destructor, and so its lifetime ends at some point during
// destruction of objects with static storage duration [basic.start.term]
// p4. There is a window where other exit code could call get() after this
// occurs, resulting in undefined behavior.
//
// Combined, these statements imply that LinkerInitialized instances
// of SynchronizationStorage<T> rely on undefined behavior.
//
// However, in practice, the implementation works on all supported
// compilers. Specifically, we rely on:
//
// a) zero-initialization being sufficient to initialize
// LinkerInitialized instances for the purposes of calling
// get(), regardless of when the constructor is called. This is
//    because the destruct_ boolean is correctly zero-initialized to
//    false.
//
// b) the LinkerInitialized constructor is a NOP, and immaterial
//    even to concurrent calls to get().
//
// c) the destructor being a NOP for LinkerInitialized objects
//    (guaranteed by a check that destruct_ is false), and so any concurrent and
// subsequent calls to get() functioning as if the destructor were not
// called, by virtue of the instances' storage remaining valid after the
// destructor runs.
//
// d) That a-c apply transitively when SynchronizationStorage<T> is the
// only member of a class allocated in static storage.
//
// Nothing in the language standard guarantees that a-d hold. In practice,
// these hold in all supported compilers.
//
// Future direction:
//
// Ideally, we would simply use std::mutex or a similar class, which when
// allocated statically would support use immediately after static
// initialization up until static storage is reclaimed (i.e. the properties
// we require of all "linker initialized" instances).
//
// Regarding construction in static storage, std::mutex is required to
// provide a constexpr default constructor [thread.mutex.class], which
// ensures the instance's lifetime begins with static initialization
// [basic.start.init], and so is immune to any problems caused by the order
// of dynamic initialization. However, as of this writing Microsoft's
// Visual Studio does not provide a constexpr constructor for std::mutex.
// See
// https://blogs.msdn.microsoft.com/vcblog/2015/06/02/constexpr-complete-for-vs-2015-rtm-c11-compiler-c17-stl/
//
// Regarding destruction of instances in static storage, [basic.life] does
// say an object's lifetime ends when the storage it occupies is released,
// in the case of a non-trivial destructor. However, std::mutex is not specified
// to have a trivial destructor.
//
// So, we would need a class with a constexpr default constructor and a
// trivial destructor. Today, we can achieve neither desired property using
// std::mutex directly.
template <typename T>
class SynchronizationStorage {
public:
// Instances allocated on the heap or on the stack should use the default
// constructor.
SynchronizationStorage()
: destruct_(true), once_() {}
constexpr explicit SynchronizationStorage(absl::ConstInitType)
: destruct_(false), once_(), space_{{0}} {}
SynchronizationStorage(SynchronizationStorage&) = delete;
SynchronizationStorage& operator=(SynchronizationStorage&) = delete;
~SynchronizationStorage() {
if (destruct_) {
get()->~T();
}
}
// Retrieve the object in storage. This is fast and thread safe, but does
// incur the cost of absl::call_once().
T* get() {
absl::call_once(once_, SynchronizationStorage::Construct, this);
return reinterpret_cast<T*>(&space_);
}
private:
static void Construct(SynchronizationStorage<T>* self) {
new (&self->space_) T();
}
// When true, T's destructor is run when this is destructed.
const bool destruct_;
absl::once_flag once_;
// An aligned space for the T.
alignas(T) unsigned char space_[sizeof(T)];
};
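// Illustrative sketch only (assumed usage; `ExampleMutex` is a hypothetical
// wrapper, not declared in this file):
//
//   class ExampleMutex {
//    public:
//     ExampleMutex() = default;  // heap/stack instances: T is destructed
//     constexpr explicit ExampleMutex(absl::ConstInitType ci)
//         : storage_(ci) {}      // static storage: zero-init, never destructed
//     void Lock() { storage_.get()->Lock(); }
//    private:
//     SynchronizationStorage<MutexImpl> storage_;
//   };
//
//   ABSL_CONST_INIT ExampleMutex g_mu(absl::kConstInit);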
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl

View file

@ -0,0 +1,106 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This file is a no-op if the required LowLevelAlloc support is missing.
#include "absl/base/internal/low_level_alloc.h"
#ifndef ABSL_LOW_LEVEL_ALLOC_MISSING
#include "absl/synchronization/internal/per_thread_sem.h"
#include <atomic>
#include "absl/base/attributes.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/waiter.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
void PerThreadSem::SetThreadBlockedCounter(std::atomic<int> *counter) {
base_internal::ThreadIdentity *identity;
identity = GetOrCreateCurrentThreadIdentity();
identity->blocked_count_ptr = counter;
}
std::atomic<int> *PerThreadSem::GetThreadBlockedCounter() {
base_internal::ThreadIdentity *identity;
identity = GetOrCreateCurrentThreadIdentity();
return identity->blocked_count_ptr;
}
void PerThreadSem::Init(base_internal::ThreadIdentity *identity) {
new (Waiter::GetWaiter(identity)) Waiter();
identity->ticker.store(0, std::memory_order_relaxed);
identity->wait_start.store(0, std::memory_order_relaxed);
identity->is_idle.store(false, std::memory_order_relaxed);
}
void PerThreadSem::Destroy(base_internal::ThreadIdentity *identity) {
Waiter::GetWaiter(identity)->~Waiter();
}
void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) {
const int ticker =
identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1;
const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
if (wait_start && (ticker - wait_start > Waiter::kIdlePeriods) && !is_idle) {
    // Wake up the waiting thread, since it is time for it to become idle.
Waiter::GetWaiter(identity)->Poke();
}
}
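// Note on the idle protocol implemented by Tick() above: ticker advances
// roughly once per second, while wait_start records the ticker value at the
// start of a wait (see AbslInternalPerThreadSemWait below). Once a waiter has
// been blocked for more than Waiter::kIdlePeriods ticks, Poke() wakes it so
// that its wait loop can notice the elapsed ticks and mark the thread idle.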
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
extern "C" {
ABSL_ATTRIBUTE_WEAK void AbslInternalPerThreadSemPost(
absl::base_internal::ThreadIdentity *identity) {
absl::synchronization_internal::Waiter::GetWaiter(identity)->Post();
}
ABSL_ATTRIBUTE_WEAK bool AbslInternalPerThreadSemWait(
absl::synchronization_internal::KernelTimeout t) {
bool timeout = false;
absl::base_internal::ThreadIdentity *identity;
identity = absl::synchronization_internal::GetOrCreateCurrentThreadIdentity();
// Ensure wait_start != 0.
int ticker = identity->ticker.load(std::memory_order_relaxed);
identity->wait_start.store(ticker ? ticker : 1, std::memory_order_relaxed);
identity->is_idle.store(false, std::memory_order_relaxed);
if (identity->blocked_count_ptr != nullptr) {
// Increment count of threads blocked in a given thread pool.
identity->blocked_count_ptr->fetch_add(1, std::memory_order_relaxed);
}
timeout =
!absl::synchronization_internal::Waiter::GetWaiter(identity)->Wait(t);
if (identity->blocked_count_ptr != nullptr) {
identity->blocked_count_ptr->fetch_sub(1, std::memory_order_relaxed);
}
identity->is_idle.store(false, std::memory_order_relaxed);
identity->wait_start.store(0, std::memory_order_relaxed);
return !timeout;
}
} // extern "C"
#endif // ABSL_LOW_LEVEL_ALLOC_MISSING

View file

@ -0,0 +1,115 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// PerThreadSem is a low-level synchronization primitive controlling the
// runnability of a single thread, used internally by Mutex and CondVar.
//
// This is NOT a general-purpose synchronization mechanism, and should not be
// used directly by applications. Applications should use Mutex and CondVar.
//
// The semantics of PerThreadSem are the same as those of a counting semaphore.
// Each thread maintains an abstract "count" value associated with its identity.
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_PER_THREAD_SEM_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_PER_THREAD_SEM_H_
#include <atomic>
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/create_thread_identity.h"
#include "absl/synchronization/internal/kernel_timeout.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
class Mutex;
namespace synchronization_internal {
class PerThreadSem {
public:
PerThreadSem() = delete;
PerThreadSem(const PerThreadSem&) = delete;
PerThreadSem& operator=(const PerThreadSem&) = delete;
// Routine invoked periodically (once a second) by a background thread.
// Has no effect on user-visible state.
static void Tick(base_internal::ThreadIdentity* identity);
// ---------------------------------------------------------------------------
// Routines used by autosizing threadpools to detect when threads are
// blocked. Each thread has a counter pointer, initially zero. If non-zero,
// the implementation atomically increments the counter when it blocks on a
// semaphore, and decrements it again when it wakes. This allows a threadpool
// to keep track of how many of its threads are blocked.
// SetThreadBlockedCounter() should be used only by threadpool
// implementations. GetThreadBlockedCounter() should be used by modules that
// block threads; if the pointer returned is non-zero, the location should be
// incremented before the thread blocks, and decremented after it wakes.
static void SetThreadBlockedCounter(std::atomic<int> *counter);
static std::atomic<int> *GetThreadBlockedCounter();
private:
// Create the PerThreadSem associated with "identity". Initializes count=0.
// REQUIRES: May only be called by ThreadIdentity.
static void Init(base_internal::ThreadIdentity* identity);
// Destroy the PerThreadSem associated with "identity".
// REQUIRES: May only be called by ThreadIdentity.
static void Destroy(base_internal::ThreadIdentity* identity);
// Increments "identity"'s count.
static inline void Post(base_internal::ThreadIdentity* identity);
// Waits until either our count > 0 or t has expired.
// If count > 0, decrements count and returns true. Otherwise returns false.
// !t.has_timeout() => Wait(t) will return true.
static inline bool Wait(KernelTimeout t);
// White-listed callers.
friend class PerThreadSemTest;
friend class absl::Mutex;
friend absl::base_internal::ThreadIdentity* CreateThreadIdentity();
friend void ReclaimThreadIdentity(void* v);
};
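// Illustrative sketch only (assumed threadpool usage; `pool_size` and
// `AddThread` are hypothetical):
//
//   std::atomic<int> blocked(0);
//   // Once, on each pool thread:
//   PerThreadSem::SetThreadBlockedCounter(&blocked);
//   ...
//   // The pool may then poll the counter to detect when all threads block:
//   if (blocked.load(std::memory_order_relaxed) == pool_size) AddThread();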
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
// In some build configurations we pass --detect-odr-violations to the
// gold linker. This causes it to flag weak symbol overrides as ODR
// violations. Because ODR only applies to C++ and not C,
// --detect-odr-violations ignores symbols not mangled with C++ names.
// By changing our extension points to be extern "C", we dodge this
// check.
extern "C" {
void AbslInternalPerThreadSemPost(
absl::base_internal::ThreadIdentity* identity);
bool AbslInternalPerThreadSemWait(
absl::synchronization_internal::KernelTimeout t);
} // extern "C"
void absl::synchronization_internal::PerThreadSem::Post(
absl::base_internal::ThreadIdentity* identity) {
AbslInternalPerThreadSemPost(identity);
}
bool absl::synchronization_internal::PerThreadSem::Wait(
absl::synchronization_internal::KernelTimeout t) {
return AbslInternalPerThreadSemWait(t);
}
#endif // ABSL_SYNCHRONIZATION_INTERNAL_PER_THREAD_SEM_H_

View file

@ -0,0 +1,180 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/internal/per_thread_sem.h"
#include <atomic>
#include <condition_variable> // NOLINT(build/c++11)
#include <functional>
#include <limits>
#include <mutex> // NOLINT(build/c++11)
#include <string>
#include <thread> // NOLINT(build/c++11)
#include "gtest/gtest.h"
#include "absl/base/internal/cycleclock.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/strings/str_cat.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
// In this test we explicitly avoid the use of synchronization
// primitives which might use PerThreadSem, most notably absl::Mutex.
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
class SimpleSemaphore {
public:
SimpleSemaphore() : count_(0) {}
// Decrements (locks) the semaphore. If the semaphore's value is
  // greater than zero, then the decrement proceeds and the function
  // returns immediately. If the semaphore currently has the value
// zero, then the call blocks until it becomes possible to perform
// the decrement.
void Wait() {
std::unique_lock<std::mutex> lock(mu_);
cv_.wait(lock, [this]() { return count_ > 0; });
--count_;
cv_.notify_one();
}
// Increments (unlocks) the semaphore. If the semaphore's value
// consequently becomes greater than zero, then another thread
  // blocked in a Wait() call will be woken up and proceed to lock the
// semaphore.
void Post() {
std::lock_guard<std::mutex> lock(mu_);
++count_;
cv_.notify_one();
}
private:
std::mutex mu_;
std::condition_variable cv_;
int count_;
};
struct ThreadData {
int num_iterations; // Number of replies to send.
SimpleSemaphore identity2_written; // Posted by thread writing identity2.
base_internal::ThreadIdentity *identity1; // First Post()-er.
base_internal::ThreadIdentity *identity2; // First Wait()-er.
KernelTimeout timeout;
};
// Need friendship with PerThreadSem.
class PerThreadSemTest : public testing::Test {
public:
static void TimingThread(ThreadData* t) {
t->identity2 = GetOrCreateCurrentThreadIdentity();
t->identity2_written.Post();
while (t->num_iterations--) {
Wait(t->timeout);
Post(t->identity1);
}
}
void TestTiming(const char *msg, bool timeout) {
static const int kNumIterations = 100;
ThreadData t;
t.num_iterations = kNumIterations;
t.timeout = timeout ?
KernelTimeout(absl::Now() + absl::Seconds(10000)) // far in the future
: KernelTimeout::Never();
t.identity1 = GetOrCreateCurrentThreadIdentity();
// We can't use the Thread class here because it uses the Mutex
// class which will invoke PerThreadSem, so we use std::thread instead.
std::thread partner_thread(std::bind(TimingThread, &t));
// Wait for our partner thread to register their identity.
t.identity2_written.Wait();
int64_t min_cycles = std::numeric_limits<int64_t>::max();
int64_t total_cycles = 0;
for (int i = 0; i < kNumIterations; ++i) {
absl::SleepFor(absl::Milliseconds(20));
int64_t cycles = base_internal::CycleClock::Now();
Post(t.identity2);
Wait(t.timeout);
cycles = base_internal::CycleClock::Now() - cycles;
min_cycles = std::min(min_cycles, cycles);
total_cycles += cycles;
}
std::string out = StrCat(
msg, "min cycle count=", min_cycles, " avg cycle count=",
absl::SixDigits(static_cast<double>(total_cycles) / kNumIterations));
printf("%s\n", out.c_str());
partner_thread.join();
}
protected:
static void Post(base_internal::ThreadIdentity *id) {
PerThreadSem::Post(id);
}
static bool Wait(KernelTimeout t) {
return PerThreadSem::Wait(t);
}
// convenience overload
static bool Wait(absl::Time t) {
return Wait(KernelTimeout(t));
}
static void Tick(base_internal::ThreadIdentity *identity) {
PerThreadSem::Tick(identity);
}
};
namespace {
TEST_F(PerThreadSemTest, WithoutTimeout) {
PerThreadSemTest::TestTiming("Without timeout: ", false);
}
TEST_F(PerThreadSemTest, WithTimeout) {
PerThreadSemTest::TestTiming("With timeout: ", true);
}
TEST_F(PerThreadSemTest, Timeouts) {
const absl::Duration delay = absl::Milliseconds(50);
const absl::Time start = absl::Now();
EXPECT_FALSE(Wait(start + delay));
const absl::Duration elapsed = absl::Now() - start;
// Allow for a slight early return, to account for quality of implementation
// issues on various platforms.
const absl::Duration slop = absl::Microseconds(200);
EXPECT_LE(delay - slop, elapsed)
<< "Wait returned " << delay - elapsed
<< " early (with " << slop << " slop), start time was " << start;
absl::Time negative_timeout = absl::UnixEpoch() - absl::Milliseconds(100);
EXPECT_FALSE(Wait(negative_timeout));
EXPECT_LE(negative_timeout, absl::Now() + slop); // trivially true :)
Post(GetOrCreateCurrentThreadIdentity());
// The wait here has an expired timeout, but we have a wake to consume,
// so this should succeed
EXPECT_TRUE(Wait(negative_timeout));
}
} // namespace
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl

View file

@ -0,0 +1,93 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <thread> // NOLINT(build/c++11)
#include <vector>
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
// A simple ThreadPool implementation for tests.
class ThreadPool {
public:
explicit ThreadPool(int num_threads) {
for (int i = 0; i < num_threads; ++i) {
threads_.push_back(std::thread(&ThreadPool::WorkLoop, this));
}
}
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
~ThreadPool() {
{
absl::MutexLock l(&mu_);
for (size_t i = 0; i < threads_.size(); i++) {
queue_.push(nullptr); // Shutdown signal.
}
}
for (auto &t : threads_) {
t.join();
}
}
// Schedule a function to be run on a ThreadPool thread immediately.
void Schedule(std::function<void()> func) {
assert(func != nullptr);
absl::MutexLock l(&mu_);
queue_.push(std::move(func));
}
private:
bool WorkAvailable() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
return !queue_.empty();
}
void WorkLoop() {
while (true) {
std::function<void()> func;
{
absl::MutexLock l(&mu_);
mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable));
func = std::move(queue_.front());
queue_.pop();
}
if (func == nullptr) { // Shutdown signal.
break;
}
func();
}
}
absl::Mutex mu_;
std::queue<std::function<void()>> queue_ ABSL_GUARDED_BY(mu_);
std::vector<std::thread> threads_;
};
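// Illustrative usage in a test (sketch; assumes absl::Notification from
// absl/synchronization/notification.h):
//
//   ThreadPool pool(4);
//   absl::Notification done;
//   pool.Schedule([&done]() { done.Notify(); });
//   done.WaitForNotification();  // worker threads are joined in ~ThreadPool()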
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_

View file

@ -0,0 +1,492 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/internal/waiter.h"
#include "absl/base/config.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#include <sys/time.h>
#include <unistd.h>
#endif
#ifdef __linux__
#include <linux/futex.h>
#include <sys/syscall.h>
#endif
#ifdef ABSL_HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif
#include <errno.h>
#include <stdio.h>
#include <time.h>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <new>
#include <type_traits>
#include "absl/base/internal/raw_logging.h"
#include "absl/base/internal/thread_identity.h"
#include "absl/base/optimization.h"
#include "absl/synchronization/internal/kernel_timeout.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
static void MaybeBecomeIdle() {
base_internal::ThreadIdentity *identity =
base_internal::CurrentThreadIdentityIfPresent();
assert(identity != nullptr);
const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
const int ticker = identity->ticker.load(std::memory_order_relaxed);
const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
identity->is_idle.store(true, std::memory_order_relaxed);
}
}
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
// Some Android headers are missing these definitions even though they
// support these futex operations.
#ifdef __BIONIC__
#ifndef SYS_futex
#define SYS_futex __NR_futex
#endif
#ifndef FUTEX_WAIT_BITSET
#define FUTEX_WAIT_BITSET 9
#endif
#ifndef FUTEX_PRIVATE_FLAG
#define FUTEX_PRIVATE_FLAG 128
#endif
#ifndef FUTEX_CLOCK_REALTIME
#define FUTEX_CLOCK_REALTIME 256
#endif
#ifndef FUTEX_BITSET_MATCH_ANY
#define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
#endif
#endif
#if defined(__NR_futex_time64) && !defined(SYS_futex_time64)
#define SYS_futex_time64 __NR_futex_time64
#endif
#if defined(SYS_futex_time64) && !defined(SYS_futex)
#define SYS_futex SYS_futex_time64
#endif
class Futex {
public:
static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
KernelTimeout t) {
int err = 0;
if (t.has_timeout()) {
// https://locklessinc.com/articles/futex_cheat_sheet/
// Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
struct timespec abs_timeout = t.MakeAbsTimespec();
// Atomically check that the futex value is still 0, and if it
// is, sleep until abs_timeout or until woken by FUTEX_WAKE.
err = syscall(
SYS_futex, reinterpret_cast<int32_t *>(v),
FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
&abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
} else {
// Atomically check that the futex value is still 0, and if it
// is, sleep until woken by FUTEX_WAKE.
err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
}
if (err != 0) {
err = -errno;
}
return err;
}
static int Wake(std::atomic<int32_t> *v, int32_t count) {
int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
if (ABSL_PREDICT_FALSE(err < 0)) {
err = -errno;
}
return err;
}
};
Waiter::Waiter() {
futex_.store(0, std::memory_order_relaxed);
}
Waiter::~Waiter() = default;
bool Waiter::Wait(KernelTimeout t) {
  // Loop until we can atomically decrement the futex from a positive
  // value, waiting on the futex while we believe it is zero.
  // Note that, since the thread ticker has just been reset, we don't need to
  // check whether the thread is idle on the very first pass of the loop.
bool first_pass = true;
while (true) {
int32_t x = futex_.load(std::memory_order_relaxed);
while (x != 0) {
if (!futex_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
return true; // Consumed a wakeup, we are done.
}
if (!first_pass) MaybeBecomeIdle();
const int err = Futex::WaitUntil(&futex_, 0, t);
if (err != 0) {
if (err == -EINTR || err == -EWOULDBLOCK) {
// Do nothing, the loop will retry.
} else if (err == -ETIMEDOUT) {
return false;
} else {
ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
}
}
first_pass = false;
}
}
void Waiter::Post() {
if (futex_.fetch_add(1, std::memory_order_release) == 0) {
// We incremented from 0, need to wake a potential waiter.
Poke();
}
}
void Waiter::Poke() {
// Wake one thread waiting on the futex.
const int err = Futex::Wake(&futex_, 1);
if (ABSL_PREDICT_FALSE(err < 0)) {
ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
}
}
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
class PthreadMutexHolder {
public:
explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
const int err = pthread_mutex_lock(mu_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
}
}
PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
~PthreadMutexHolder() {
const int err = pthread_mutex_unlock(mu_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
}
}
private:
pthread_mutex_t *mu_;
};
Waiter::Waiter() {
const int err = pthread_mutex_init(&mu_, 0);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
}
const int err2 = pthread_cond_init(&cv_, 0);
if (err2 != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
}
waiter_count_ = 0;
wakeup_count_ = 0;
}
Waiter::~Waiter() {
const int err = pthread_mutex_destroy(&mu_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
}
const int err2 = pthread_cond_destroy(&cv_);
if (err2 != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
}
}
bool Waiter::Wait(KernelTimeout t) {
struct timespec abs_timeout;
if (t.has_timeout()) {
abs_timeout = t.MakeAbsTimespec();
}
PthreadMutexHolder h(&mu_);
++waiter_count_;
// Loop until we find a wakeup to consume or timeout.
  // Note that, since the thread ticker has just been reset, we don't need to
  // check whether the thread is idle on the very first pass of the loop.
bool first_pass = true;
while (wakeup_count_ == 0) {
if (!first_pass) MaybeBecomeIdle();
// No wakeups available, time to wait.
if (!t.has_timeout()) {
const int err = pthread_cond_wait(&cv_, &mu_);
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
}
} else {
const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
if (err == ETIMEDOUT) {
--waiter_count_;
return false;
}
if (err != 0) {
ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
}
}
first_pass = false;
}
// Consume a wakeup and we're done.
--wakeup_count_;
--waiter_count_;
return true;
}
void Waiter::Post() {
PthreadMutexHolder h(&mu_);
++wakeup_count_;
InternalCondVarPoke();
}
void Waiter::Poke() {
PthreadMutexHolder h(&mu_);
InternalCondVarPoke();
}
void Waiter::InternalCondVarPoke() {
if (waiter_count_ != 0) {
const int err = pthread_cond_signal(&cv_);
if (ABSL_PREDICT_FALSE(err != 0)) {
ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
}
}
}
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
Waiter::Waiter() {
if (sem_init(&sem_, 0, 0) != 0) {
ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
}
wakeups_.store(0, std::memory_order_relaxed);
}
Waiter::~Waiter() {
if (sem_destroy(&sem_) != 0) {
ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
}
}
bool Waiter::Wait(KernelTimeout t) {
struct timespec abs_timeout;
if (t.has_timeout()) {
abs_timeout = t.MakeAbsTimespec();
}
// Loop until we timeout or consume a wakeup.
  // Note that, since the thread ticker has just been reset, we don't need to
  // check whether the thread is idle on the very first pass of the loop.
bool first_pass = true;
while (true) {
int x = wakeups_.load(std::memory_order_relaxed);
while (x != 0) {
if (!wakeups_.compare_exchange_weak(x, x - 1,
std::memory_order_acquire,
std::memory_order_relaxed)) {
continue; // Raced with someone, retry.
}
// Successfully consumed a wakeup, we're done.
return true;
}
if (!first_pass) MaybeBecomeIdle();
// Nothing to consume, wait (looping on EINTR).
while (true) {
if (!t.has_timeout()) {
if (sem_wait(&sem_) == 0) break;
if (errno == EINTR) continue;
ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
} else {
if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
if (errno == EINTR) continue;
if (errno == ETIMEDOUT) return false;
ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
}
}
first_pass = false;
}
}
void Waiter::Post() {
// Post a wakeup.
if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
// We incremented from 0, need to wake a potential waiter.
Poke();
}
}
void Waiter::Poke() {
if (sem_post(&sem_) != 0) { // Wake any semaphore waiter.
ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
}
}
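// An illustrative trace of the protocol above: Post() moves wakeups_ from 0
// to 1 and calls sem_post() once; a thread blocked in sem_wait() returns,
// CASes wakeups_ from 1 back to 0, and its Wait() returns true. A bare
// Poke() posts the semaphore without touching wakeups_, so the woken thread
// finds wakeups_ == 0, calls MaybeBecomeIdle(), and resumes waiting.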
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
class Waiter::WinHelper {
public:
static SRWLOCK *GetLock(Waiter *w) {
return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
}
static CONDITION_VARIABLE *GetCond(Waiter *w) {
return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
}
static_assert(sizeof(SRWLOCK) == sizeof(void *),
"`mu_storage_` does not have the same size as SRWLOCK");
static_assert(alignof(SRWLOCK) == alignof(void *),
"`mu_storage_` does not have the same alignment as SRWLOCK");
static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
"`cv_storage_` does not have the same size "
"as `CONDITION_VARIABLE`");
static_assert(
alignof(CONDITION_VARIABLE) == alignof(void *),
"`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
// The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
// and destructible because we never call their constructors or destructors.
static_assert(std::is_trivially_constructible<SRWLOCK>::value,
"The `SRWLOCK` type must be trivially constructible");
static_assert(
std::is_trivially_constructible<CONDITION_VARIABLE>::value,
"The `CONDITION_VARIABLE` type must be trivially constructible");
static_assert(std::is_trivially_destructible<SRWLOCK>::value,
"The `SRWLOCK` type must be trivially destructible");
static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
"The `CONDITION_VARIABLE` type must be trivially destructible");
};
class LockHolder {
public:
explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
AcquireSRWLockExclusive(mu_);
}
LockHolder(const LockHolder&) = delete;
LockHolder& operator=(const LockHolder&) = delete;
~LockHolder() {
ReleaseSRWLockExclusive(mu_);
}
private:
SRWLOCK* mu_;
};
Waiter::Waiter() {
auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
InitializeSRWLock(mu);
InitializeConditionVariable(cv);
waiter_count_ = 0;
wakeup_count_ = 0;
}
// SRW locks and condition variables do not need to be explicitly destroyed.
// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
Waiter::~Waiter() = default;
bool Waiter::Wait(KernelTimeout t) {
SRWLOCK *mu = WinHelper::GetLock(this);
CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
LockHolder h(mu);
++waiter_count_;
// Loop until we find a wakeup to consume or timeout.
// Note that, since the thread ticker is just reset, we don't need to check
// whether the thread is idle on the very first pass of the loop.
bool first_pass = true;
while (wakeup_count_ == 0) {
if (!first_pass) MaybeBecomeIdle();
// No wakeups available, time to wait.
if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
// GetLastError() returns a Win32 DWORD, but we assign to
// unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
// initialization guarantees this is not a narrowing conversion.
const unsigned long err{GetLastError()}; // NOLINT(runtime/int)
if (err == ERROR_TIMEOUT) {
--waiter_count_;
return false;
} else {
ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
}
}
first_pass = false;
}
// Consume a wakeup and we're done.
--wakeup_count_;
--waiter_count_;
return true;
}
void Waiter::Post() {
LockHolder h(WinHelper::GetLock(this));
++wakeup_count_;
InternalCondVarPoke();
}
void Waiter::Poke() {
LockHolder h(WinHelper::GetLock(this));
InternalCondVarPoke();
}
void Waiter::InternalCondVarPoke() {
if (waiter_count_ != 0) {
WakeConditionVariable(WinHelper::GetCond(this));
}
}
#else
#error Unknown ABSL_WAITER_MODE
#endif
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl

View file

@ -0,0 +1,159 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#ifndef ABSL_SYNCHRONIZATION_INTERNAL_WAITER_H_
#define ABSL_SYNCHRONIZATION_INTERNAL_WAITER_H_
#include "absl/base/config.h"
#ifdef _WIN32
#include <sdkddkver.h>
#else
#include <pthread.h>
#endif
#ifdef __linux__
#include <linux/futex.h>
#endif
#ifdef ABSL_HAVE_SEMAPHORE_H
#include <semaphore.h>
#endif
#include <atomic>
#include <cstdint>
#include "absl/base/internal/thread_identity.h"
#include "absl/synchronization/internal/kernel_timeout.h"
// May be chosen at compile time via -DABSL_FORCE_WAITER_MODE=<index>; an
// example invocation is sketched after the mode selection below.
#define ABSL_WAITER_MODE_FUTEX 0
#define ABSL_WAITER_MODE_SEM 1
#define ABSL_WAITER_MODE_CONDVAR 2
#define ABSL_WAITER_MODE_WIN32 3
#if defined(ABSL_FORCE_WAITER_MODE)
#define ABSL_WAITER_MODE ABSL_FORCE_WAITER_MODE
#elif defined(_WIN32) && _WIN32_WINNT >= _WIN32_WINNT_VISTA
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_WIN32
#elif defined(__BIONIC__)
// Bionic supports all the futex operations we need even when some of the futex
// definitions are missing.
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_FUTEX
#elif defined(__linux__) && defined(FUTEX_CLOCK_REALTIME)
// FUTEX_CLOCK_REALTIME requires Linux >= 2.6.28.
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_FUTEX
#elif defined(ABSL_HAVE_SEMAPHORE_H)
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_SEM
#else
#define ABSL_WAITER_MODE ABSL_WAITER_MODE_CONDVAR
#endif
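// For example (a sketch only; the exact flag spelling depends on the build
// system in use), the condvar-based waiter could be forced with something
// like:
//   bazel build --copt=-DABSL_FORCE_WAITER_MODE=2 //absl/synchronization:synchronization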
namespace absl {
ABSL_NAMESPACE_BEGIN
namespace synchronization_internal {
// Waiter is an OS-specific semaphore.
class Waiter {
public:
// Prepare any data to track waits.
Waiter();
// Not copyable or movable
Waiter(const Waiter&) = delete;
Waiter& operator=(const Waiter&) = delete;
// Destroy any data to track waits.
~Waiter();
// Blocks the calling thread until a matching call to `Post()` or
// `t` has passed. Returns `true` if woken (`Post()` called),
// `false` on timeout.
bool Wait(KernelTimeout t);
// Restart the caller of `Wait()` as with a normal semaphore.
void Post();
// If anyone is waiting, wake them up temporarily and cause them to
// call `MaybeBecomeIdle()`. They will then return to waiting for a
// `Post()` or timeout.
void Poke();
// Returns the Waiter associated with the identity.
static Waiter* GetWaiter(base_internal::ThreadIdentity* identity) {
static_assert(
sizeof(Waiter) <= sizeof(base_internal::ThreadIdentity::WaiterState),
"Insufficient space for Waiter");
return reinterpret_cast<Waiter*>(identity->waiter_state.data);
}
// How many periods to remain idle before releasing resources
#ifndef THREAD_SANITIZER
static constexpr int kIdlePeriods = 60;
#else
// Memory consumption under ThreadSanitizer is a serious concern,
// so we release resources sooner. The value of 1 leads to 1 to 2 second
// delay before marking a thread as idle.
static const int kIdlePeriods = 1;
#endif
private:
#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
// Futexes are defined by specification to be 32-bits.
// Thus std::atomic<int32_t> must be just an int32_t with lock-free methods.
std::atomic<int32_t> futex_;
static_assert(sizeof(int32_t) == sizeof(futex_), "Wrong size for futex");
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
// REQUIRES: mu_ must be held.
void InternalCondVarPoke();
pthread_mutex_t mu_;
pthread_cond_t cv_;
int waiter_count_;
int wakeup_count_; // Unclaimed wakeups.
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
sem_t sem_;
// This seems superfluous, but for Poke() we need to cause spurious
// wakeups on the semaphore. Hence we can't actually use the
// semaphore's count.
std::atomic<int> wakeups_;
#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
// WinHelper - Used to define utilities for accessing the lock and
// condition variable storage once the types are complete.
class WinHelper;
// REQUIRES: WinHelper::GetLock(this) must be held.
void InternalCondVarPoke();
// We can't include Windows.h in our headers, so we use aligned character
// buffers to define the storage of SRWLOCK and CONDITION_VARIABLE.
alignas(void*) unsigned char mu_storage_[sizeof(void*)];
alignas(void*) unsigned char cv_storage_[sizeof(void*)];
int waiter_count_;
int wakeup_count_;
#else
#error Unknown ABSL_WAITER_MODE
#endif
};
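// An illustrative sketch of how this internal semaphore is intended to be
// used; `identity` is an assumed name for the calling thread's
// base_internal::ThreadIdentity:
//
//   Waiter* w = Waiter::GetWaiter(identity);
//   w->Post();                        // release one pending/future Wait()
//   w->Wait(KernelTimeout::Never());  // block until a matching Post()
//   w->Poke();                        // spurious wakeup -> MaybeBecomeIdle()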
} // namespace synchronization_internal
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_INTERNAL_WAITER_H_

View file

@ -0,0 +1,181 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cstdlib>
#include <thread> // NOLINT(build/c++11), Abseil test
#include <type_traits>
#include "absl/base/attributes.h"
#include "absl/base/const_init.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "absl/synchronization/notification.h"
namespace {
// A two-threaded test which checks that Mutex, CondVar, and Notification have
// correct basic functionality. The intent is to establish that they
// function correctly in various phases of construction and destruction.
//
// Thread one acquires a lock on 'mutex', wakes thread two via 'notification',
// then waits for 'state' to be set, as signalled by 'condvar'.
//
// Thread two waits on 'notification', then sets 'state' inside the 'mutex',
// signalling the change via 'condvar'.
//
// These tests use ABSL_RAW_CHECK to validate invariants, rather than EXPECT or
// ASSERT from gUnit, because we need to invoke them during global destructors,
// when gUnit teardown would have already begun.
void ThreadOne(absl::Mutex* mutex, absl::CondVar* condvar,
absl::Notification* notification, bool* state) {
// Test that the notification is in a valid initial state.
ABSL_RAW_CHECK(!notification->HasBeenNotified(), "invalid Notification");
ABSL_RAW_CHECK(*state == false, "*state not initialized");
{
absl::MutexLock lock(mutex);
notification->Notify();
ABSL_RAW_CHECK(notification->HasBeenNotified(), "invalid Notification");
while (*state == false) {
condvar->Wait(mutex);
}
}
}
void ThreadTwo(absl::Mutex* mutex, absl::CondVar* condvar,
absl::Notification* notification, bool* state) {
ABSL_RAW_CHECK(*state == false, "*state not initialized");
// Wait until woken by thread one.
notification->WaitForNotification();
ABSL_RAW_CHECK(notification->HasBeenNotified(), "invalid Notification");
{
absl::MutexLock lock(mutex);
*state = true;
condvar->Signal();
}
}
// Launch thread 1 and thread 2, and block on their completion.
// If either 'mutex' or 'condvar' is nullptr, use a locally constructed
// instance instead.
void RunTests(absl::Mutex* mutex, absl::CondVar* condvar) {
absl::Mutex default_mutex;
absl::CondVar default_condvar;
absl::Notification notification;
if (!mutex) {
mutex = &default_mutex;
}
if (!condvar) {
condvar = &default_condvar;
}
bool state = false;
std::thread thread_one(ThreadOne, mutex, condvar, &notification, &state);
std::thread thread_two(ThreadTwo, mutex, condvar, &notification, &state);
thread_one.join();
thread_two.join();
}
void TestLocals() {
absl::Mutex mutex;
absl::CondVar condvar;
RunTests(&mutex, &condvar);
}
// Normal kConstInit usage
ABSL_CONST_INIT absl::Mutex const_init_mutex(absl::kConstInit);
void TestConstInitGlobal() { RunTests(&const_init_mutex, nullptr); }
// Global variables during start and termination
//
// In a translation unit, static storage duration variables are initialized in
// the order of their definitions, and destroyed in the reverse order of their
// definitions. We can use this to arrange for tests to be run on these objects
// before they are created, and after they are destroyed.
using Function = void (*)();
class OnConstruction {
public:
explicit OnConstruction(Function fn) { fn(); }
};
class OnDestruction {
public:
explicit OnDestruction(Function fn) : fn_(fn) {}
~OnDestruction() { fn_(); }
private:
Function fn_;
};
// These tests require that the compiler correctly supports C++11 constant
// initialization... but MSVC has a known regression since v19.10:
// https://developercommunity.visualstudio.com/content/problem/336946/class-with-constexpr-constructor-not-using-static.html
// TODO(epastor): Limit the affected range once MSVC fixes this bug.
#if defined(__clang__) || !(defined(_MSC_VER) && _MSC_VER > 1900)
// kConstInit
// Test early usage. (Declaration comes first; definitions must appear after
// the test runner.)
extern absl::Mutex early_const_init_mutex;
// (Normally I'd write this +[], to make the cast-to-function-pointer explicit,
// but in some MSVC setups we support, lambdas provide conversion operators to
// different flavors of function pointers, making this trick ambiguous.)
OnConstruction test_early_const_init([] {
RunTests(&early_const_init_mutex, nullptr);
});
// This definition appears before test_early_const_init, but it should be
// initialized first (due to constant initialization). Test that the object
// actually works when constructed this way.
ABSL_CONST_INIT absl::Mutex early_const_init_mutex(absl::kConstInit);
// Furthermore, test that the const-init c'tor doesn't stomp over the state of
// a Mutex. Really, this is a test that the platform under test correctly
// supports C++11 constant initialization. (The constant-initialization
// constructors of globals "happen at link time"; memory is pre-initialized,
// before the constructors of either grab_lock or check_still_locked are run.)
extern absl::Mutex const_init_sanity_mutex;
OnConstruction grab_lock([]() ABSL_NO_THREAD_SAFETY_ANALYSIS {
const_init_sanity_mutex.Lock();
});
ABSL_CONST_INIT absl::Mutex const_init_sanity_mutex(absl::kConstInit);
OnConstruction check_still_locked([]() ABSL_NO_THREAD_SAFETY_ANALYSIS {
const_init_sanity_mutex.AssertHeld();
const_init_sanity_mutex.Unlock();
});
#endif // defined(__clang__) || !(defined(_MSC_VER) && _MSC_VER > 1900)
// Test shutdown usage. (Declarations come first; definitions must appear after
// the test runner.)
extern absl::Mutex late_const_init_mutex;
// OnDestruction is being used here as a global variable, even though it has a
// non-trivial destructor. This is against the style guide. We're violating
// that rule here to check that the exception we allow for kConstInit is safe.
// NOLINTNEXTLINE
OnDestruction test_late_const_init([] {
RunTests(&late_const_init_mutex, nullptr);
});
ABSL_CONST_INIT absl::Mutex late_const_init_mutex(absl::kConstInit);
} // namespace
int main() {
TestLocals();
TestConstInitGlobal();
// Explicitly call exit(0) here, to make it clear that we intend for the
// above global object destructors to run.
std::exit(0);
}

File diff suppressed because it is too large

File diff suppressed because it is too large

View file

@ -0,0 +1,223 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <cstdint>
#include <mutex> // NOLINT(build/c++11)
#include <vector>
#include "absl/base/internal/cycleclock.h"
#include "absl/base/internal/spinlock.h"
#include "absl/synchronization/blocking_counter.h"
#include "absl/synchronization/internal/thread_pool.h"
#include "absl/synchronization/mutex.h"
#include "benchmark/benchmark.h"
namespace {
void BM_Mutex(benchmark::State& state) {
static absl::Mutex* mu = new absl::Mutex;
for (auto _ : state) {
absl::MutexLock lock(mu);
}
}
BENCHMARK(BM_Mutex)->UseRealTime()->Threads(1)->ThreadPerCpu();
static void DelayNs(int64_t ns, int* data) {
int64_t end = absl::base_internal::CycleClock::Now() +
ns * absl::base_internal::CycleClock::Frequency() / 1e9;
while (absl::base_internal::CycleClock::Now() < end) {
++(*data);
benchmark::DoNotOptimize(*data);
}
}
template <typename MutexType>
class RaiiLocker {
public:
explicit RaiiLocker(MutexType* mu) : mu_(mu) { mu_->Lock(); }
~RaiiLocker() { mu_->Unlock(); }
private:
MutexType* mu_;
};
template <>
class RaiiLocker<std::mutex> {
public:
explicit RaiiLocker(std::mutex* mu) : mu_(mu) { mu_->lock(); }
~RaiiLocker() { mu_->unlock(); }
private:
std::mutex* mu_;
};
template <typename MutexType>
void BM_Contended(benchmark::State& state) {
struct Shared {
MutexType mu;
int data = 0;
};
static auto* shared = new Shared;
int local = 0;
for (auto _ : state) {
// Here we model both local work outside of the critical section as well as
// some work inside of the critical section. The idea is to capture some
// more or less realistic contention levels.
// If contention is too low, the benchmark won't measure anything useful.
// If contention is unrealistically high, the benchmark will favor
// bad mutex implementations that block and otherwise distract threads
// from the mutex and shared state for as long as possible.
// To achieve this, the amount of local work is multiplied by the number of
// threads, keeping the ratio between local work and the critical section
// approximately constant regardless of the thread count. (A worked example
// follows the function.)
DelayNs(100 * state.threads, &local);
RaiiLocker<MutexType> locker(&shared->mu);
DelayNs(state.range(0), &shared->data);
}
}
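// For instance (an illustrative reading of the model above, with assumed
// numbers): at 8 threads and Arg(50), each iteration spends roughly
// 8 * 100ns = 800ns in uncontended local work and ~50ns inside the critical
// section, so the local-work/critical-section ratio stays approximately
// constant as the thread count grows.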
BENCHMARK_TEMPLATE(BM_Contended, absl::Mutex)
->UseRealTime()
// ThreadPerCpu poorly handles non-power-of-two CPU counts.
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(6)
->Threads(8)
->Threads(12)
->Threads(16)
->Threads(24)
->Threads(32)
->Threads(48)
->Threads(64)
->Threads(96)
->Threads(128)
->Threads(192)
->Threads(256)
// Some empirically chosen amounts of work in critical section.
// 1 is low contention, 200 is high contention, with a few values in between.
->Arg(1)
->Arg(20)
->Arg(50)
->Arg(200);
BENCHMARK_TEMPLATE(BM_Contended, absl::base_internal::SpinLock)
->UseRealTime()
// ThreadPerCpu poorly handles non-power-of-two CPU counts.
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(6)
->Threads(8)
->Threads(12)
->Threads(16)
->Threads(24)
->Threads(32)
->Threads(48)
->Threads(64)
->Threads(96)
->Threads(128)
->Threads(192)
->Threads(256)
// Some empirically chosen amounts of work in critical section.
// 1 is low contention, 200 is high contention, with a few values in between.
->Arg(1)
->Arg(20)
->Arg(50)
->Arg(200);
BENCHMARK_TEMPLATE(BM_Contended, std::mutex)
->UseRealTime()
// ThreadPerCpu poorly handles non-power-of-two CPU counts.
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(6)
->Threads(8)
->Threads(12)
->Threads(16)
->Threads(24)
->Threads(32)
->Threads(48)
->Threads(64)
->Threads(96)
->Threads(128)
->Threads(192)
->Threads(256)
// Some empirically chosen amounts of work in critical section.
// 1 is low contention, 200 is high contention, with a few values in between.
->Arg(1)
->Arg(20)
->Arg(50)
->Arg(200);
// Measure the overhead of conditions on mutex release (when they must be
// evaluated). Mutex has (some) support for equivalence classes allowing
// Conditions with the same function/argument to potentially not be multiply
// evaluated.
//
// num_classes==0 is used for the special case of every waiter being distinct.
// (What counts as an equivalent Condition is sketched after the function.)
void BM_ConditionWaiters(benchmark::State& state) {
int num_classes = state.range(0);
int num_waiters = state.range(1);
struct Helper {
static void Waiter(absl::BlockingCounter* init, absl::Mutex* m, int* p) {
init->DecrementCount();
m->LockWhen(absl::Condition(
static_cast<bool (*)(int*)>([](int* v) { return *v == 0; }), p));
m->Unlock();
}
};
if (num_classes == 0) {
// No equivalence classes.
num_classes = num_waiters;
}
absl::BlockingCounter init(num_waiters);
absl::Mutex mu;
std::vector<int> equivalence_classes(num_classes, 1);
// Must be declared last to be destroyed first.
absl::synchronization_internal::ThreadPool pool(num_waiters);
for (int i = 0; i < num_waiters; i++) {
// Mutex considers Conditions with the same function and argument
// to be equivalent.
pool.Schedule([&, i] {
Helper::Waiter(&init, &mu, &equivalence_classes[i % num_classes]);
});
}
init.Wait();
for (auto _ : state) {
mu.Lock();
mu.Unlock(); // Each unlock requires Condition evaluation for our waiters.
}
mu.Lock();
for (int i = 0; i < num_classes; i++) {
equivalence_classes[i] = 0;
}
mu.Unlock();
}
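// A note on equivalence (an illustrative gloss on the comment above): two
// waiters using absl::Condition(&F, &arg) with the same function pointer and
// the same argument pointer can fall into one equivalence class and share a
// single evaluation of F per Unlock(); pointing at distinct ints, as the
// equivalence_classes vector does here, forces one evaluation per class.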
// Some configurations have higher thread limits than others.
#if defined(__linux__) && !defined(THREAD_SANITIZER)
constexpr int kMaxConditionWaiters = 8192;
#else
constexpr int kMaxConditionWaiters = 1024;
#endif
BENCHMARK(BM_ConditionWaiters)->RangePair(0, 2, 1, kMaxConditionWaiters);
} // namespace

File diff suppressed because it is too large

View file

@ -0,0 +1,78 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/notification.h"
#include <atomic>
#include "absl/base/attributes.h"
#include "absl/base/internal/raw_logging.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
void Notification::Notify() {
MutexLock l(&this->mutex_);
#ifndef NDEBUG
if (ABSL_PREDICT_FALSE(notified_yet_.load(std::memory_order_relaxed))) {
ABSL_RAW_LOG(
FATAL,
"Notify() method called more than once for Notification object %p",
static_cast<void *>(this));
}
#endif
notified_yet_.store(true, std::memory_order_release);
}
Notification::~Notification() {
// Make sure that the thread running Notify() exits before the object is
// destructed.
MutexLock l(&this->mutex_);
}
void Notification::WaitForNotification() const {
if (!HasBeenNotifiedInternal(&this->notified_yet_)) {
this->mutex_.LockWhen(Condition(&HasBeenNotifiedInternal,
&this->notified_yet_));
this->mutex_.Unlock();
}
}
bool Notification::WaitForNotificationWithTimeout(
absl::Duration timeout) const {
bool notified = HasBeenNotifiedInternal(&this->notified_yet_);
if (!notified) {
notified = this->mutex_.LockWhenWithTimeout(
Condition(&HasBeenNotifiedInternal, &this->notified_yet_), timeout);
this->mutex_.Unlock();
}
return notified;
}
bool Notification::WaitForNotificationWithDeadline(absl::Time deadline) const {
bool notified = HasBeenNotifiedInternal(&this->notified_yet_);
if (!notified) {
notified = this->mutex_.LockWhenWithDeadline(
Condition(&HasBeenNotifiedInternal, &this->notified_yet_), deadline);
this->mutex_.Unlock();
}
return notified;
}
ABSL_NAMESPACE_END
} // namespace absl

View file

@ -0,0 +1,123 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// -----------------------------------------------------------------------------
// notification.h
// -----------------------------------------------------------------------------
//
// This header file defines a `Notification` abstraction, which allows threads
// to receive notification of a single occurrence of a single event.
//
// The `Notification` object maintains a private boolean "notified" state that
// transitions to `true` at most once. The `Notification` class provides the
// following primary member functions:
// * `HasBeenNotified()` to query its state
// * `WaitForNotification*()` to have threads wait until the "notified" state
// is `true`.
// * `Notify()` to set the notification's "notified" state to `true` and
// notify all waiting threads that the event has occurred.
// This method may only be called once.
//
// Note that while `Notify()` may only be called once, it is perfectly valid to
// call any of the `WaitForNotification*()` methods multiple times, from
// multiple threads -- even after the notification's "notified" state has been
// set -- in which case those methods will immediately return.
//
// Note that the lifetime of a `Notification` requires careful consideration;
// it might not be safe to destroy a notification after calling `Notify()` since
// it is still legal for other threads to call `WaitForNotification*()` methods
// on the notification. However, observers responding to a "notified" state of
// `true` can safely delete the notification without interfering with the call
// to `Notify()` in the other thread.
//
// Memory ordering: For any threads X and Y, if X calls `Notify()`, then any
// action taken by X before it calls `Notify()` is visible to thread Y after:
// * Y returns from `WaitForNotification()`, or
// * Y receives a `true` return value from either `HasBeenNotified()` or
// `WaitForNotificationWithTimeout()`.
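//
// For example (an illustrative sketch; `shared_data` and `note` are assumed
// names): if thread X runs
//
//   shared_data = 42;  // plain write, no extra synchronization
//   note.Notify();
//
// and thread Y runs
//
//   note.WaitForNotification();
//   assert(shared_data == 42);  // guaranteed to see X's write
//
// then Y observes X's write with no additional memory barriers.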
#ifndef ABSL_SYNCHRONIZATION_NOTIFICATION_H_
#define ABSL_SYNCHRONIZATION_NOTIFICATION_H_
#include <atomic>
#include "absl/base/macros.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
// -----------------------------------------------------------------------------
// Notification
// -----------------------------------------------------------------------------
class Notification {
public:
// Initializes the "notified" state to unnotified.
Notification() : notified_yet_(false) {}
explicit Notification(bool prenotify) : notified_yet_(prenotify) {}
Notification(const Notification&) = delete;
Notification& operator=(const Notification&) = delete;
~Notification();
// Notification::HasBeenNotified()
//
// Returns the value of the notification's internal "notified" state.
bool HasBeenNotified() const {
return HasBeenNotifiedInternal(&this->notified_yet_);
}
// Notification::WaitForNotification()
//
// Blocks the calling thread until the notification's "notified" state is
// `true`. Note that if `Notify()` has been previously called on this
// notification, this function will immediately return.
void WaitForNotification() const;
// Notification::WaitForNotificationWithTimeout()
//
// Blocks until either the notification's "notified" state is `true` (which
// may occur immediately) or the timeout has elapsed, returning the value of
// its "notified" state in either case.
bool WaitForNotificationWithTimeout(absl::Duration timeout) const;
// Notification::WaitForNotificationWithDeadline()
//
// Blocks until either the notification's "notified" state is `true` (which
// may occur immediately) or the deadline has expired, returning the value of
// its "notified" state in either case.
bool WaitForNotificationWithDeadline(absl::Time deadline) const;
// Notification::Notify()
//
// Sets the "notified" state of this notification to `true` and wakes waiting
// threads. Note: do not call `Notify()` multiple times on the same
// `Notification`; calling `Notify()` more than once on the same notification
// results in undefined behavior.
void Notify();
private:
static inline bool HasBeenNotifiedInternal(
const std::atomic<bool>* notified_yet) {
return notified_yet->load(std::memory_order_acquire);
}
mutable Mutex mutex_;
std::atomic<bool> notified_yet_; // written under mutex_
};
ABSL_NAMESPACE_END
} // namespace absl
#endif // ABSL_SYNCHRONIZATION_NOTIFICATION_H_

View file

@ -0,0 +1,133 @@
// Copyright 2017 The Abseil Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "absl/synchronization/notification.h"
#include <thread> // NOLINT(build/c++11)
#include <vector>
#include "gtest/gtest.h"
#include "absl/synchronization/mutex.h"
namespace absl {
ABSL_NAMESPACE_BEGIN
// A thread-safe class that holds a counter.
class ThreadSafeCounter {
public:
ThreadSafeCounter() : count_(0) {}
void Increment() {
MutexLock lock(&mutex_);
++count_;
}
int Get() const {
MutexLock lock(&mutex_);
return count_;
}
void WaitUntilGreaterOrEqual(int n) {
MutexLock lock(&mutex_);
auto cond = [this, n]() { return count_ >= n; };
mutex_.Await(Condition(&cond));
}
private:
mutable Mutex mutex_;
int count_;
};
// Runs the |i|'th worker thread for the tests in BasicTests(). Increments the
// |ready_counter|, waits on the |notification|, and then increments the
// |done_counter|.
static void RunWorker(int i, ThreadSafeCounter* ready_counter,
Notification* notification,
ThreadSafeCounter* done_counter) {
ready_counter->Increment();
notification->WaitForNotification();
done_counter->Increment();
}
// Tests that the |notification| properly blocks and awakens threads. Assumes
// that the |notification| is not yet triggered. If |notify_before_waiting| is
// true, the |notification| is triggered before any threads are created, so the
// threads never block in WaitForNotification(). Otherwise, the |notification|
// is triggered at a later point when most threads are likely to be blocking in
// WaitForNotification().
static void BasicTests(bool notify_before_waiting, Notification* notification) {
EXPECT_FALSE(notification->HasBeenNotified());
EXPECT_FALSE(
notification->WaitForNotificationWithTimeout(absl::Milliseconds(0)));
EXPECT_FALSE(notification->WaitForNotificationWithDeadline(absl::Now()));
const absl::Duration delay = absl::Milliseconds(50);
const absl::Time start = absl::Now();
EXPECT_FALSE(notification->WaitForNotificationWithTimeout(delay));
const absl::Duration elapsed = absl::Now() - start;
// Allow for a slight early return, to account for quality of implementation
// issues on various platforms.
const absl::Duration slop = absl::Microseconds(200);
EXPECT_LE(delay - slop, elapsed)
<< "WaitForNotificationWithTimeout returned " << delay - elapsed
<< " early (with " << slop << " slop), start time was " << start;
ThreadSafeCounter ready_counter;
ThreadSafeCounter done_counter;
if (notify_before_waiting) {
notification->Notify();
}
// Create a bunch of threads that increment the |done_counter| after being
// notified.
const int kNumThreads = 10;
std::vector<std::thread> workers;
for (int i = 0; i < kNumThreads; ++i) {
workers.push_back(std::thread(&RunWorker, i, &ready_counter, notification,
&done_counter));
}
if (!notify_before_waiting) {
ready_counter.WaitUntilGreaterOrEqual(kNumThreads);
// Workers have not been notified yet, so the |done_counter| should be
// unmodified.
EXPECT_EQ(0, done_counter.Get());
notification->Notify();
}
// After notifying and then joining the workers, both counters should be
// fully incremented.
notification->WaitForNotification(); // should exit immediately
EXPECT_TRUE(notification->HasBeenNotified());
EXPECT_TRUE(notification->WaitForNotificationWithTimeout(absl::Seconds(0)));
EXPECT_TRUE(notification->WaitForNotificationWithDeadline(absl::Now()));
for (std::thread& worker : workers) {
worker.join();
}
EXPECT_EQ(kNumThreads, ready_counter.Get());
EXPECT_EQ(kNumThreads, done_counter.Get());
}
TEST(NotificationTest, SanityTest) {
Notification local_notification1, local_notification2;
BasicTests(false, &local_notification1);
BasicTests(true, &local_notification2);
}
ABSL_NAMESPACE_END
} // namespace absl