6.4. 简易的 shared_ptr 实现

实现 shared_ptr<T> 可谓困难重重,优化更是难上加难。相比 unique_ptr<T>,更建议直接使用标准库实现 —— 该类型的正确实现极具挑战性,而标准版本凝聚了多年实践经验和测试验证。本节提供的简易版本仅适用于实验目的(虽然能处理简单场景,但要实现工业级强度则需专业级开发工作)。

实现 shared_ptr 的核心难点在于需要同时管理两种资源:

  1. 共享指针指向的对象
  2. 引用计数器

这种双重职责(违反经典面向对象设计的单一职责原则)显著增加了实现的复杂度,特别是在异常安全方面需要格外谨慎。

为保持简洁性,实现将忽略标准 shared_ptr 的许多细节约束,仅聚焦于管理标量类型 T。让我们逐步构建这个类型:

#include <atomic>
#include <utility>

namespace managing_memory_book {
  // 原生 shared_ptr
  template <class T>
  class shared_ptr {
    T* p = nullptr;
    std::atomic<long long> *ctr = nullptr;
    // ...

shared_ptr<T>负责管理T*和一个指向引用计数器的指针,两者都需要在共同所有者之间进行管理和共享。需要注意的是,共享计数器是一个指向原子整型的指针,因为shared_ptr<T>在多线程场景中尤为重要 —— 无法预知哪个线程会成为对象的最后使用者,所以增减计数器等操作需要同步机制来避免数据竞争。

避免数据竞争

若程序中出现以下情况:(a) 至少两个线程并发访问同一对象,(b) 至少有一个访问是写操作,(c) 且未进行同步控制,此时程序就会产生所谓的数据竞争,将彻底丧失通过源码推断程序行为的能力,这种情况极其危险。

在实现中,共享计数器的操作极可能并发执行,因此必须进行同步控制。这正是采用原子整型(atomic integrals)作为底层同步原语来实现计数器的根本原因。

构造 shared_ptr<T> 对象需要特别注意:

如以下代码段所示,对于承担多重职责的类型而言,即便是构造过程也需要精心设计。在接收 T* 的构造函数中,可能需要分配共享计数器(可能抛出异常);在复制构造函数中,则必须处理参数可能为空 shared_ptr<T>(此时共享计数器为 null)的情况:

  // ...
public:
  shared_ptr() = default;
  shared_ptr(T* p) : p{ p } {
    if(p) try {
      ctr = new std::atomic<long long>{ 1LL };
    } catch(...) {
      delete p;
      throw;
    }
  }

  shared_ptr(const shared_ptr &other)
    : p{ other.p }, ctr{ other.ctr } {
    if(ctr) ++(*ctr);
  }

  shared_ptr(shared_ptr &&other) noexcept
    : p{ std::exchange(other.p, nullptr) },
    ctr{ std::exchange(other.ctr, nullptr) } {
  }

  bool empty() const noexcept { return !p; }
  operator bool() const noexcept { return !empty(); }
// ...

empty() 和 operator bool() 成员函数的设计与默认构造函数(即该类型的空状态)直接相关,所以代码段中包含了它们的实现:empty() 用于显式检查 shared_ptr 是否为空,operator bool() 提供布尔上下文的隐式空状态检查。

赋值操作符的行为符合预期:复制赋值会释放当前持有的资源,并共享参数所指向的资源;移动赋值会释放当前持有的资源,并将参数所持有的资源所有权转移给目标对象。

// ...
void swap(shared_ptr &other) noexcept {
  using std::swap;
  swap(p, other.p);
  swap(ctr, other.ctr);
}

shared_ptr& operator=(const shared_ptr &other) {
  shared_ptr{ other }.swap(*this);
  return *this;
}

shared_ptr& operator=(shared_ptr &&other) noexcept {
  shared_ptr{ std::move(other) }.swap(*this);
  return *this;
}
// ...

析构过程可能是该类型最复杂的部分。必须确保最后一个持有者销毁所指对象,避免出现“永生对象”。关键在于:只有当shared_ptr<T>是对象的最后使用者时,才应销毁其指向的T对象。

至少存在两种看似合理但实际不可行的简单算法:

第一种算法是“若计数器非空,则当*ctr==1时删除p和ctr”。该算法存在以下问题:当两个线程同时进入析构函数且*ctr==2时,可能出现两个线程都看不到*ctr==1的情况,导致所指对象永远不会销毁。

图6.1 —— 永生对象的竞态条件
图6.1 —— 永生对象的竞态条件

另一种算法是:“若计数器非空,则递减*ctr。若*ctr==0,则删除p和ctr”。该算法存在以下问题:当两个线程同时进入析构函数且*ctr==2时,两者可能并发递减*ctr,最终都检测到*ctr==0,从而导致对所指对象的双重删除。

图6.2 —— 导致对象双重删除的竞态条件
图6.2 —— 导致对象双重删除的竞态条件

这两种情况都很糟糕(尽管原因不同),因此需要更完善的解决方案。关键在于确保执行线程能够确认自己是使*ctr归零的那个线程。这类问题的通用解决方案需要将两个步骤(仅在变量值为预期值时修改其值,并获知本次写入是否成功)封装为单个原子操作 —— 必须依赖于多核机器提供的硬件级原子操作支持。

C++通过原子类型(atomics)对这些底层硬件操作进行抽象。其中compare_exchange_weak()就是这样一个原子操作:接收预期值(变量应具有的值)和期望值(仅在变量值等于预期值时写入的值),仅在写入成功时返回true。为方便起见,预期值通过引用传递,并在操作失败时自动更新为变量的实际当前值 —— 因为该操作通常需要在循环中调用,直到成功写入期望值为止,每次迭代都需要重新读取变量最新状态。

快照机制

当线程需要递减*ctr时,由于*ctr是可变的共享状态且可能并发修改,其值随时可能变化。因此,首先在本地变量中保存当前状态的“快照”(expected),基于这个不会改变的本地副本计算出目标值(desired)。然后尝试根据这个(可能已过时的)快照执行操作,并通过原子操作验证假设(即*ctr仍持有expected值)是否成立。这种机制能确保只有成功执行写入的线程,才能确知自己是使*ctr变为desired值的操作者。

有了这些,析构函数可进行实现:

// ...
~shared_ptr() {
  if(ctr) {
    auto expected = ctr->load();
    auto desired = expected - 1;
    while(ctr->compare_exchange_weak(expected,
                                     desired))
      desired = expected - 1;
    if(desired == 0) { // 最后一个使用者,释放资源
      delete p;
      delete ctr;
    }
  }
}
// ...

循环结束后,可以确定:当*ctr值为expected时,成功写入了desired值。若desired值为0(则expected原为1),就能确定自己是该指针对象的最后使用者。

更简单的解决方案

此处展示的compare_exchange_weak()方案只是众多可选方案之一。本书之所以选择这种方法,是因为它为解决并发更新问题提供了一个有趣的通用思路,并且如果熟悉内存序约束(此处不做深入探讨),还能开启更多优化可能。

针对这个特定场景,可以用类似if((*ctr)-- == 1)的原子递减操作来替代整个循环 —— 当原子递减前的值为1时,就能确定递减后的*ctr必定为0。

我们实现的 shared_ptr<T> 还包括以下重要成员函数:

    // ...
    bool operator==(const shared_ptr &other)
      const noexcept { return p == other.p; }

    // 自 C++20 起,可由 operator==() 推导而来
    bool operator!=(const shared_ptr &other)
      const noexcept { return !(*this == other); }

    T *get() noexcept { return p; }

    const T *get() const noexcept { return p; }

    T& operator*() noexcept { return *p; }

    const T& operator*() const noexcept { return *p; }

    T* operator->() noexcept { return p; }

    const T* operator->() const noexcept { return p; }
  };
}

若各位愿意,可以应用前面 unique_ptr 章节展示的 C++23 “deduced this” 特性来简化这段代码。另外请记住,在 C++20 中,operator!=() 会从 operator==() 自动推导得出,无需在源码中显式编写。

以下是一个使用该智能指针的极简使用端代码示例:

#include <thread>
#include <chrono>
#include <random>
#include <iostream>

using namespace std::literals;

struct X {
  int n;
  X(int n) : n{ n } {}
  ~X() { std::cout << "X::~X()\n"; }
};

int main() {
  using managing_memory_book::shared_ptr;
  std::mt19937 prng{ std::random_device{}() };
  std::uniform_int_distribution<int> die{ 200, 300 };

  shared_ptr<X> p{ new X{ 3 } };

  using std::chrono::milliseconds; // shortcut

  std::thread th0{ [p, dt = die(prng)] {
    std::this_thread::sleep_for(milliseconds{dt});
    std::cout << "end of th0, p->n : " << p->n << '\n';
  } };

  std::thread th1{ [p, dt = die(prng)] {
    std::this_thread::sleep_for(milliseconds{dt});
    std::cout << "end of th1, p->n : " << p->n << '\n';
  } };

  th1.detach();
  th0.detach();

  std::this_thread::sleep_for(350ms);
  std::cout << "end main()\n";
}

th0和th1都会先休眠一个伪随机毫秒数,然后输出信息并结束执行,无法预知th0和th1哪个会先结束。由于两个线程都是detached状态,所以后续没有地方会调用join(),所以不能假设main()会是共享资源的最后使用者。

这个示例是刻意简化的,需要再次强调的是:由于shared_ptr<T>的使用成本明显高于unique_ptr<T>,当资源存在明确最后一个所有者时,通常应该优先选用后者而非前者。

6.4.1 关于make_shared()的几点说明

阅读C++相关内容(特别是关于shared_ptr<T>)时,可能经常看到这样的建议:应尽可能将这种写法:

std::shared_ptr<X> p{ new X { /* ... 参数 ... */ };

替换为:

auto p= std::make_shared<X>( /* ... 参数 ... */ );

如果有这样的疑问:(a) 为什么这是推荐做法?(b) 为什么至今还未讨论?现在可以回答(a),而(b)的答案需要我们等到第7章掌握必要的工具和知识后才能解释。

理解为何推荐make_shared<T>()工厂函数而非直接调用shared_ptr<T>构造函数,关键在于内存分配方式:

这种优化能提升性能(当单线程短期内频繁访问指针和计数器时),但也可能降低性能(当其他线程频繁修改计数器值时)。正如优化领域的常见情况,需要实际测量以确保通用优化方案也适合具体应用场景。

显然,要实现这种优化,需要能够创建这种内存块(概念上是包含T和原子整型的结构体),并确保shared_ptr<T>能兼容两种表示方式(两个独立指针 / 指向复合块的指针)的同时保持可用性和高效性。届时,第2章和第3章介绍的技巧将发挥重要作用。