同步模式有一个核心目标:确保对多个线程共享的数据进行正确的操作。这些模式对于绝大多数并发程序都至关重要。唯一不需要同步的程序是那些执行多个完全独立任务的程序,这些任务不涉及共同数据(可能只读取共享的、不可变的输入),并产生独立的结果。
对于所有其他程序,都需要管理一些共享状态,需要面临可怕的“数据竞争”。形式上,C++ 标准规定,对同一对象(同一内存位置)的并发访问,如果没有适当的同步机制来保证每个线程的独占访问,就会导致未定义行为。准确地说,如果至少有一个线程可以修改共享数据,那么行为就是未定义的:如果数据从未被线程更改,就不可能发生数据竞争。有一些设计模式利用了这一漏洞,但让我们先从最广为人知的同步模式开始。当你听到避免数据竞争时,首先想到的是什么?
如果说有一种编写并发程序的工具,那就是互斥锁。互斥锁用于保证多个线程对共享数据的独占访问:
std::mutex m;
MyData data;
...
// 在多个线程中:
m.lock();
transmogrify(data);
m.unlock();
数据修改操作 transmogrify() 必须保证对共享数据的独占访问:在给定时间,只能有一个线程执行此操作。开发者使用互斥锁(mutex,即“mutual exclusion”的缩写)来确保:一次只能有一个线程锁定互斥锁并进入临界区(即 lock() 和 unlock() 之间的代码)。
使用互斥锁足以确保对共享数据的正确访问,但这几乎算不上一个好的设计。第一个问题是它容易出错:如果 transmogrify() 抛出异常,或者开发者添加了返回值检查并提前退出临界区,那么最终的 unlock() 将永远不会执行,互斥锁将一直保持锁定状态,从而永久阻塞其他所有线程访问该数据。
这个问题可以通过应用第5章中的 C++ 模式来轻松解决。只需要一个对象来锁定和解锁互斥锁,而 C++ 标准库已经提供了一个,即 std::lock_guard:
// Example 01
std::mutex m;
int i = 0;
void add() {
std::lock_guard<std::mutex> l(m);
++i;
}
...
std::thread t1(add);
std::thread t2(add);
t1.join();
t2.join();
std::cout << i << std::endl;
函数 add() 修改了共享变量 i,因此需要独占访问;这通过使用互斥锁 m 来实现。如果在没有互斥锁的情况下运行此示例,很可能仍然会得到正确的结果,其中一个线程会在另一个线程之前执行。有时程序会失败,但更多时候不会。这并不意味着程序是正确的,只是让它更难调试。你可以借助线程 sanitizer (TSAN) 来发现这种竞争条件。如果使用的是 GCC 或 Clang,请添加 --sanitize=address 以启用它。从 add() 中移除互斥锁(示例 02),用 TSAN 编译,运行程序,将看到如下信息:
WARNING: ThreadSanitizer: data race
...
Location is global 'i' of size 4 at <address>
这里会显示更多信息,帮助确定哪些线程存在数据竞争以及涉及哪个变量。这比等待程序失败来测试数据竞争要可靠得多。
在 C++17 中,使用 std::lock_guard 略微简化,编译器可以从构造函数中推断出模板参数:
// Example 03
std::lock_guard l(m);
在 C++20 中,可以使用 std::jthread 而无需显式调用 join():
// Example 03
{
std::jthread t1(add);
std::jthread t2(add);
}
std::cout << i << std::endl;
在使用计算结果之前,必须小心确保线程已销毁,现在析构函数会汇入(join)线程并等待计算完成。否则,将出现另一个数据竞争:主线程在读取 i 的值的同时,其他线程正在对其进行递增操作(TSAN 也能发现这种竞争)。
RAII 的使用确保了每次锁定互斥锁后都会解锁,但这并不能避免使用互斥锁时可能发生的其他错误。最常见的错误是根本忘记了使用互斥锁。同步保证只有在每个线程都使用相同的机制来确保对数据的独占访问时才有效。即使只有一个线程没有使用互斥锁,哪怕只是读取数据,整个程序也是不正确的。
为此,开发出了一种防止未同步访问共享数据的模式。这种模式通常被称为“互斥锁保护”或“互斥锁防护”,它包含两个关键要素:第一,需要保护的数据和用于保护它的互斥锁组合在同一个对象中。第二,设计确保对数据的每次访问都受到互斥锁的保护。以下是基本的互斥锁保护类模板:
// Example 04
template <typename T> class MutexGuarded {
std::mutex m_;
T data_ {};
public:
MutexGuarded() = default;
template <typename... Args>
explicit MutexGuarded(Args&&... args) :
data_(std::forward<Args>(args)...) {}
template <typename F> decltype(auto) operator()(F f) {
std::lock_guard<std::mutex> l(m_);
return f(data_);
}
};
此模板将互斥锁与其保护的数据结合在一起,并仅提供一种访问数据的方式:通过使用可调用对象来调用 MutexGuarded 对象。确保了所有数据访问都同步:
// Example 04
MutexGuarded<int> i_guarded(0);
void add() {
i_guarded([](int& i) { ++i; });
}
...
std::thread t1(add);
std::thread t2(add);
t1.join();
t2.join();
i_guarded([](int i) { std::cout << i << std::endl; });
这些是正确可靠使用互斥锁模式的最基本版本。实际上,需求往往更为复杂,解决方案也是如此:存在比 std::mutex 更高效的锁(例如,用于保护短时间计算的自旋锁),也存在更复杂的锁,例如用于高效读写访问的共享锁和独占锁。此外,经常需要同时操作多个共享对象,这就引出了安全锁定多个互斥锁的问题。
这些问题中的许多,都可以通过刚刚看到模式的变体来解决。有些则需要一种完全不同的方法来同步数据访问,我们将在本节后面看到其中的几个。最后,一些数据访问挑战最好在系统整体设计的更高层次上解决;本章也将对此进行说明。
接下来,回顾一下超越常用互斥锁的数据共享的不同方法。
虽然使用互斥锁保护共享数据看起来并不那么复杂,但数据竞争是并发程序中最常见的错误。尽管说“不共享数据就不可能有数据竞争”听起来像是无意义的同义反复,但“不共享”却常常被忽视,而它其实是“共享”的一个替代方案。通常可以重新设计程序,以避免共享某些变量,或将对共享数据的访问限制在代码的更小范围内。
这一理念是某个设计模式的基础,该模式解释起来很简单,但应用起来往往很难,需要跳出常规思维 —— 即“线程特定数据模式”。也称为“线程局部数据”,但这个名称容易与 C++ 的 thread_local 关键字混淆。为了说明这个想法,考虑以下示例:需要统计可能在多个线程中同时发生的某些事件(在此演示中,具体统计什么并不重要)。需要整个程序中的事件总数,因此直接的方法是拥有一个共享计数器,并在某个线程检测到事件时对其进行递增(在演示中,我们统计能被 10 整除的随机数):
// Example 05
MutexGuarded<size_t> count;
void events(unsigned int s) {
for (size_t i = 1; i != 100; ++i) {
if ((rand_r(&s) % 10) == 0) { // 需要统计的事件!
count([](size_t& i) { ++i; });
}
}
}
这是一个直接的设计,但并不是最好的。虽然每个线程都在统计事件,但它并不需要知道其他线程统计了多少事件。这不要与实现混淆:在实现中,每个线程需要知道当前计数的值,以便正确地递增它。这种区别很微妙但很重要,并暗示了一种替代方案:每个线程可以使用线程特定的计数器来统计自己的事件,每个线程一个计数器。这些计数器中的一个都不正确,但只要能在需要正确的总事件数时将所有计数器相加,这就无关紧要了。这里有几种可能的设计。可以在每个线程中使用一个局部计数器,并在该线程退出前一次性更新共享计数器:
// Example 06
MutexGuarded<size_t> count;
void events(unsigned int s) {
size_t n = 0;
for (size_t i = 1; i != 100; ++i) {
if ((rand_r(&s) % 10) == 0) { // 需要统计的事件!
++n;
}
}
if (n > 0) count([n](size_t& i) { i += n; });
}
在由一个或多个线程执行的函数中,声明的局部(栈上分配的)变量对每个线程都是特定的:每个线程的栈上都有该变量的唯一副本,当所有线程引用同一个变量名时,每个线程都访问自己的变量。
也可以让每个线程拥有一个唯一的计数变量用于递增,然后在所有计数线程结束后,在主线程中将它们相加:
// Example 07
void events(unsigned int s, size_t& n) {
for (size_t i = 1; i != 100; ++i) {
if ((rand_r(&s) % 10) == 0) ++n;
}
}
当在多个线程上调用此计数函数时,必须采取一些预防措施。显然,应该为每个线程提供自己独立的计数变量 n。但这还不够:由于硬件相关的“伪共享”效应,还必须确保线程特定的计数器在内存中不相邻:
// Example 07
alignas(64) size_t n1 = 0;
alignas(64) size_t n2 = 0;
std::thread t1(events, 1, std::ref(n1));
std::thread t2(events, 2, std::ref(n2));
t1.join();
t2.join();
size_t count = n1 + n2;
alignas 属性确保每个计数变量都按 64 字节对齐,从而保证 n1 和 n2 的地址之间至少相差 64 字节(64 是大多数现代 CPU,包括 x86 和 ARM,的缓存行大小)。对于使用引用参数的函数,std::thread 需要 std::ref 包装器。
前面的例子将对共享数据的访问需求减少到每个线程一次,而最后一个例子则完全没有共享数据;选择哪种解决方案取决于总数量值具体需要在何时获取。
可以从稍微不同的角度来看待最后一个例子,重写一下会有所帮助:
// Example 08
struct {
alignas(64) size_t n1 = 0;
alignas(64) size_t n2 = 0;
} counts;
std::thread t1(events, 1, std::ref(counts.n1));
std::thread t2(events, 2, std::ref(counts.n2));
t1.join();
t2.join();
size_t count = counts.n1 + counts.n2;
这在实质上没有改变东西,但可以将线程特定的计数视为同一数据结构的组成部分,而不是为每个线程创建的独立变量。这种思维方式引导我们走向线程特定数据模式的另一种变体:有时,多个线程必须操作相同的数据,但可能可以将数据分区,并为每个线程分配一个子集来操作。
下一个例子中,需要将vector中的每个元素进行限幅(如果某个元素超过最大值,则用该最大值替换,结果始终在零和最大值之间)。该计算通过以下模板算法实现:
// Example 09
template <typename IT, typename T>
void clamp(IT from, IT to, T value) {
for (IT it = from; it != to; ++it) {
if (*it > value) *it = value;
}
}
一个生产质量的实现会确保迭代器参数满足迭代器要求,并且最大值可以与迭代器的值类型进行比较,但为了简洁起见,省略了所有这些(曾用一整章来讨论概念和其他限制模板的方法)。
clamp() 函数可以应用于序列,有时我们很幸运地拥有可以独立在多个线程上处理的互不相关的数据结构。但为了继续这个例子,假设只有一个需要限幅的vector。然而,情况并非无望,因为可以用多个线程处理其互不重叠的部分,而不会产生数据竞争的风险:
// Example 09
std::vector<int> data = ... 数据 ...;
std::thread t1([&](){
clamp(data.begin(), data.begin() + data.size()/2, 42);
});
std::thread t2([&](){
clamp(data.begin() + data.size()/2, data.end(), 42);
});
...
t1.join();
t2.join();
尽管程序中的数据结构在两个线程之间是共享的,并且两个线程都会修改它,但此程序是正确的:对于每个vector元素,只有一个线程可以修改它。但vector对象本身呢?不是也在所有线程之间共享吗?
我们已经强调过,有一种情况可以在无需同步的情况下允许数据共享:只要没有其他线程在修改,任意数量的线程都可以读取同一个变量。我们的示例利用了:所有线程都读取vector的大小和其他数据成员,但没有线程修改它们。
线程特定数据模式的应用必须经过仔细思考,通常需要对数据结构有很好的理解。必须绝对确保没有线程试图修改它们共享的变量,例如vector对象本身的大小和指向数据的指针。例如,如果其中一个线程可以调整vector的大小,即使没有两个线程访问同一个元素,这也会构成数据竞争:vector的大小是一个由一个或多个线程在没有锁的情况下修改变量。
我们在此小节中要描述的最后一种模式适用于以下情况:多个线程需要修改整个数据集(因此无法分区),但这些线程不需要看到其他线程所做的修改。通常,这种情况发生在修改是作为计算某个结果的一部分时,但修改的数据本身并不是最终结果。这种情况下,有时最好的方法是为每个线程创建一个线程特定的数据副本。当这种副本是一个“一次性”对象时,这种模式效果最佳:每个线程都需要修改其副本,但修改的结果不需要提交回原始数据结构。
在以下示例中,可以使用一种算法来统计vector中唯一元素的数量,该算法会就地对vector进行排序:
// Example 10
void count_unique(std::vector<int> data, size_t& count) {
std::sort(data.begin(), data.end());
count = std::unique(data.begin(),
data.end()) - data.begin();
}
此外,当只需要统计满足某个谓词的元素时,首先删除所有其他元素(std::erase_if 是 C++20 的新增功能,但在之前的 C++ 版本中也很容易实现):
// Example 10
void count_unique_even(std::vector<int> data, size_t& count) {
std::erase_if(data, [](int i) { return i & 1; });
std::sort(data.begin(), data.end());
count = std::unique(data.begin(),
data.end()) - data.begin();
}
这两种操作都是对向量的破坏性操作,但只是实现目的的手段:当得到了计数,修改后的vector就可以丢弃。同时在多个线程上计算计数的最简单,且通常也是最高效的方法,就是为每个线程创建vector副本。实际上,我们已经这样做了:两个计数函数都按值传递vector参数,因此会创建副本。通常,这会是一个错误,但在我们的情况下,这是有意为之,允许两个函数并发地操作同一个vector:
// Example 10
std::vector<int> data = ...;
size_t unique_count = 0;
size_t unique_even_count = 0;
{
std::jthread t1(count_unique, data,
std::ref(unique_count));
std::jthread t2(count_unique_even, data,
std::ref(unique_even_count));
}
当然,仍然存在对原始数据的并发访问,并且这种访问是在没有锁的情况下进行的:两个线程都需要创建各自的线程特定副本。然而,这属于只读并发访问的例外情况,因此是安全的。
原则上,尽可能避免数据共享,否则使用互斥锁,足以在程序中实现无竞争的数据访问。然而,这可能不是实现此目标的高效方式,而良好的性能几乎总是并发的目标。现在,我们将考虑几种其他用于共享数据并发访问的模式,这些模式会在合适的时机提供更优的性能。我们将从超越互斥锁的同步原语开始,这些原语专门设计用于让线程高效地等待某些事件。
等待是并发程序中经常遇到的问题,表现形式多样。我们已经见过一种:互斥锁。确实,如果两个线程试图同时进入临界区,其中一个将不得不等待。但等待并非目标,而只是临界区独占访问带来的不幸副作用。还有其他一些情况,等待是主要目的。例如,可能有线程在等待某个事件发生。这可能是一个用户界面线程等待输入(性能要求很低或没有要求),或一个线程在等待网络套接字(中等性能要求),甚至是一个高性能线程,比如线程池中的计算线程,等待任务执行(极高的性能要求)。这些场景有不同的实现方式,但从根本上讲有两种方法:轮询和通知。先来看通知方式。
等待通知的基本模式是条件模式,通常由一个条件变量和一个互斥锁组成。一个或多个线程阻塞,等待在条件变量上。在此期间,还有一个线程会锁定互斥锁(从而保证独占访问),并执行其他线程所等待完成的工作。当工作完成,完成工作的线程必须释放互斥锁(以便其他线程可以访问包含此工作结果的共享数据),并通知等待的线程,让它们可以继续执行。
例如,在一个线程池中,等待的线程是池中的工作线程,等待的任务会添加到池中。由于池的任务队列是共享资源,因此线程需要独占访问才能推入或弹出任务。一个向队列添加一个或多个任务的线程必须在执行时持有互斥锁,然后通知工作线程有任务需要它们执行。
现在,来看一个只有两个线程的通知模式示例。首先,我们有主线程,它启动一个工作线程,然后等待它产生一些结果:
// Example 11
std::mutex m;
std::condition_variable cv;
size_t n = 0; // 在工作完成前为零
// 主线程
void main_thread() {
std::unique_lock l(m);
std::thread t(produce); // 启动工作线程
cv.wait(l, []{ return n != 0; });
... 生产者线程已完成,我们已获得锁 ...
}
在这种情况下,锁定由 std::unique_lock 提供,这是一个包装互斥锁的对象,具有类似互斥锁的接口,包含 lock() 和 unlock() 成员函数。互斥锁在构造函数中锁定,但当开始在条件上等待时,wait() 函数会立即将其解锁。当收到通知时,wait() 在将控制权返回给调用者之前会再次锁定互斥锁。
许多等待和条件的实现都存在所谓的“未唤醒”问题:即使没有收到通知,wait 也可能会中断。这就是为什么需要检查结果是否已准备好,本例中通过检查结果计数 n 来实现:如果它仍然是零,则没有结果,主线程则错误地唤醒,可以回到等待状态(等待线程在 wait() 返回之前仍必须获取互斥锁,因此必须等待工作线程释放此互斥锁)。
工作线程必须在访问共享数据之前锁定同一个互斥锁,然后在通知主线程工作已完成之前解锁:
// Example 11
// 工作线程
void produce() {
{
std::lock_guard l(m);
... 计算结果 ...
n = ... 计数结果 ...
} // 互斥锁已解锁
cv.notify_one(); // 通知等待的线程
}
没有必要在整个工作线程活动期间都持有互斥锁:唯一目的是保护共享数据,例如示例中的计数结果 n。
std::condition_variable 和 std::unique_lock 这两个同步原语是实现带条件等待模式的标准 C++ 工具。就像互斥锁一样,它们有许多变体。
通知的替代方案是轮询。这种模式中,等待线程会反复检查某个条件是否满足。在 C++20 中,可以使用 std::atomic_flag 实现一个简单的轮询等待示例,这本质上是一个原子布尔变量(在 C++20 之前,可以使用 std::atomic<bool> 实现相同的功能):
// Example 12
std::atomic_flag flag;
// 工作线程:
void produce() {
... 生成结果 ...
flag.test_and_set(std::memory_order_release);
}
// 等待线程:
void main_thread() {
flag.clear();
std::thread t(produce);
while (!flag.test(std::memory_order_acquire)) {} // 等待
... 结果已准备好 ...
}
原子操作(如 test_and_set())利用了内存栅栏:这是一种全局同步标志,确保在标志设置(释放,release)之前对内存所做的所有修改,在其他线程中于标志测试(获取,acquire)之后执行的操作都是可见的。关于这些栅栏还有很多内容,但这超出了本书的范围,可以在许多讨论并发和效率的书籍中找到。
这个例子与之前例子之间最重要的区别是,示例 12 中等待线程有一个显式的轮询循环。如果等待时间很长,这将非常低效,等待线程在整个等待期间都在忙于计算(从内存中读取)。实际的实现都会在等待循环中引入一些休眠,但这也会带来成本:等待线程不会在工作线程设置标志后立即唤醒,而必须先完成休眠。这些效率问题超出了本书的范围;这里,只想展示这些模式的整体结构和组件。
轮询和等待之间的界限并不总很清晰。例如,wait() 可能是通过定期轮询条件变量的某些内部状态来实现的。事实上,刚刚看到的同一个原子标志也可以用于等待通知:
// Example 13
std::atomic_flag flag;
// 工作线程:
void produce() {
... 生成结果 ...
flag.test_and_set(std::memory_order_release);
flag.notify_one();
}
// 等待线程:
void main_thread() {
flag.clear();
std::thread t(produce);
flag.wait(true, std::memory_order_acquire); // 等待
while (!flag.test(std::memory_order_acquire)) {}
... 结果已准备好 ...
}
调用 wait() 需要对应调用 notify_one()(如果有多个线程在等待该标志,则调用 notify_all())。其实现肯定比我们简单的轮询循环更高效。在收到通知且等待结束后,会检查标志位以确保它确实已设置。标准规定这并非必要,std::atomic_flag::wait() 不会出现未唤醒状态,但 GCC 和 Clang 中的 TSAN 却持不同意见(这可能是 TSAN 的误报,也可能是标准库实现中的 bug)。
还有很多其他需要等待的场景,而我们所需等待的条件也千差万别。另一种常见需求是等待特定数量的事件发生。例如,可能有多个线程在生成结果,需要它们全部完成各自的工作后,主线程才能继续执行,这可以通过在栅栏或门闩等待来实现。在 C++20 之前,需要自行实现这些同步原语,或使用第三方库,但从 C++20 开始,它们已成为标准:
// Example 14
// 工作线程
void produce(std::latch& latch) {
... 执行工作 ...
latch.count_down(); // 又一个线程完成工作
}
void main_thread() {
constexpr size_t nthread = 4;
std::jthread t[nthread];
std::latch latch(nthread); // 等待 4 次 count_down()
for (size_t i = 0; i != nthread; ++i) {
t[i] = std::jthread(std::ref(latch));
}
latch.wait(); // 等待所有生产者线程完成
... 结果已准备好 ...
}
门闩用需要等待的事件数量进行初始化。当执行了相应次数的 count_down() 调用后,门闩将解锁。
等待的应用还有很多,但几乎所有等待模式都大致属于本节中我们所见到的类别(具体实现方式在特定情况下可能对性能产生巨大影响,因此更有可能见到这些同步结构的定制化、应用特定版本,而不是见到非标准的容器或其他基本数据结构)。
接下来,将看到几个非常专用且高效的同步模式示例。它们并非适用于所有场景,但当符合需求时,往往能提供最佳性能。
大多数情况下,安全地访问共享数据依赖于互斥锁。C++ 还支持另一种用于同步并发线程的机制:原子操作。同样,详细的解释超出了本书的范围,本节需要读者对原子操作有一定的先验性了解。
其基本思想是:某些数据类型(通常是整数)具有特殊的硬件指令,允许一些简单的操作(例如读取、写入或递增数值)以原子方式、在单个不可分割的事件中完成。在执行这种原子操作期间,其他线程完全无法访问该原子变量,当一个线程执行原子操作时,所有其他线程看到的该变量要么是操作前的状态,要么是操作完成后的状态,而绝不会是操作进行到一半的中间状态。例如,递增操作本质上是“读-改-写”操作,但原子递增是一种特殊的硬件事务,当读取开始,其他线程就无法访问该变量,直到写入完成。这些原子操作通常伴随着内存栅栏;我们已经使用过,以确保不仅原子操作,程序中所有变量上的其他操作也都能同步进行,避免数据竞争。
原子操作最简单但非常有用的应用是计数。程序中经常需要对某些事物进行计数,在并发程序中,可能需要统计在多个线程上都可能发生的事件。如果只关心所有线程结束后得到的总数量,最好采用前面提到的“不共享”或线程私有计数器的方式。但如果所有线程都需要随时了解当前的计数值呢?当然可以始终使用互斥锁,但用互斥锁来保护一个简单的整数递增操作效率极低。C++ 提供了更好的方法 —— 原子计数器:
// Example 15
std::atomic<size_t> count;
void thread_work() {
size_t current_count = 0;
if (... 计数为偶数 ...) {
current_count =
count.fetch_add(1, std::memory_order_relaxed);
}
}
这个示例中,唯一的共享变量就是计数器 count 本身。由于没有其他共享数据,不需要内存栅栏(“宽松”内存序意味着对其他数据的访问顺序没有要求)。fetch_add() 操作是一个原子递增操作,它将 count 的值加一,并返回 count 增加之前的旧值。
原子计数器还可以用来让多个线程无需加锁地操作同一个数据结构:要做到,我们需要确保每个数据结构的元素最多只有一个线程在操作。以这种方式使用时,这种模式通常称为原子索引。
下一个示例中,有一个在所有线程中共享的数据数组:
// Example 16
static constexpr size_t N = 1024;
struct Data { ... };
Data data[N] {};
我们还有一个将所有需要将工作结果,存储到数组中的线程共享的原子索引。为了安全地实现,每个线程都会对这个原子索引进行递增,并使用递增前的值作为数组的索引。由于两次原子递增操作不可能产生相同的值,每个线程都能获得自己独有的数组元素进行操作:
// Example 16
std::atomic<size_t> index(0);
// 多个生产者线程
void produce(size_t& n) {
while (... more work … ) {
const size_t s =
index.fetch_add(1, std::memory_order_relaxed);
if (s >= N) return; // 没有更多空间
data[s] = ... 结果 ...
}
}
每个线程可以初始化尽可能多的数组元素,并在(自身以及所有其他线程)填满整个数组时停止。主线程必须等待所有工作完成后,才能访问结果。仅靠原子索引无法实现,原子索引是在线程开始处理某个数组元素时递增的,而不是在线程完成该工作时才递增。因此,必须使用其他同步机制,让主线程等待所有工作完成,例如使用门闩,或者在简单情况下,直接对生产者线程进行 join 操作:
// Example 16
void main_thread() {
constexpr size_t nthread = 5;
std::thread t[nthread];
for (size_t i = 0; i != nthread; ++i) {
t[i] = std::thread(produce);
}
// 等待所有生产者线程完成
for (size_t i = 0; i != nthread; ++i) {
t[i].join();
}
... 所有工作已完成,数据已准备就绪 ...
}
当访问已生成的结果时并不依赖计数值本身,原子计数就非常适用。在上一个例子中,生产者线程不需要访问其他线程计算出的数组元素,而主线程则在访问结果前等待所有线程完成。但情况往往并非如此,需要在数据生成的同时就进行访问。这时就需要用到内存栅栏。
一种最简单却出人意料强大的无锁模式,就是依赖内存栅栏的“发布协议”。该模式适用于一个线程生成某些数据,并在数据就绪后将其提供给一个或多个其他线程访问的场景。其模式如下:
// Example 17
std::atomic<Data*> data;
void produce() {
Data* p = new Data;
... 完成 *p 对象的初始化 ...
data.store(p, std::memory_order_release);
}
void consume() {
Data* p = nullptr;
while (!(p = data.load(std::memory_order_acquire))) {}
... 可以安全地使用 *p ...
}
共享变量是一个指向数据的原子指针,通常称为“根”指针,该数据本身可能是一个包含多个指针连接其各部分的复杂数据结构。此模式的关键要求是:访问整个数据结构的唯一途径就是通过这个根指针。
生产者线程首先构建它需要生成的所有数据。在此过程中,它使用一个线程私有的指针(通常是一个局部变量)来访问这些数据。此时,其他线程还无法看到这些数据,因为根指针尚未指向它,而生产者线程的局部指针也不会与其他线程共享。
最后,当数据准备就绪时,生产者线程将指向该数据的指针以原子方式存储到共享的根指针中。通常说生产者线程“原子地发布”了这些数据,因此该模式得名“发布协议”。
消费者必须等待数据被发布:只要根指针仍为空,它们就无事可做。它们会等待根指针变为非空(这种等待不必采用轮询方式,也可以使用通知机制)。当数据发布,消费者线程就可以通过根指针访问它。由于没有其他同步机制,数据当发布,线程都不能再修改它(尽管数据内部可能包含互斥锁或其他机制,以允许其部分安全地被修改)。
仅靠原子变量本身不足以保证此模式下无数据竞争:所有线程访问的不仅仅是原子指针,还有它所指向的内存。这正是我们需要特定内存栅栏的原因:在发布数据时,生产者使用释放栅栏,这不仅确保指针的初始化是原子的,还保证在原子写操作之前对内存的所有修改,对于读取到该指针新值的线程都可见。消费者则使用获取栅栏,以确保在读取到指针的新值之后对共享数据进行的操作,都能观察到数据在发布时刻的最新状态。换句话说,如果读取了指针的值然后解引用它,通常无法确定得到的是否是指针所指向数据的最新值。但如果使用获取栅栏读取指针(且该指针是用释放栅栏写入的),那么就可以确信,将读取(获取)到数据在最后一次写入(释放)时的状态。释放栅栏和获取栅栏共同作用,保证了消费者看到的共享数据,与生产者在将数据地址发布到根指针那一刻所看到的状态完全一致。
同样的模式也可用于发布线程间共享的大型数据结构中已完成的元素。例如,可以让生产者线程发布它已用结果初始化的数组元素数量:
// Example 18
constexpr size_t N = ...;
Data data[N]; // 共享数据,无需加锁
std::atomic<size_t> size;
void produce() {
for (size_t n = 0; n != N; ++n) {
data[n] = ... 结果 ...
size.store(n, std::memory_order_release);
}
}
void consume() {
size_t n = 0;
do {
n = size.load(std::memory_order_acquire);
... 可以安全访问前 n 个元素 ...
} while (n < N - 1);
}
其思想与前一个例子完全相同,只是用数组索引代替了指针。在这两种情况下,都是一个生产者线程计算并发布数据,一个或多个消费者线程等待数据发布。如果需要多个生产者,则必须使用其他同步机制,来确保它们不会处理相同的数据,例如刚刚看到的原子索引。
在一个包含多个生产者和消费者线程的程序中,通常需要结合使用多种同步模式。在下一个例子中,有一个大型的共享数据结构,组织为一个指向各个元素的指针数组。多个生产者线程将此数据结构填充结果;将使用原子索引来确保每个元素仅由一个生产者处理:
// Example 19
static constexpr size_t N = 1024;
struct Data { ... };
std::atomic<Data*> data[N] {};
std::atomic<size_t> size(0); // 原子索引
void produce() {
Data* p = new Data;
... 计算 *p ...
const size_t s =
size.fetch_add(1, std::memory_order_relaxed);
data[s].store(p, std::memory_order_release);
}
生产者线程计算出结果后,获取当前的索引值,同时将索引递增,从而确保下一个生产者无法获得相同的索引值,数组槽位 data[s] 保留给当前生产者线程使用。这足以避免生产者之间的共享冲突。然而,消费者不能使用同一个索引来获知数组中已有多少元素:索引是在对应数组元素初始化之前就递增的。对于消费者采用发布协议:每个数组元素都是一个原子指针,在数据发布之前保持为空。消费者必须等待某个指针变为非空,然后才能访问对应的数据:
// Example 19
void consumer() {
for (size_t i = 0; i != N; ++i) {
const Data* p =
data[i].load(std::memory_order_acquire);
if (!p) break; // 没有更多数据
.. 可以安全访问 *p ...
}
}
这个例子中,消费者当发现某个数据元素尚未就绪就会停止。也可以选择继续扫描数组:由于其他生产者线程可能已经填充了后续的某些元素,可能已经准备就绪。如果这样做,就必须以某种方式记住,并回头处理那些之前错过的元素。当然,具体采用哪种方法取决于需要解决的问题。
关于无锁编程的文献非常丰富,其中充满了(通常)极为复杂的示例。我们所展示的并发模式仅仅是,构建更复杂数据结构和数据同步协议的基本构件。
下一节中,我们将探讨一些更高级别的模式,这些模式适用于设计此类数据结构,甚至适用于设计整个程序及其主要组件。