11.5. 作用域结束时的精准回收

我们本章的第三种,也是最后一种实现方式,将确保在作用域结束时按需进行资源回收和最终化处理。我们的意思是,如果用户希望回收本应在作用域结束时延迟回收的未使用对象,可以实现。对于那些仍视为正在使用的、需延迟回收的对象,不会进行回收;而对于那些已经不再使用的对象,如果用户代码没有要求回收,也不会进行回收。当然,在程序终止时,所有仍需延迟回收的剩余对象都将回收,避免内存泄漏。

这种实现将比之前的实现更为微妙,需要考虑两个因素:(a)在程序执行的某个特定点,某个对象是否仍然引用;以及(b)在那个时刻是否有需要回收那些未引用的对象。

为了实现这一点,我们将借鉴 std::shared_ptr 的设计思路。在第6章中,曾提供过一个学术性且简化的版本。现在编写一个 counting_ptr<T> 类型的智能指针,不会在最后一个引用断开时立即销毁所指向的对象,而是将该对象标记为可回收状态。

以下是该示例的使用端代码。在某些作用域中存在 scoped_collect 类型的对象,这些对象代表了使用端代码发出的请求:在该作用域结束时回收那些不再使用的对象:

// ...
// 注意:非简单可析构类型
struct NamedThing {
  const char *name;
  NamedThing(const char *name) : name{ name } {
    std::cout << name << " ctor" << std::endl;
  }

  ~NamedThing() {
    std::cout << name << " dtor" << std::endl;
  }
};

auto g() {
  auto _ = scoped_collect{};
  [[maybe_unused]] auto p = gcnew<NamedThing>("hi");
  auto q = gcnew<NamedThing>("there");
  return q;
} // 此处可能发生垃圾回收

auto h() {
  struct X {
  int m() const { return 123; }
  };
  return gcnew<X>();
}

auto f() {
  auto _ = scoped_collect{};
  auto p = g();
  std::cout << '\"' << p->name << '\"' << std::endl;
} // 此处可能发生垃圾回收

int main() {
  using namespace std;
  cout << "Pre" << endl;
  f();
  cout << h()->m() << endl;
  cout << "Post" << endl;
} // 此处可能发生垃圾回收 (程序结束)

一个包含 scoped_collect 对象的作用域结束时,将导致所有通过 gcnew<T>() 分配,且在该时刻不再引用的对象回收;无论这些对象在该作用域内分配,还是在程序的其他地方分配,这一行为都适用。这里的设计意图是:作用域结束时,愿意为此付出一定的时间和开销,来集中回收一批对象。请不要在对速度或行为确定性要求很高的作用域中,使用 scoped_colleect 对象!

运行这段代码后,将得到以下结果:

Pre
hi ctor
there ctor
hi dtor
"there"
there dtor
123
Post

仍然引用的对象会保持可用状态,而不再引用的对象则会在 scoped_collect 对象的析构函数调用时,或在程序终止时(如果当时程序中仍存在可回收对象)被回收。

scoped_collect 类型本身非常简单,主要作用是与全局的垃圾回收器(GC)对象进行交互。它是一个不可复制、不可移动的 RAII 对象,在生命周期结束时触发一次垃圾回收操作:

// ...
struct scoped_collect {
  scoped_collect() = default;
  scoped_collect(const scoped_collect &) = delete;
  scoped_collect(scoped_collect &&) = delete;
  scoped_collect& operator=(const scoped_collect &) = delete;
  scoped_collect &operator=(scoped_collect &&) = delete;
  ~scoped_collect() {
    GC::get().collect();
  }
};
// ...

这一整套基础设施是如何工作的呢?来进行逐步分析。我们将从本章前面的章节中获得灵感:最初是在程序执行结束时回收所有对象,随后又为这些对象添加了终结(finalization)功能。本节的新颖之处在于,将引入一种能力:在程序执行过程中的不同时间点回收对象,并编写相应的代码来跟踪对象的引用情况。

为了跟踪对象的引用,将使用 counting_ptr<T> 类型对象:

#include <vector>
#include <memory>
#include <string>
#include <iostream>
#include <atomic>
#include <functional>
#include <utility>

完全可以(而且确实)仅通过标准工具来实现这一类型。count 数据成员是一个指针,它可能会在多个 counting_ptr<T> 实例之间共享:

template <class T>
class counting_ptr {
  using count_type = std::atomic<int>;

  T *p;
  count_type *count;
  std::function<void()> mark;
public:
  template <class M>
  constexpr counting_ptr(T *p, M mark) try :
  p{ p }, mark{ mark } {
    count = new count_type{ 1 };
  } catch(...) {
    delete p;
    throw;
  }

  T& operator*() noexcept {
    return *p;
  }

  const T& operator*() const noexcept {
    return *p;
  }

  T* operator->() noexcept {
    return p;
  }

  const T* operator->() const noexcept {
    return p;
  }

  constexpr bool
  operator==(const counting_ptr &other) const {
    return p == other.p;
  }
  // 从C++20起,可以省略 operator!= 的实现

  constexpr bool
  operator!=(const counting_ptr &other) const {
    return !(*this == other);
  }

  // 我们允许将 counting_ptr<T> 对象与 U* 类型的
  // 对象或其他 counting_ptr<U> 对象进行比较,
  // 以简化类层次结构中类型的处理。
  template <class U>
  constexpr bool
  operator==(const counting_ptr<U> &other) const {
    return p == &*other;
  }

  template <class U>
  constexpr bool
  operator!=(const counting_ptr<U> &other) const {
    return !(*this == other);
  }

  template <class U>
  constexpr bool operator==(const U *q) const {
    return p == q;
  }

  template <class U>
  constexpr bool operator!=(const U *q) const {
    return !(*this == q);
  }
  // ...

现在,关系操作符已经就绪,可以为我们定义的类型实现复制和移动语义:

  // ...
  void swap(counting_ptr &other) {
    using std::swap;
    swap(p, other.p);
    swap(count, other.count);
    swap(mark, other.mark);
  }

  constexpr operator bool() const noexcept {
    return p != nullptr;
  }

  counting_ptr(counting_ptr &&other) noexcept
    : p{ std::exchange(other.p, nullptr) },
      count{ std::exchange(other.count, nullptr) },
      mark{ other.mark } {
  }
  counting_ptr &
  operator=(counting_ptr &&other) noexcept {
    counting_ptr{ std::move(other) }.swap(*this);
    return *this;
  }

  counting_ptr(const counting_ptr &other)
  : p{ other.p }, count{ other.count },
  mark{ other.mark } {
    if (count) ++(*count);
  }

  counting_ptr &operator=(const counting_ptr &other) {
    counting_ptr{ other }.swap(*this);
    return *this;
  }

  ~counting_ptr() {
    if (count) {
      if ((*count)-- == 1) {
        mark();
        delete count;
      }
    }
  }
};

namespace std {
  template <class T, class M>
  void swap(counting_ptr<T> &a, counting_ptr<T> &b) {
    a.swap(b);
  }
}
// ...

counting_ptr<T> 不像 shared_ptr<T> 那样销毁引用计数器和所指向的对象,而是会删除计数器,但对所指向的对象进行“标记”,使其成为后续回收的候选对象。

从上一节中提到的通用垃圾回收机制(GC)、GC::GcRoot 以及 GC::GcNode<T> 的设计思路保持不变,但进行了如下增强:

该实现主要完成三个功能:

当scoped_collect对象的析构函数调用时,是最简便的收集触发时机。以下是这个增强版本的实现代码:

// ...
class GC {
  class GcRoot {
    void *p;

  public:
    auto get() const noexcept { return p; }
    GcRoot(void *p) : p{ p } {
    }

    GcRoot(const GcRoot&) = delete;
    GcRoot& operator=(const GcRoot&) = delete;
    virtual void destroy(void*) const noexcept = 0;
    virtual ~GcRoot() = default;
  };

  template <class T> class GcNode : public GcRoot {
    void destroy(void *q) const noexcept override {
      delete static_cast<T*>(q);
    }

  public:
    template <class ... Args>
    GcNode(Args &&... args)
    : GcRoot(new T(std::forward<Args>(args)...)) {
    }
    ~GcNode() {
      destroy(get());
    }
  };

  std::vector<
    std::pair<std::unique_ptr<GcRoot>, bool>
  > roots;

  GC() = default;

  static auto &get() {
    static GC gc;
    return gc;
  }

此场景下的回收函数实现:

  void make_collectable(void *p) {
    for (auto &[q, coll] : roots)
      if (static_cast<GcRoot*>(p) == q.get())
        coll = true;
  }

  void collect() {
    for (auto p = std::begin(roots); p != std::end(roots); ) {
      if (auto &[ptr, collectible] = *p; collectible) {
        ptr = nullptr;
        p = roots.erase(p);
      } else {
        ++p;
      }
    }
  }

  template <class T, class ... Args>
  auto add_root(Args &&... args) {
    auto q = static_cast<T*>(roots.emplace_back(
      std::make_unique<GcNode<T>>(
        std::forward<Args>(args)...
      ), false
    ).first->get());

    // 标记函数实现为一个 lambda 表达式,
    // 该表达式遍历根节点(roots),然后查找并标记指针 q 以待回收。
    // 当前的实现过于简化(使用了线性搜索),
    // 欢迎你实现更优的方案!
    return counting_ptr{
      q, [&,q]() {
        for (auto &[p, coll] : roots)
          if (static_cast<void*>(q) ==
            p.get()->get()) {
              coll = true;
              return;
            }
      }
    };
  }

  template <class T, class ... Args>
  friend counting_ptr<T> gcnew(Args&&...);
  friend struct scoped_collect;
public:
  GC(const GC &) = delete;
  GC& operator=(const GC &) = delete;
};

// ...
template <class T, class ... Args>
counting_ptr<T> gcnew(Args &&... args) {
  return GC::get().add_root<T>(
    std::forward<Args>(args)...
  );
}
// ...

最后一个示例可以从多个优化中受益,也已经可以正常工作,并且目的是足够简单以便理解和改进。

现在,在 C++ 中像其他流行语言一样成组地回收对象是可能的。它可能并不符合典型的 C++ 编程风格(idiomatic C++),但通过合理的设计和努力,完全可以实现按需(opt-in)的延迟回收机制。感觉还不错!