18.5. 并发执行模式实现

下一组并发模式是执行模式。这些模式用于组织在多个线程上进行的计算。正如我们之前看到的同步模式一样,所有这些模式都是底层的。解决实际问题的大多数方案,必须将这些模式组合成更大、更复杂的设计。

这并不是因为 C++ 不适合此类更高级的设计;恰恰相反,正是因为 C++ 提供了如此多样的实现方式(例如,实现一个线程池就有无数种方法),针对每一个具体应用,都能找到在性能和功能上最理想的版本。很难将这些更完整的解决方案描述为“模式” —— 虽然所解决的问题是普遍的,但具体的解决方案却千差万别。

然而,所有这些设计都面临一些共同的挑战,而解决这些挑战的方法通常反复使用相同的工具。因此,可以将这些更基础的挑战,及其通用解决方案归纳为设计模式。

18.5.1 主动对象

首先看到的第一个并发执行模式是主动对象。

一个主动对象通常封装了要执行的代码、执行所需的数据,以及用于异步执行代码的控制流。这种控制流可以简单到由该对象启动并管理的一个独立线程。但不会为每个任务都启动一个新线程,因此主动对象通常会提供某种机制,使其代码能够在多线程执行器(如线程池)上运行。

从调用者的角度来看,一个主动对象是一个对象:调用者构造它,用数据初始化,然后告诉它执行自身,而执行过程是异步发生的。

一个基本的主动对象看起来像这样:

// Example 22
class Job {
  ... data ...
  std::thread t_;
  bool done_ {};
public:
  Job(... args ...) { ... initialize data ... }

  void operator()() {
    t_ = std::thread([this](){ ... computations ... });
  }

  void wait() {
    if (done_) return;
    t_.join();
    done_ = true;
  }

  ~Job() { wait(); }
  auto get() { this->wait(); return ... results ...; }
};

Job j(... 参数 ...);
j(); // 在线程上执行代码
... 执行其他工作 ...
std::cout << j.get(); // 等待结果并打印它们

最简单的示例如下所示,活动对象包含一个线程,该线程用于异步执行代码。大多数实际情况下,会使用一个执行器,该执行器会将其管理的某个线程上的工作进行调度,但这会使我们陷入特定于实现的细节中。当调用 operator() 时,执行开始;也可以通过在构造函数中调用 operator(),使对象在构造后立即执行。在某个时刻,必须等待结果。如果使用一个单独的线程,可以在那时汇入该线程(如果调用者再次调用 wait(),不要尝试汇入两次)。如果该对象代表的不是线程,而是线程池或某些其他执行器中的任务,将执行该情况下所需的清理工作。

当确定了特定的异步执行代码方式,编写具有不同数据和代码的活动对象就变成了一项相当重复的任务。没有人会像刚才那样编写活动对象,总是使用某种通用的可重用框架。实现这种框架有两种通用方法。第一种方法使用继承:基类完成样板工作,而派生类包含特定于任务的唯一数据和代码。继续对活动对象的简单方法,可以将基类编写为:

// Example 23
class Job {
  std::thread t_;
  bool done_ {};
  virtual void operator()() = 0;
public:
  void wait() {
    if (done_) return;
    t_.join();
    done_ = true;
  }

  void run() {
    t_ = std::thread([this](){ (*this)(); });
  }

  virtual ~Job() { wait(); }
};

基类对象 Job 包含了实现异步控制流所需的一切:用于执行的线程,以及一个状态标志,以确保线程仅汇入一次,还定义了通过调用非虚函数 run() 来执行代码的方式。实际在线程上执行的代码必须由派生类通过重载 operator() 来提供。只有 run() 是公有的,而 operator() 并非公有:这是非虚函数惯用法的实际应用(第14章中已经见过这种用法)。

派生类对象当然针对具体问题:

// Example 23
class TheJob final : public Job {
  ... 数据 ...
  void operator()() override { ... 工作 ... }
public:
  TheJob(... 参数 ...) {
    ... 初始化数据 ...
    this->run();
  }
  auto get() { this->wait(); return ... 结果 ...; }
};

这里唯一的微妙之处在于,在派生类对象的构造函数末尾调用了 run()。这种调用并非必需(也可以稍后手动执行该活动对象),但如果希望构造函数自动启动执行,就必须在派生类中完成。如果在线程的基类构造函数中就启动线程和异步执行(即调用 operator()),那么线程上的执行与派生类构造函数中,继续进行的其余初始化步骤之间就会产生竞争条件。出于同样的原因,那些在构造函数中就开始执行的活动对象不应再进一步继承,通过将类声明为 final 来确保。

使用活动对象非常简单:创建对象后,便会在后台(在单独的线程上)开始执行代码;当需要结果时,只需请求获取(这可能涉及等待):

// Example 23
TheJob j1(... 参数 ...);
TheJob j2(... 参数 ...);
... 执行其他操作 ...
std::cout << "Results: " << j1.get() << " " << j2.get();

如果曾经编写过C++并发代码,那么肯定已经使用过活动对象了:std::thread 就是一个活动对象,允许在单独的线程上执行任意代码。在一些C++并发库中,线程是一个基类对象,所有具体的线程类都从中派生。但C++标准库的线程并未采用这种方法。它遵循的是实现可重用活动对象的第二种方式:类型擦除。如果需要熟悉这一概念,请重读第6章。尽管 std::thread 本身就是一个类型擦除的活动对象,我们仍将实现自己的版本,以展示这种设计思路(因为标准库的代码相当难以阅读)。这一次,不再有基类。整个框架由一个单一的类提供:

// Example 24
class Job {
  bool done_ {};
  std::function<void()> f_;
  std::thread t_;
public:
  template <typename F> explicit Job(F&& f) :
    f_(f), t_(f_) {}

  void wait() {
    if (done_) return;
    t_.join();
    done_ = true;
  }

  ~Job() { wait(); }
};

为了实现类型擦除的可调用对象,我们使用了 std::function(也可以使用第6章中介绍的更高效的实现方式,或按照相同的方法自行实现类型擦除)。由调用者提供、用于在线程上执行的代码,来自构造函数参数中的可调用对象 f。类成员的顺序非常重要:异步执行在 thread t_ 初始化完成时立即开始,其他数据成员(特别是可调用对象 f_)必须在此之前完成初始化。

要使用这种风格的活动对象,需要提供一个可调用对象。可以是一个 Lambda 表达式,也可以是一个命名的对象:

// Example 24
class TheJob {
  ... 数据 ...
public:
  TheJob(... 参数 ...) { ... 初始化数据 ... }
    void operator()() { // 可调用!
    ... 执行工作 ...
  }
};

Job j(TheJob(... 参数 ...));
j.wait();

这种设计中,除非 TheJob 创建为一个具名对象,否则没有简单的方法可以访问其数据成员。因此,结果通常通过在构造函数中,以引用方式传递的参数来返回(这与使用 std::thread 的方式相同):

// Example 24
class TheJob {
  ... 数据 ...
  double& res_; // 结果
public:
  TheJob(double& res, ... 参数 ...) : res_(res) {
    ... 初始化数据 ...
  }

  void operator()() { // Callable!
    ... do the work ...
    res_ = ... 结果 ...
  }
};

double res = 0;
Job j(TheJob(res, ... 参数 ...));
j.wait();
std::cout << res;

活动对象几乎存在于每一个并发的 C++ 程序中,但其中一些使用方式较为常见且具有特定用途,因此其本身也视为独立的并发设计模式。接下来,我们将介绍其中的几种模式。

18.5.2 反应器对象模式

反应器模式通常用于事件处理或响应服务请求,解决了一个特定问题:收到来自多个线程的、针对某些操作的多个请求;然而,这些操作的性质决定了其中至少有一部分,必须在单个线程上执行,或以其他方式进行同步。反应器对象就是负责处理这些请求的对象,接收来自多个线程的请求并执行。

以下是一个反应器的示例,可以接收执行特定计算的请求,计算所需的输入由调用者提供,并将结果存储起来。这些请求可以来自任意数量的线程,每个请求都会在结果数组中分配一个槽位 —— 这部分操作必须在所有线程之间进行同步。槽位分配完成后,便可以并发地执行计算。为了实现这个反应器,我们将使用原子索引为每个请求分配唯一的数组槽位:

// Example 25
class Reactor {
  static constexpr size_t N = 1024;
  Data data_[N] {};
  std::atomic<size_t> size_{0};
public:
  bool operator()(... 参数 ...) {
    const size_t s =
      size_.fetch_add(1, std::memory_order_acq_rel);
    if (s >= N) return false; // 数组已满
    data_[s] = ... 结果 ...;
    return true;
  }

  void print_results() { ... }
};

对 operator() 的调用是线程安全的:任意数量的线程可以同时调用此操作符,每次调用都会将计算结果添加到下一个数组槽位中,而不会覆盖其他调用产生的数据。为了从对象中获取结果,我们可以选择等待所有请求完成,或者实现另一种同步机制(例如发布协议),以确保 operator() 和 print_results() 调用彼此之间也是线程安全的。

通常情况下,反应器对象会异步处理请求:拥有一个独立的线程来执行计算任务,并使用一个队列将所有请求传递给该单一线程。我们可以通过组合前面介绍的几种模式,来构建这样的反应器,可以为基本的反应器添加一个线程安全的队列,从而得到一个异步反应器(我们即将看到这种设计的一个示例)。

我们的重点一直是启动和执行任务,然后等待工作完成。接下来的模式则专注于处理异步任务的完成。

18.5.3 前摄器对象模式

前摄器模式用于执行来自一个或多个线程的异步任务,这些任务通常是长时间运行的。这听起来很像 反应器 模式,但区别在于任务完成后的处理方式:对于 反应器,我们只需等待工作完成(等待可以是阻塞或非阻塞的,但在所有情况下,都由调用者主动检查完成状态)。而 前摄器对象会为每个任务关联一个回调函数,当任务完成时,该回调函数将被异步执行。反应器 和 前摄器是解决同一问题的同步与异步两种方案:即如何处理并发任务的完成。

一个典型的前摄器对象通常拥有一个用于异步执行任务的队列,或者使用另一个执行器来调度这些任务。每个任务提交时都附带一个回调函数,通常是一个可调用对象。当任务完成时,该回调函数会被执行;通常,执行任务的同一个线程也会负责调用该回调函数。

由于回调函数的执行始终是异步的,如果回调需要修改共享数据(例如,由提交任务到 前摄器的线程所访问的数据),就必须格外小心。如果回调函数与其他线程共享数据,则必须以线程安全的方式访问这些数据。

以下是使用上一节中线程安全队列的前摄器对象示例。在此示例中,每个任务接收一个整数作为输入,并计算出一个双精度浮点数结果:

// Example 26
class Proactor{
  using callback_t = std::function<void(size_t, double)>;
    struct op_task {
    size_t n;
    callback_t f;
  };
  std::atomic<bool> done_{false}; // 必须在 t_ 之前声明
  ts_queue<op_task> q_; // 必须在 t_ 之前声明
  std::thread t_;
public:
  Proactor() : t_([this]() {
    while (true) {
      auto task = q_.pop();
      if (!task) { // 队列为空
        if (done_.load(std::memory_order_relaxed)) {
          return; // 工作已完成
        }
        continue; // 等待更多任务
      }
      ... 执行工作 ...
      double x = ... 结果 ...
      task->f(n, x);
    } // while (true)
  }) {}

  template <typename F>
  void operator()(size_t n, F&& f) {
    q_.push(op_task{n, std::forward<F>(f)});
  }

  ~Proactor() {
    done_.store(true, std::memory_order_relaxed);
    t_.join();
  }
};

队列存储了由输入数据和可调用对象组成的工作请求;任意数量的线程都可以调用 operator() 来向队列添加请求。更通用的前摄器可能会直接接受一个用于工作请求的可调用对象,而不是将计算逻辑硬编码在具体的前摄器对象中。该前摄器在单个线程上按顺序执行所有请求。当请求的计算完成后,该线程会调用相应的回调函数,并将结果传递给它。我们可以通过以下方式使用这样的前摄器对象:

// Example 26
Proactor p;
for (size_t n : ... 所有输入 ...) {
  p(n, [](double x) { std::cout << x << std::endl; });
}

我们的前摄器在单个线程上执行所有回调,而主线程不进行输出操作。否则,就需要使用互斥锁来保护 std::cout。

前摄器模式既用于执行异步事件,也用于在这些事件发生时执行额外的操作(即回调)。本节探讨的最后一种模式本身并不执行任务,而是用于响应外部事件。

18.5.4 监视器模式

监视器模式在需要观察或监控某些条件,并对特定事件做出响应时使用。通常,一个监视器在其自己的线程上运行,该线程大部分时间处于休眠或等待状态。该线程会通过通知唤醒,或者仅由时间的推移(超时)而唤醒。当唤醒时,监视器对象就会检查其所负责监控的系统状态。如果满足指定的条件,可能会执行某些操作,然后线程再次回到等待状态。

我们将看到一个使用超时机制的监视器实现;使用条件变量的监视器,可以采用相同的方法实现,但使用本章前面介绍的“等待通知”模式。

首先,我们需要有东西可供监视。假设有几个生产者线程,进行一些计算,并使用一个原子索引将结果存储到一个数组中:

// Example 27
static constexpr size_t N = 1UL << 16;
struct Data {... 数据 ... };
Data data[N] {};
std::atomic<size_t> index(0);

void produce(std::atomic<size_t>& count) {
  for (size_t n = 0; ; ++n) {
    const size_t s =
      index.fetch_add(1, std::memory_order_acq_rel);
    if (s >= N) return;
    const int niter = 1 << (8 + data[s].n);
    data[s] = ... 结果 ...
    count.store(n + 1, std::memory_order_relaxed);
  }
}

生产者线程还将线程计算出的结果数量存储在传入的 count 变量中。我们是这样启动生产者线程的:

// Example 27
std::thread t[nthread];
std::atomic<size_t> work_count[nthread] = {};

for (size_t i = 0; i != nthread; ++i) {
  t[i] = std::thread(produce, std::ref(work_count[i]));
}

我们每个线程都有一个结果计数器,因此每个生产者都有自己的计数器进行递增。那么,为什么要将这些计数器设为原子类型呢?因为这些计数器也正是要监控的对象:监控线程将定期报告已完成的工作量。因此,两个线程(生产者线程和监控线程)会访问每个工作计数器,需要使用原子操作或互斥锁来避免数据竞争。

监控器将是一个独立的线程,会定期唤醒,读取结果计数器的值,并报告工作进度:

// Example 27
std::atomic<bool> done {false};
std::thread monitor([&]() {
  auto print = [&]() { ... 输出 work_count[] ... };
  std::cout << "work counts:" << std::endl;
  while (!done.load(std::memory_order_relaxed)) {
    std::this_thread::sleep_for(
      std::chrono::duration<double, std::milli>(500));
    print();
  }
  print();
});

监控器可以在生产者线程启动之前开始,或者在需要监控工作进度的时间启动,报告每个生产者线程计算出的结果数量:

work counts:
1096 1083 957 1046 1116 -> 5298/65536
2286 2332 2135 2242 2335 -> 11330/65536
...
13153 13061 13154 12979 13189 -> 65536/65536
13153 13061 13154 12979 13189 -> 65536/65536

这里我们使用了五个线程来计算总共 64K 个结果,监控器报告了每个线程的计数以及总的结果计数。要关闭监控器,需要设置 done 标志并汇入监控器线程:

// Example 27
done.store(true, std::memory_order_relaxed);
monitor.join();

另一种常见的监控器模式变体是:不依赖定时器等待,而是等待某个条件满足。这种监控器是基本监控器与本章前面介绍的“等待通知”模式的结合。

并发编程社区已经提出了许多其他模式来解决与并发相关的常见问题;其中大多数模式可以在 C++ 程序中使用,但它们并非 C++ 所特有。C++ 特有的特性(例如原子变量)会影响实现和使用这些模式的方式。本章的示例应该为你提供了足够的指导,能够将其他并发模式适配到 C++ 中。

关于执行模式的描述,基本结束了对 C++ 并发模式的简要研究。在你翻过最后一页之前,我想向你展示一种完全不同的并发模式类型。