前面基于大小的内存池示例,是针对单一固定大小的内存块,以及特定的使用模式进行优化的。在实际应用中,还有许多其他理由需要采用专门的内存分配策略。
本节中,将探讨一种“分块内存池(chunked pool)”的概念,也就是一个预先分配好、用于特定大小内存块的池化管理机制。这种内存池可以包含多个不同大小的内存块集合,每个集合用于分配特定尺寸的对象。
这个示例主要是为了教学目的而设计的,而不是直接用于生产环境。接下来的代码在性能上可以做到相当高效,甚至可以进一步优化得更快。但在本书中,我们会更关注其整体设计思路,而将具体的优化工作留给亲爱的读者你,去按照自己的需求进行改进。
本示例的思路是:用户代码计划分配尺寸相近(但不一定完全相同)、类型各异的对象,并假设对象总数有一个上限。这为我们提供了一些先验知识。利用这些信息,我们将编写一个模板类。
为了更清楚地说明,先来看一个例子:
一个 ChunkSizedAllocator<10, 20, 40, 80, 160> 对象将会预先分配足够的原始内存,用于容纳:
10 个 20 字节大小的对象,
10 个 40 字节大小的对象,
10 个 80 字节大小的对象,
10 个 160 字节大小的对象。
这些对象总共至少需要 3,000 字节的内存(即 200 + 400 + 800 + 1600)。我们之所以说“至少”,是为了让内存池真正可用,我们的类还需要考虑内存对齐(alignment)的问题。如果要避免分配出未正确对齐的对象,则实际分配的内存通常会比最小需求更大。
为了帮助各位理解我们将要做的事情,以下是一些关键点(请允许我玩个文字游戏,称它们为“指针”):
尺寸类别(size categories)Sz... 必须升序排列:要求模板参数包中的整数值按升序排列,这样可以在后续查找时获得更快的速度(线性复杂度而非平方复杂度)。由于这些值在编译时就已知(作为模板参数),这不会带来运行时开销,只是对用户的一个使用约束。当然,我们会在编译时验证这一点,以避免意外错误。
可变参数包不能为空:在 C++ 中,可变参数包是可以为空的,但在我们的设计中,没有尺寸类别是毫无意义的。我们会在编译时确保这种情况不会发生。同样地,N(每种尺寸类别的对象数量上限)也必须大于 0,否则这个类就没有意义了,因此我们也会对其进行验证。
尺寸类别必须满足最低对齐要求:一个不那么显而易见的点是:Sz... 中的每个值都必须至少是 sizeof(std::max_align_t)(我们也可以使用 alignof 来判断,但对于基本类型来说,两者等价)。实际中,我们会将这些尺寸类别调整为2的幂次,以确保可以容纳任意类型的对象。这部分逻辑我们会在内部自动处理,将这个要求强加给用户代码会比较麻烦。
查看代码时,会看到这些约束被明确地表达出来。为了让“代码叙述”更容易理解,下面的代码将逐步展示其实现过程。如果想尝试运行,请务必查看完整的示例代码。
#include <algorithm>
#include <vector>
#include <utility>
#include <memory>
#include <cassert>
#include <concepts>
#include <limits>
#include <array>
#include <iterator>
#include <mutex>
// ... 辅助函数(如下所示)...
template <int N, auto ... Sz>
class ChunkSizedAllocator {
static_assert(is_sorted(make_array(Sz...)));
static_assert(sizeof...(Sz) > 0);
static_assert(
((Sz >= sizeof(std::max_align_t)) && ...)
);
static_assert(N > 0);
static constexpr unsigned long long sizes[] {
next_power_of_two(Sz)...
};
using raw_ptr = void*;
raw_ptr blocks[sizeof...(Sz)];
int cur[sizeof...(Sz)] {}; // 初始化为0
// ...
这里有两个数据成员:
blocks:将为每个尺寸类别保存一个指向原始内存块的指针;
cur:将为每个尺寸类别保存当前内存块中下一个可用位置的索引(默认初始化为 0,因为从每个内存块的起始位置开始分配)。
这个类的代码将在稍后继续展示。目前,可能有一些未解释的辅助函数:
我们使用了 make_array(Sz...):这是一个 constexpr 函数,它根据模板参数包 Sz... 中的值构造一个类型为 std::array
我们在该 std::array
名为 sizes 的非静态成员数组将保存 Sz... 中每个值的下一个 2 的幂次(当然也包括该值本身:如果已经是 2 的幂次,那再好不过了!)。例如,如果 Sz... 是 10, 20, 32,那么 sizes 数组中将包含 16, 32, 32。
实际操作中,如果以连续方式进行分配,那么不是 2 的幂次的内存块大小,会导致在首次分配之后的对象出现未对齐(misaligned)的情况。虽然可以通过添加填充(padding)来避免这个问题,但这会使实现变得复杂得多。
为了加快分配速度,需要在编译时就为 Sz... 中的每一个尺寸计算其对应的下一个 2 的幂次(next power of two),并将这些值存储在 sizes 数组中。
所以,可能会出现两个不同的尺寸类别最终对应相同大小的内存块。例如,40 和 60 都会调整为 64 字节的块。但这只是一个次要问题。即使如此,代码仍然可以正常工作。
考虑到这是一个为有经验的使用者设计的专业化工具,这种设计权衡是合理且可以接受的。
实际上,这些辅助函数的代码通常定义在 ChunkSizedAllocator<N, Sz...> 类声明之前,如下所示:
// ...
template <class T, std::same_as<T> ... Ts>
constexpr std::array<T, sizeof...(Ts)+1>
make_array(T n, Ts ... ns) {
return { n, ns... };
}
constexpr bool is_power_of_two(std::integral auto n) {
return n && ((n & (n - 1)) == 0);
}
class integral_value_too_big {};
constexpr auto next_power_of_two(std::integral auto n) {
constexpr auto upper_limit =
std::numeric_limits<decltype(n)>::max();
for(; n != upper_limit && !is_power_of_two(n); ++n)
;
if(!is_power_of_two(n)) throw integral_value_too_big{};
return n;
}
template <class T>
constexpr bool is_sorted(const T &c) {
return std::is_sorted(std::begin(c), std::end(c));
}
// ...
make_array() 使用了 concepts 来约束所有值必须具有相同的类型,而 is_power_of_two(n) 则通过位运算确保对 n 的判断既正确又高效(同时也会检查 n 是否为 0,以避免将 0 误判为 2 的幂次)。next_power_of_two() 函数理论上可以优化得更快,但这对我们来影响不大,它仅在编译时使用。我们可以通过将其设为 consteval 来强制只在编译时调用,但考虑到可能有用户希望它在运行时也能使用,因此我们选择保留这种灵活性,使用 constexpr。
在简要介绍了这些辅助函数之后,回到 ChunkSizedAllocator<N, Sz...> 的实现。我们有一个成员函数:bool within_block(void* p, size_t i) const;该函数仅当指针 p 位于第 i 个预分配内存块 blocks[i] 内部时才返回 true。这个函数的逻辑看起来非常简单:有读者可能会想直接测试类似 blocks[i] <= p && p < blocks[i] + N 的表达式。但由于 blocks[i] 是 void* 类型,无法直接进行指针算术运算,必须进行适当的类型转换。但即使这样做了,在 C++ 中仍然是不合法的(回想在第 2 章中对指针算术复杂性的讨论)。虽然在实践中,与 C 兼容的原因,这种写法可能“碰巧有效”,但它并不是可以依赖的标准行为。
撰写本书时,C++ 标准委员会仍在讨论是否添加一个标准库函数,来判断一个指针是否位于两个指针之间。在标准支持之前,可以使用标准库提供的 std::less<> 函数对象来进行“相对合法”的比较。我知道这并不理想,但在当前几乎所有编译器上都能正常工作。更重要的是,将这种判断逻辑封装在一个专用函数中,可以让我们在未来真正有标准支持时,更容易地更新源码替换当前的实现。
// ...
bool within_block(void *p, int i) {
void* b = blocks[i];
void* e = static_cast<char*>(b) + N * sizes[i];
return p == b ||
(std::less{}(b, p) && std::less{}(p, e));
}
// ...
没有理由将 ChunkSizedAllocator<N, Sz...> 类型的对象设为全局可用:这是一个可以在程序中多次实例化、用于解决各种问题的工具。然而,我们并不希望这个类型是可复制的(copyable) —— 虽然理论上可以支持,但这会使设计变得复杂,而收益却非常有限。
通过 std::malloc(),我们的构造函数为 Sz... 中的每一个尺寸(或者更准确地说,每个尺寸对应的下一个 2 的幂次,如本节前面所述)分配原始内存块,并在之后确认所有分配都成功完成。这里使用了 assert() 来进行检查,但也可以选择在分配失败时抛出 std::bad_alloc 异常 —— 前提是你要谨慎地对之前成功分配的内存块调用 std::free()。
与本章前面介绍的内存池(arena)实现类似,ChunkSizedAllocator<N, Sz...> 对象只负责管理内存本身,而不负责管理使用端代码放置在其中的对象。必须假设使用端代码在调用 ChunkSizedAllocator 对象的析构函数之前,已经销毁了所有存储在其内存块中的对象。
并且,该类中包含了一个 std::mutex 成员变量。这是因为我们稍后需要使用它(或某种同步机制),来确保在多线程环境下,内存的分配和释放的线程安全。
// ...
std::mutex m;
public:
ChunkSizedAllocator(const ChunkSizedAllocator&) = delete;
ChunkSizedAllocator&
operator=(const ChunkSizedAllocator&) = delete;
ChunkSizedAllocator() {
int i = 0;
for(auto sz : sizes)
blocks[i++] = std::malloc(N * sz);
assert(std::none_of(
std::begin(blocks), std::end(blocks),
[](auto p) { return !p; }
));
}
~ChunkSizedAllocator() {
for(auto p : blocks)
std::free(p);
}
// ...
终于,来到了整个实现的核心部分:allocate() 和 deallocate() 成员函数。在 allocate(n) 函数中,首先查找最小的、满足条件的尺寸 sizes[i],即其对应的内存块大小足以容纳 n 字节。当找到合适的块,就锁定 std::mutex 对象,以避免竞争条件(race condition),然后检查在 blocks[i] 中是否还至少有一个未被使用的内存块。当前实现采用顺序分配策略,并且不支持内存块的重用,这是为了保持讨论的简洁性。如果找到可用内存块就取出它,更新 cur[i] 指针,并将对应的地址返回给用户代码。
需要注意的是,如果在预分配的内存块中找不到可用的空闲块,或者请求的内存大小 n 超过了预先分配块的尺寸,则将内存分配任务委托给全局的 ::operator new(),以确保分配请求仍有可能成功。这时,我们也可以选择抛出 std::bad_alloc 异常,具体取决于设计意图:如果必须确保所有分配都发生在我们自己的内存池中,在这种情况下抛出异常或以其他方式失败会是更合适的选择。
某些应用场景中,特别是在嵌入式系统、低延迟系统或实时系统领域,软件如果未能在规定时间内给出正确的结果,其后果就和输出错误结果一样严重,甚至更加危险。
举个例子:一个控制汽车刹车的系统,如果在发生碰撞之后才触发刹车动作,那作用就非常有限了。
这类系统在发布前会经过严格的测试,以确保在运行时表现出可预测的行为。因此,在开发阶段,可能更倾向于以一种能在测试阶段被发现的方式失败,而不是默认使用某种在某些情况下,可能无法满足时间要求的备用策略。当然,请不要发布那些在真实使用中会突然停止工作的系统:必须做好充分测试,并确保用户的安全!
但在某些场景下,可能会希望:如果发生了严重错误,系统能“优雅地失败” —— 例如,在某个地方打印出 "Sorry, we messed up"(抱歉,我们搞砸了),然后重启程序 —— 有时候,这种方式也是完全可以接受的,甚至更安全、更可控。
deallocate(p) 函数会遍历每一个内存块,检查指针 p 是否位于其中某一块内存范围内。前面提到的 within_block() 函数,本来可以从一个标准尚未提供的指针比较功能中受益(即判断一个指针是否在两个指针之间)。如果实际中使用这段代码,请记得留一个备注:当标准支持了这样的功能,就及时替换使用。如果 p 不在我们管理的内存块中,则很可能是通过 ::operator new() 分配的,我们也应该通过 ::operator delete() 来释放。
正如之前所说,我们的实现不会重用已经被释放的内存块。不过,在代码中已经用注释标出了应该实现内存重用的逻辑位置(同时还包含了用于锁定互斥锁的代码),所以完全可以在这个位置添加内存块的重用逻辑。
// ...
auto allocate(std::size_t n) {
using std::size;
// 使用可用的最小块
for(std::size_t i = 0; i != size(sizes); ++i) {
if(n < sizes[i]) {
std::lock_guard _ { m };
if(cur[i] < N) {
void *p = static_cast<char*>(blocks[i]) +
cur[i] * sizes[i];
++cur[i];
return p;
}
}
}
// 要么没有块匹配,要么没有块剩余
return ::operator new(n);
}
void deallocate (void *p) {
using std::size;
for(std::size_t i = 0; i != size(sizes); ++i) {
if(within_block(p, i)) {
//std::lock_guard _ { m };
// 如果想重用内存,其就在 blocks[i] 中
return;
}
}
// p不在我们的内存块里
::operator delete(p);
}
};
// ...
由于这是一种由使用端代码按需使用的专用分配机制,我们将为此使用专门的内存分配操作符重载(overloads)。这些重载,将是基于所使用的 ChunkSizedAllocator 对象模板参数的函数模板:
template <int N, auto ... Sz>
void *operator new(std::size_t n, ChunkSizedAllocator<
N, Sz...
> &chunks) {
return chunks.allocate(n);
}
template <int N, auto ... Sz>
void operator delete (void *p, ChunkSizedAllocator<
N, Sz...
> &chunks) {
return chunks.deallocate(p);
}
// new[]和delete[] 留作为练习 ;)
我们已经编写了这些内存分配功能,但为了验证这种设计是否真的带来了好处,还需要对它们进行测试。接下来,将通过实际的测试代码,来观察这种分配策略在性能、内存使用和线程安全性等方面是否优于标准内存分配机制。
现在编写一个简单的测试程序,使用一个带有合适尺寸类别的 ChunkSizedAllocator 对象,以能够从中受益的方式分配和释放符合这些尺寸类别的对象。通过这种方式,假设此类的使用者是,希望利用预先已知的尺寸类别来获得性能优势。
当然,也可以编写其他测试来验证代码,在面对不合适的尺寸请求或构造函数抛出异常等情况下的行为。如果想编写一个比我们这里提供的更复杂的测试框架,那可以试试 —— 只是为了说明与执行速度相关的讨论,我们才使用了较为简单的测试方式。
本章前面曾使用过一个 test() 函数来测试基于大小的内存池,这里我们将再次使用它。关于该函数的具体工作原理,请参见前面的章节。
要编写一个能够验证各种尺寸对象分配与释放行为的测试程序并不容易。为此,将使用一个名为 dummy<N> 的类型,它的每个对象都会在内存中占据 N 字节的空间(通过使用 char[N] 成员来实现这一点,因此知道对于所有合法的 N 值,alignof(dummy<N>) == 1)。
我们还将编写两个不同的 test_dummy<N>() 函数。每个函数都会分配并构造一个 dummy<N> 对象,并设置相应的销毁后释放逻辑:其中一个函数将使用标准库提供的内存分配操作符;另一个函数则将使用为 ChunkSizedAllocator 所定义的操作符重载。
两个 test_dummy<N>() 函数都返回一个值对(pair):其中一个是分配对象的指针,另一个是对该对象进行销毁并释放内存的代码。由于将在使用端代码中存储这些信息,因此需要这些“值对”具有统一的抽象类型。这也解释了为什么使用 void* 来表示地址,以及使用 std::function<void(void*)> 来表示销毁逻辑。这里必须使用 std::function 或类似机制:如果仅使用函数指针是不够的,因为销毁逻辑可能是带有状态(stateful)的(例如,需要记住是哪个分配器对象负责了此次分配)。
以下是这些工具的代码实现:
#include <chrono>
#include <utility>
#include <functional>
template <class F, class ... Args>
auto test(F f, Args &&... args) {
using namespace std;
using namespace std::chrono;
auto pre = high_resolution_clock::now();
auto res = f(std::forward<Args>(args)...);
auto post = high_resolution_clock::now();
return pair{ res, post - pre };
}
template <int N> struct dummy { char _[N] {}; };
template <int N> auto test_dummy() {
return std::pair<void *, std::function<void(void*)>> {
new dummy<N>{},
[](void *p) { delete static_cast<dummy<N>*>(p); }
};
}
template <int N, class T> auto test_dummy(T &alloc) {
return std::pair<void *, std::function<void(void*)>> {
new (alloc) dummy<N>{},
[&alloc](void *p) { ::operator delete(p, alloc); }
};
}
// ...
最后,需要编写测试程序。为了确保理解整个过程中涉及的所有细节,需要逐步讨论这个程序的实现。
程序首先为 ChunkSizedAllocator 对象选定一个值 N,以及该内存管理器将使用的尺寸类别 Sz...(我为 N 选择的值是任意的)。我特意使用了一个不是 2 的幂次的尺寸类别,以展示实现会如何适当地将该值“向上取整”到下一个 2 的幂次:例如,当传入大小为 62 的请求时,在构造类中的 sizes 数据成员时,它会转换为 64。接着,构造了这个对象,并将其命名为 chunks。
// ...
#include <print>
#include <vector>
int main() {
using namespace std;
using namespace std::chrono;
constexpr int N = 100'000;
using Alloc = ChunkSizedAllocator<
N, 32, 62 /* 64 */, 128
>;
Alloc chunks; // 构造 ChunkSizedAllocator
// ...
接下来的测试,无论是针对标准库还是我们自定义的专用内存分配器,都采用了相同的形式:
对测试来说,听起来比实际要复杂得多。幸运的是,当所有测试执行完毕,会输出出每个测试的执行时间,单位为微秒(microseconds):这样就可以对比标准分配器和自定义的 ChunkSizedAllocator 在性能上的差异。
// ...
auto [r0, dt0] = test([ptrs = std::vector<
std::pair<
void*, std::function<void(void*)>
>>(N * 3)]() mutable {
// 分配
for(int i = 0; i != N * 3; i += 3) {
ptrs[i] = test_dummy<30>();
ptrs[i + 1] = test_dummy<60>();
ptrs[i + 2] = test_dummy<100>();
}
// 清理
for(auto & p : ptrs)
p.second(p.first);
return std::size(ptrs);
});
auto [r1, dt1] = test([&chunks, ptrs = std::vector<
std::pair<
void*, std::function<void(void*)>
>>(N * 3)]() mutable {
// 分配
for(int i = 0; i != N * 3; i += 3) {
ptrs[i] = test_dummy<30>(chunks);
ptrs[i + 1] = test_dummy<60>(chunks);
ptrs[i + 2] = test_dummy<100>(chunks);
}
// 清理
for(auto & p : ptrs)
p.second(p.first);
return std::size(ptrs);
});
std::print("Standard version : {}\n",
duration_cast<microseconds>(dt0));
std::print("Chunked version : {}\n",
duration_cast<microseconds>(dt1));
}
好,刚才的过程稍微有些复杂,但希望对你有所启发。那么,这样做值得吗?这取决于具体需求。
当我使用相同的在线 GCC 15 编译器,并以 -O2 优化级别运行这段代码时(就像之前测试基于尺寸的内存池一样),标准库版本的执行时间为 13,360 微秒,而我们“分块”版本的执行时间是 12,023 微秒,相当于标准版本的 90.05%。这种程度的加速还是相当可观 —— 前提是:并没有计入 chunks 对象在构造函数中进行初始内存分配的时间。
这个实验的目的在于展示:在关键路径上,我们可以节省时间;而在非关键路径上,我们可以选择付出代价。
需要注意的是,我们的实现并不重用内存块,而标准版本会这么做。所以我们的性能提升可能以牺牲功能为代价(当然,前提是这个功能很重要)。在测试中,是否需要确定 std::mutex 加锁对性能有影响呢?可以参考以下信息,进行考量:
根据平台,可能有更合适的同步机制可供使用;
如果 deallocate() 成员函数也需要对 std::mutex 加锁,那这个实现可能太简单化,难以带来实际好处。
当然,这个(相当学术性的)版本还有很多可以优化的空间,我鼓励读者们动手尝试改进它(并在每一步都测试结果!)。本节的重点更多在于展示:
基于固定块大小的内存分配可以实现;
从架构设计的角度来看,如何实现它;
指出实现过程中可能遇到的一些风险和陷阱。
这个过程是还挺有趣的?不是吗?