高并发内存池项目(简化版tcmalloc)

高并发内存池项目(简化版tcmalloc)

十二月 07, 2025 次阅读

项目简介

内存的开辟和管理是高性能服务器开发中的一个重要环节。任何程序在需要分配内存时,传统的内存分配器在高并发场景下往往会成为性能瓶颈,导致系统响应变慢,且当遇到多线程环境时,频繁的锁竞争会严重影响性能。为了解决这个问题,我们设计并实现了一个简化版的高并发内存池,该项目借鉴了Google的tcmalloc的设计思想。

项目源码如下:GitHub - High Concurrency Memory Pool

主要设计思想

分层内存管理

该项目处理内存分配的核心思想是分层内存管理:

  • 所有的内存都会从 ThreadCache 中分配,ThreadCache 是每个线程独有的内存缓存区,避免了多线程间的锁竞争。
  • ThreadCache 中的内存不足时,ThreadCache 会从全局的 CentralCache 中获取内存块,CentralCache 负责管理不同大小的内存块,并将其分配给各个线程的 ThreadCache
  • CentralCache 中的内存也不足时,CentralCache 会从 PageCache 中获取更大的内存块,PageCache 直接与操作系统交互,向操作系统请求大块内存。

尺寸分类与对齐

  • 内存分配的具体大小是多少以用户申请的实际大小为依据,为了实现内存对齐以及避免内存分配的粒度太小以至于管理开销过大,我们将内存分配的大小划分为多个等级(size class),每个等级对应一个固定大小的内存块。用户申请的内存会被对齐到最近的等级。该项目的内存对齐机制可以保证实际分配给用户的内存大小与用户申请的内存大小浪费不超过10%。这种浪费是必要的,我们以极其小的内存浪费代价换了高效的内存管理和分配速度。
  • 当用户申请的内存过大时(超过了规定的界限),会直接调用系统的内存分配原始接口来分配内存,避免了复杂的内存管理开销,且大块内存分配属于低频操作。

线程本地化与批量操作

  • 每个线程都会维护自己的自由链表,避免了多线程间的锁竞争,提高了内存分配和释放的效率。大多数情况下,内存分配都可以直接从线程本地的缓存中获取,我们需要保证线程本地缓存的内存充足,具体实现参考后面的 ThreadCache 设计。
  • 当线程本地缓存中的内存不足时,会从 CentralCache 中批量获取内存块,减少了频繁的内存分配请求,提高了整体性能。

中央与页级的全局协调

  • CentralCache 负责管理不同大小的内存块,并将其分配给各个线程的 ThreadCache。它维护了多个自由链表,每个链表对应一个内存大小等级。当 ThreadCache 请求内存时,CentralCache 会从对应的链表中分配内存块。
  • PageCache 直接与操作系统交互,向操作系统请求大块内存。它维护了一个三级基数树,用于跟踪哪些页框已经被分配,哪些页框是空闲的。当 CentralCache 请求内存时,PageCache 会从操作系统获取大块内存,并将其划分为多个页框,供 CentralCache 使用。

映射与定位

  • 多级页表(PageMap)通过三级基数树结构实现虚拟地址到物理地址的映射,支持高效的内存分配和释放操作。内存分配只存在页的初始化和查询,不存在页的修改,因此不需要担心多线程并发修改页表带来的问题,即便是多个线程一起来初始化同一页,也不会影响正确性,避免了使用锁带来的性能损失。

元数据与固定对象池

  • 该项目利用定长内存池 FLMemPool<T> 为内部固定大小结构体(如 SpanPage 等)分配内存,避免了频繁调用系统内存分配接口带来的性能开销。

实现详解

核心组件 Object / FreeList

不管我们是实现比较复杂的高并发内存池,还是实现其中的定长内存池,都需要将一块块被申请和释放的内存块管理起来,当用户将资源释放时,我们并不需要着急地将资源归还给操作系统,而是将其放入内存池中,等待下一次的分配请求。因此,我们需要一个基础的组件来存储这些内存块,这个组件就是 Object

当我们用户将一块曾经从我们这里申请的内存块归还回来的时候,我们需要将这个资源存储起来,如何存储?数组吗?显然不合适,因为你无法保证用户按照内存块获取的顺序去归还内存块,所以我们需要一个更加灵活的数据结构来存储这些内存块,链表是一个不错的选择。于是我们设计了自由链表 FreeList 这个组件,它内部维护了一个单向链表,用于存储空闲的内存块。

在讲解这两个核心组件之前,我们来讲解一下最基础的内存块管理方式:

假设一个场景,我们当前已经有了一个内存块链表,此时用户又释放了一个内存块给我们,此时我们应该将这个内存块头插到这个内存块链表中,一般的链表会维护一个指针和一个数据,指针用于指向下一个节点,数据用于存储当前节点的数据。也就是将指针和数据分开处理,但用户归还的内存块中的数据是不会被我们使用的,因此我们可以将指针和数据合二为一,直接将指针存储在用户归还的内存块中,这样就节省了内存开销。

假设用户归还的内存块叫做 obj,我们当前的自由链表叫做 freeList,此时我们可以将 freeList 的头节点指针存储在 obj 中,然后将 freeList 的头节点指针指向 obj,这样就完成了内存块的归还操作。一种最原始的实现方式如下:

*(void**)obj = freeList;
freeList = obj;

没错,这样就完成了自由链表的头插,我们将用户的 obj 看作成一个二级指针,然后将 freeList 的头节点指针存储在 obj 中,最后将 freeList 的头节点指针指向 obj

obj

同理,当用户需要申请内存块时,我们只需要将 freeList 的头节点指针取出来,然后将 freeList 的头节点指针指向下一个节点即可,代码如下:

void *ret = freeList;
freeList = *(void**)freeList;
return ret;

这样写当然没啥问题,但是可读性太差了,如果我们不了解这个项目,是很难看懂这是在干什么的。于是我们将其封装成 ObjectFreeList 这两个组件,Object 负责表示一个内存块,FreeList 负责管理这些内存块的链表关系。

Object 组件

Object 组件非常简单,它就是一个空的结构体,我们只需要利用它的内存空间来存储下一个节点的指针即可,代码如下:

struct Object {
    union {
        Object* next;
        // 最小占位,表达“使用态这里是用户数据”,并提升对齐到 max_align_t
        alignas(std::max_align_t) std::byte payload[1];
    };
    Object() : next(nullptr) {}
};

可以看到,我们将 next 指针和 payload 数据放在了一个联合体中,这样就节省了内存开销。payload 用于表示用户数据的起始位置,虽然它只有1个字节,但是我们利用 alignas(std::max_align_t) 将其对齐到最大对齐方式,这样就保证了用户数据的对齐要求。
利用联合体,我们可以在不增加额外内存开销的情况下,既能存储下一个节点的指针,又能表示用户数据的起始位置。

FreeList 组件

通过 FreeList 组件,我们可以更加方便地管理内存块的链表关系,代码如下:

class FreeList
{
public:
    // 向链表头部添加一个对象
    void Push(Object *obj)
    {
        assert(obj);
        obj->next = _freelist;
        // NextObj(obj) = _freelist;
        _freelist = obj;
        ++_size;
    }
    // 从链表头部弹出一个对象
    Object *Pop()
    {
        assert(_freelist);
        Object *obj = _freelist;
        _freelist = obj->next;
        --_size;
        return obj;
    }
    // 向链表头部添加一段对象链
    void PushRange(Object *start, Object *end, size_t num)
    {
        end->next = _freelist;
        _freelist = start;
        _size += num;
    }
    // 从链表头部弹出一段对象链
    void PopRange(Object *&start, Object *&end, size_t num)
    {
        assert(num <= _size);
        start = _freelist, end = _freelist;
        for(int i = 1; i < num; ++i)
        {
            end = end->next;
        }
        _freelist = end->next;
        end->next = nullptr;
        _size -= num;
    }
    bool Empty()
    {
        return _freelist == nullptr;
    }
    // 获取当前链表的最大大小(配合慢启动算法使用)
    size_t& MaxSize()
    {
        return _maxSize;
    }
    // 获取当前链表的实际大小
    size_t Size()
    {
        return _size;
    }
private:
    Object *_freelist = nullptr;
    size_t _maxSize = 1;
    size_t _size = 0;
};

FreeList 组件提供了以下功能:

  • Push(Object *obj):将一个内存块插入到链表头部。
  • Pop():从链表头部弹出一个内存块。
  • PushRange(Object *start, Object *end, size_t num):将一段内存块链插入到链表头部。
  • PopRange(Object *&start, Object *&end, size_t num):从链表头部弹出一段内存块链。
  • Empty():判断链表是否为空。
  • MaxSize():获取当前链表的最大大小(配合慢启动算法使用)。这个后面在 ThreadCache 中会详细讲解的,这里可以先忽略。
  • Size():获取当前链表的实际大小。

通过 ObjectFreeList 组件,我们可以方便地管理内存块的链表关系,为后续的内存池设计打下基础。

定长内存池 FLMemPool<T>

定长内存池可以看做是一个特殊的内存池,它专门用于分配和管理固定大小的内存块。相比于通用内存池,定长内存池具有更高的分配和释放效率,因为它不需要处理不同大小的内存块,只需要管理一种大小的内存块即可。它是高并发内存池项目中的一个重要组成部分,主要用于为内部固定大小结构体(如 SpanPage 等)分配内存,避免了频繁调用系统内存分配接口带来的性能开销。

具体实现思路如下:

  • 每个 FLMemPool<T> 实例维护一个自由链表 FreeList,用于存储空闲的内存块。
  • 当用户申请内存时,首先从自由链表中获取一个内存块,如果自由链表为空,则从操作系统申请一大块内存,并将其划分为多个固定大小的内存块,插入到自由链表中,然后再从自由链表中获取一个内存块返回给用户。
  • 当用户释放内存时,将内存块插入到自由链表中,等待下一次的分配请求。

代码实现如下:

#pragma once

#include <iostream>
#include <vector>
#include <Windows.h>
#include <memoryapi.h>
#include "Object.hpp"
using std::cin;
using std::cout;

#undef max

const inline size_t chunk_size = 1024 * 128; //每次申请内存块的大小

template <class T>
// 定长内存池(适用于大块内存分配)
class FLMemPool
{
public:
    T *New()
    {
        T *ret = nullptr;
        if (freelist)
        {
            // 如果回收站有空间,直接将内存块从回收站中取出使用
            ret = reinterpret_cast<T *>(freelist->Pop());
        }
        else
        {
            // 当前剩余内存不够用,申请新的内存块
            if (leftMemLen < sizeof(T))
            {
                leftMemLen = chunk_size;
                // 申请内存
                memory = (char *)VirtualAlloc(nullptr, leftMemLen, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
                if (!memory)
                {
                    throw std::bad_alloc();
                }
                // 将新申请的内存块存储起来,方便析构时释放
                blocks.push_back(memory);
            }
            // 否则直接从内存块中分配
            ret = reinterpret_cast<T *>(memory);
            // 保证可以存下单个地址长度以便建立链表
            size_t obj_len = std::max(sizeof(T), sizeof(void *));
            memory += obj_len;
            leftMemLen -= obj_len;
        }
        // 定位new调用构造函数
        new (ret) T;
        return ret;
    }

    void Delete(T *obj)
    {
        obj->~T();
        // 头插法将freelist的地址存放在obj首个地址长度当中
        freelist->Push((Object *)obj);
    }

    // 最终释放内存
    ~FLMemPool()
    {
        freelist = nullptr;
        for (char *block : blocks)
            VirtualFree(block, 0, MEM_RELEASE);
    }

private:
    char *memory = nullptr;
    size_t leftMemLen = 0;
    FreeList *freelist = nullptr;
    std::vector<char *> blocks;
};

通过 FLMemPool<T>,我们可以高效地分配和管理固定大小的内存块,提升内存分配和释放的性能,满足高并发场景下的内存管理需求。避免频繁地调用系统内存分配接口带来的性能开销。

SizeClass 组件

首先我们定义一个内存对齐规则:

针对于用户申请的内存大小 byte:

  • 如果 byte <= 128,则对齐到 8 的倍数。映射到哈希桶:freeList[0]~freeList[15]
  • 如果 128 < byte <= 1024,则对齐到 16 的倍数。映射到哈希桶:freeList[16]~freeList[71]
  • 如果 1024 < byte <= 8 * 1024,则对齐到 128 的倍数。映射到哈希桶:freeList[72]~freeList[127]
  • 如果 8192 < byte <= 64 * 1024,则对齐到 1024 的倍数。映射到哈希桶:freeList[128]~freeList[183]
  • 如果 64 * 1024 < byte <= 256 * 1024,则对齐到 8192 的倍数。映射到哈希桶:freeList[184]~freeList[207]
  • 如果 byte > 256 * 1024,则直接返回 byte,不进行对齐处理

正如前面所说,当用户向我们的内存池申请一个内存块时,我们并不会直接按照用户申请的内存块大小进行分配,而是会将其对齐到最近的等级(size class),这样做的目的是为了实现内存对齐以及避免内存分配的粒度太小以至于管理开销过大,同时保证更好地将内存块管理起来,方便后面内存释放的时候进行内存块合并和归还。

我们利用哈系统的结构设计了 SizeClass 组件来实现内存块的大小等级划分和对齐功能。

SizeClass 组件负责管理内存块的大小等级划分,它提供了将用户申请的内存大小对齐到最近等级的功能。具体实现思路如下:

  • 定义一组内存大小等级(size classes),每个等级对应一个固定大小的内存块。
  • 提供一个静态方法 static inline size_t RoundUp(size_t byte),用于将用户申请的内存大小对齐到最近的等级。
  • 提供一个静态方法 static inline size_t Index(size_t byte),用于获取对应内存大小等级的索引值,方便在内存池中定位对应的自由链表。
  • 提供一个静态方法 static inline size_t NumMoveSize(size_t byte),用于计算用户实际申请的内存大小对应的内存块数量,在 ThreadCache 中向上层 CentralCache 请求内存块时使用(后续会详细讲解)。
  • 提供一个静态方法 static inline size_t NumMovePage(size_t byte),用于计算用户实际申请的内存大小对应的页框数量,在 CentralCache 中向上层 PageCache 请求页框时使用(后续会详细讲解)。

这里我们详细讲解一下 RoundUp 以及 Index 方法的实现思路:

RoundUp 方法

RoundUp 方法的实现思路如下:

首先,我们需要划分用户的申请大小 byte 属于哪个区间,然后根据区间的不同,选择不同的对齐方式进行对齐。具体实现如下:

static inline size_t RoundUp(size_t byte)
{
    if(byte <= 128)
    {
        return _RoundUp(byte, 8);
    }
    else if(byte <= 1024)
    {
        return _RoundUp(byte, 16);
    }
    else if(byte <= 8 * 1024)
    {
        return _RoundUp(byte, 128);
    }
    else if(byte <= 64 * 1024)
    {
        return _RoundUp(byte, 1024);
    }
    else if(byte <= 256 * 1024)
    {
        return _RoundUp(byte, 8 * 1024);
    }
    else
    {
        return _RoundUp(byte, 1 << PAGE_SHIFT);
    }
}

其中,_RoundUp 方法用于将 byte 对齐到 align 的倍数,代码如下:

static inline size_t _RoundUp(size_t byte, size_t alignNum)
{
    return (byte + alignNum - 1) & (~(alignNum - 1));
}

RoundUp 中划分区间的操作很好理解,我们根据 byte 的大小选择不同的对齐方式进行对齐。但是 _RoundUp 方法中的位运算可能不太好理解,我们来详细讲解一下:

通过 byte + alignNum - 1,我们可以确保当 byte 恰好是 alignNum 的倍数时,不会被错误地向上对齐。接下来,通过按位取反运算 ~(alignNum - 1),我们可以得到一个掩码,该掩码的低位部分全为 0,高位部分全为 1。最后,通过按位与运算 &,我们可以将 byte 向下对齐到最近的 alignNum 的倍数。

Index 方法

这个方法用于获取用户申请的内存大小对应的内存等级索引值,也就是我们前面提到的哈希桶索引。实现思路与 RoundUp 方法类似,我们根据 byte 的大小划分区间,然后计算对应的索引值。具体实现如下:

static inline size_t Index(size_t byte)
{
    //前缀桶数
    int pre_sum[5] = {0, 16, 72, 128, 184};
    if(byte <= 128)
    {
        return _Index(byte, 3) + pre_sum[0];
    }
    else if(byte <= 1024)
    {
        return _Index(byte - 128, 4) + pre_sum[1];
    }
    else if(byte <= 8 * 1024)
    {
        return _Index(byte - 1024, 7) + pre_sum[2];
    }
    else if(byte <= 64 * 1024)
    {
        return _Index(byte - 8 * 1024, 10) + pre_sum[3];
    }
    else if(byte <= 256 * 1024)
    {
        return _Index(byte - 64 * 1024, 13) + pre_sum[4];
    }
    assert(false);
    return 0;
}
static inline size_t _Index(size_t byte, size_t alignBit)
{
    return ((byte + (1 << alignBit) - 1) >> alignBit) - 1;
}

Index 方法中,我们需要计算前缀桶数 pre_sum,用于累加前面区间的桶数。然后根据 byte 的大小划分区间,调用 _Index 方法计算对应的索引值,并加上前缀桶数。_Index 方法的实现思路与 _RoundUp 方法类似,我们通过将 byte 加上 alignNum - 1,从而保证只有当 byte 恰好是 alignNum 的倍数时,不会被错误地向上对齐。然后通过右移 alignBit 位,计算出对应的索引值。又因为索引是从 0 开始的,所以我们需要减去 1。

ThreadCache 层(内存申请讲解)

我们按照顺序结构来讲解这个项目,因此这里我们先统一将三个层次的内存申请逻辑讲解完,后面再统一讲解内存回收逻辑。

ThreadCache 组件是高并发内存池项目中的一个重要组成部分,它负责为每个线程维护一个独立的内存缓存区,避免了多线程间的锁竞争,提高了内存分配和释放的效率。ThreadCache 主要通过维护多个自由链表来管理不同大小等级的内存块,当用户申请内存时,首先从 ThreadCache 中获取内存块,如果 ThreadCache 中的内存不足,则从全局的 CentralCache 中获取内存块。

假设用户申请了一块 byte 大小的内存,那么 ThreadCache 会将其对齐到最近的等级(假设是 16 byte),然后从对应的自由链表中获取一个 16 byte 大小的内存块返回给用户。如果对应的自由链表为空,则从 CentralCache 中获取一批 16 byte 大小的内存块,将第一个内存块返回给用户,剩余的内存块插入到对应的自由链表中。

void * ThreadCache::Allocate(size_t byte)
{
    assert(byte <= MAX_BYTES);
    Object *ret = nullptr;
    size_t memlen = SizeClass::RoundUp(byte);
    size_t index = SizeClass::Index(byte);
    if(_freelists[index].Empty())
    {
        ret = FetchFromCentralCache(index, memlen);
    }
    else
    {
        ret = _freelists[index].Pop();
    }
    return ret;
}
  • 首先,我们通过 SizeClass::RoundUp(byte) 方法将用户申请的内存大小 byte 对齐到最近的等级 memlen。
  • 然后,我们通过 SizeClass::Index(byte) 方法获取对应内存大小等级的索引值 index(方便在内存池中定位对应的自由链表)。
  • 接着,我们检查对应的自由链表 _freelists[index] 是否为空。
    • 如果为空,则调用 FetchFromCentralCache(index, memlen) 方法从 CentralCache 中获取一批内存块,并返回其中的一个内存块给用户。
      • FetchFromCentralCache 方法会向 CentralCache 请求一批内存块,并将剩余的内存块插入到对应的自由链表中。
    • 如果不为空,则直接从自由链表中弹出一个内存块返回给用户。

下面我们来看一下 FetchFromCentralCache 方法的实现:

Object* ThreadCache::FetchFromCentralCache(size_t index, size_t byte)
{
    // 计算实际获取的块数量
    size_t batchNum = std::min(SizeClass::NumMoveSize(byte), _freelists[index].MaxSize());
    if(batchNum == _freelists[index].MaxSize())
    {
        // 慢启动算法(近似网络拥塞控制算法)
        // 翻倍有界限,不能超过middle_level,否则会导致过多内存占用
        if(_freelists[index].MaxSize() < std::min(middle_level, SizeClass::NumMoveSize(byte) >> 1)) _freelists[index].MaxSize() *= 2;
        else _freelists[index].MaxSize() += 1;
    }

    // 一切就绪,准备从中央缓存获取内存
    // 处理预备数据存放到freelists以及就绪数据返回
    Object *start = nullptr, *end = nullptr;
    size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, byte);
    if(start != end)
    {
        _freelists[index].PushRange(start->next, end, actualNum - 1);
    }
    return start;
}

可以看到,这里用到了我们前面提到的 SizeClass::NumMoveSize(byte) 方法来计算用户实际申请的内存大小对应的内存块数量。我把源码贴出来:

static inline size_t NumMoveSize(size_t byte)
{
    //控制获取的freelist在[2, 512]范围内
    assert(byte > 0);
    size_t num = MAX_BYTES / byte;
    if(num < low_level) num = low_level;
    else if(num > up_level) num = up_level;
    return num;
}

当用户向我们申请一块 byte 大小的内存时,意味着用户可能会频繁地向我们申请内存块,因此我们需要预先准备好一定数量的内存块以应对用户的频繁申请。NumMoveSize 方法通过计算 MAX_BYTES / byte 来确定预先准备的内存块数量,并将其限制在 [low_level, up_level] 范围内,避免预先准备过多或过少的内存块。而 NumMoveSize 就是用来确定一个上界,防止我们一次性从 CentralCache 中获取过多的内存块,导致内存占用过高。

这段代码的具体逻辑如下:

  • 首先,我们计算实际获取的块数量 batchNum,它是用户实际申请的内存大小对应的内存块数量和当前自由链表最大大小的最小值。
  • 然后,我们检查 batchNum 是否等于当前自由链表的最大大小:
    • 如果相等,则说明当前自由链表已经达到了最大容量,我们需要进行慢启动算法来调整最大容量。
      • 慢启动算法类似于网络拥塞控制算法,我们通过翻倍的方式增加最大容量,但有一个上限 middle_level,避免过多内存占用。
      • 如果当前最大容量小于 middle_level,则将最大容量翻倍;否则,最大容量加 1。
  • 接着,我们调用 CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, byte) 方法从 CentralCache 中获取一批内存块,并将其存储在 startend 指针中,可以看到,我们需要通过一个叫做 actualNum 的变量来记录实际获取到的内存块数量,这是因为实际获取到的内存块数量可能少于请求的数量,因为 PageCache 中申请的是以页为单位的内存块,无法满足其提供给 CentralCache 的页数刚好满足 CentralCache 实际需要的总 Span 数量,又为了避免内存浪费,所以在 PageCache 中会采用向下取整的方案来折中,后面再 PageCache 中会重点讲到这一点,这里可以简单了解一下。
  • 最后,我们将获取到的内存块中除第一个内存块外的剩余内存块插入到对应的自由链表中,并返回第一个内存块给用户。

下面这个流程图总结了 ThreadCache 层的内存申请逻辑:

graph TD
    %% 样式优化(可选,让流程图更美观)
    classDef keyNode fill:#f9f,stroke:#333,stroke-width:2px;
    classDef decisionNode fill:#9ff,stroke:#333,stroke-width:2px;
    
    %% 核心流程
    A["用户发起内存申请<br/>(size: 待申请字节数)"]:::keyNode --> B{"size ≤ ThreadCache最大阈值?"}:::decisionNode
    
    %% 小内存申请分支(核心逻辑)
    B --  --> C["计算对齐后内存大小<br/>mem_len = SizeClass::RoundUp(size)"]
    C --> D["计算对应自由链表索引<br/>index = SizeClass::Index(size)"]
    D --> E{"ThreadCache._freelists[index] 非空?"}:::decisionNode
    
    %% 自由链表有可用内存
    E --  --> F["从_freelists[index]弹出一个内存块"]
    F --> G["返回内存块给用户"] --> H["内存申请完成"]
    
    %% 自由链表为空,向CentralCache申请
    E --  --> I["计算批量申请数量batch_num<br/>(取SizeClass预设值 & 慢启动上限最小值)"]
    I --> J["调用CentralCache::FetchRangeObj<br/>批量获取batch_num个内存块"]
    J --> K{"实际获取到actual_num > 0?"}:::decisionNode
    
    K --  --> L["将1个内存块返回给用户"]
    L --> M["剩余(actual_num-1)个内存块<br/>插入ThreadCache._freelists[index]"] --> H
    
    
    
    %% 大内存申请分支(跳过ThreadCache)
    B --  --> O["直接调用系统接口分配大块内存"] --> H

    %% 备注说明(可选,补充关键细节)
    note["📌 关键细节:<br/>1. SizeClass:内存大小对齐规则,避免内存碎片<br/>2. 慢启动:动态调整batch_num,平衡性能与内存占用<br/>3. ThreadCache:每个线程私有,无锁访问"]
    linkStyle 0 stroke:#666,stroke-width:1px;
    linkStyle 10 stroke:#f66,stroke-width:1.5px,color:red;

CentralCache 层(内存申请讲解)

CentralCache 组件是高并发内存池项目中的另一个重要组成部分,它负责管理不同大小等级的内存块,并将其分配给各个线程的 ThreadCacheCentralCache 主要通过维护多个自由链表来管理不同大小等级的内存块,当 ThreadCache 请求内存时,CentralCache 会从对应的链表中分配内存块。

如果说我们的这个内存申请架构有三层:ThreadCacheCentralCachePageCache,那么 CentralCache 就是中间层,负责协调 ThreadCachePageCache 之间的内存分配请求。因此,并不是因为为了提高性能才设计了 CentralCache,而是为了更好地协调内存分配请求,提升整体内存管理的效率。所以说,CentralCache 的内存申请逻辑与 ThreadCache 有些类似,但是它需要处理更多的细节,往上他需要协调 PageCache 的内存分配请求,往下他需要满足多个线程的内存分配请求,需要考虑线程安全问题。

承接上文,ThreadCache 若检查发现自己的自由链表为空,就会向 CentralCache 请求了一批内存块(假设是 16 byte 大小的内存块),那么 CentralCache 会检查对应的自由链表是否有足够的内存块可用。如果有,则直接从自由链表中分配内存块返回给 ThreadCache。如果没有,则从 PageCache 中获取更大的内存块,将其划分为多个 16 byte 大小的内存块,插入到对应的自由链表中,然后再从自由链表中分配内存块返回给 ThreadCache

CentralCache 是利用 Span 这个组件来管理内存块的,Span 负责管理一批相同大小的内存块,并维护一个自由链表用于存储这些内存块。当 CentralCache 需要分配内存块时,它会从对应的 Span 中获取内存块。

首先,我们来看一下 Span 组件的定义:

struct Span
{
    PAGE_ID pageId; //页编号
    Span *prev = nullptr;
    Span *next = nullptr;
    size_t n = 0; //页数量
    size_t useCount = 0;
    size_t objSize = 0; //小块内存大小
    Object *freelist = nullptr;
    bool isusing = false;
};
  • 页编号用于标识该 Span 对应的内存页,是直接通过地址和页大小计算得到的
  • n 表示该 Span 管理的页数量
  • useCount 表示该 Span 当前正在使用的内存块数量,方便后续进行内存回收
  • objSize 表示该 Span 管理的小块内存大小
  • freelist 是一个指向该 Span 自由链表头节点的指针
  • isusing 用于标记该 Span 是否正在被使用,避免重复分配,也方便后续进行内存回收

可以看到,我们的 Span 同样是需要通过链式结构来进行管理的,因此我们还设计了 SpanList 组件用于管理 Span 的链表关系,代码如下:

class SpanList
{
public:
    Span *Begin()
    {
        return head->next;
    }
    Span *End()
    {
        return head;
    }
    SpanList()
    {
        head = GetSpanPool().New();
        head->prev = head;
        head->next = head;
    }
    void Insert(Span *pos, Span *newSpan)
    {
        assert(pos);
        assert(newSpan);
        pos->prev->next = newSpan;
        newSpan->prev = pos->prev;
        newSpan->next = pos;
        pos->prev = newSpan;
    }
    void Erase(Span *delSpan)
    {
        assert(delSpan);
        assert(delSpan != head);
        delSpan->prev->next = delSpan->next;
        delSpan->next->prev = delSpan->prev;
    }
    void PushFront(Span *newSpan)
    {
        Insert(Begin(), newSpan);
    }
    Span *PopFront()
    {
        Span *front = head->next;
        Erase(front);
        return front;
    }
    bool Empty()
    {
        return head->next == head;
    }
    ~SpanList()
    {
        GetSpanPool().Delete(head);
    }
private:
    Span *head;
public:
    Mutex mtx; //桶锁
};

SpanList 组件提供了以下功能:

  • Begin():获取链表的第一个节点。
  • End():获取链表的尾节点(哨兵节点)。
  • Insert(Span *pos, Span *newSpan):在指定位置插入一个新的 Span 节点。
  • Erase(Span *delSpan):删除指定的 Span 节点。
  • PushFront(Span *newSpan):在链表头部插入一个新的 Span 节点。
  • PopFront():从链表头部弹出一个 Span 节点。
  • Empty():判断链表是否为空。

SpanList 组件还包含了一个桶锁 mtx,用于保护链表的线程安全。因为后续 CentralCache 是被多线程共享的,所以需要加锁来保证线程安全。在 CentralCache 中,我们需要维护多个 SpanList,每个 SpanList 对应一个内存大小等级的自由链表,因此给每一个SpanList 桶都加上了一个桶锁使得多线程访问时不会出现数据竞争的问题。这样设计也可以避免直接给整个 CentralCache 加锁,提升并发性能。

下面我们来看一下 CentralCache 中的 FetchRangeObj 方法的实现:

size_t CentralCache::FetchRangeObj(Object *&start, Object *&end, size_t batchNum, size_t byte)
{
    // 计算对应的spanlist下标
    size_t index = SizeClass::Index(byte);
    // 和 ThreadCache 不同,CentralCache 是被多线程共享的,所以需要加锁,而 ThreadCache 是线程本地的,不需要加锁
    spanLists[index].mtx.lock();

    Span *span = GetOneSpan(spanLists[index], byte);
    assert(span);
    assert(span->freelist);

    start = span->freelist;
    end = start;
    size_t actual_num = 1;
    //最多获取到链表尾部
    while(actual_num < batchNum && end->next != nullptr)
    {
        end = end->next;
        ++actual_num;
    }
    // 不需要的部分还回去
    span->freelist = end->next;
    end->next = nullptr;
    // 更新span的使用计数(引用计数用于后面的内存回收)
    span->useCount += actual_num;
    spanLists[index].mtx.unlock();
    return actual_num;
}
  • 首先,我们通过 SizeClass::Index(byte) 方法获取对应内存大小等级的索引值 index(方便在内存池中定位对应的自由链表)。
  • 然后,我们加锁对应的自由链表 spanLists[index],因为 CentralCache 是被多线程共享的,所以需要加锁,而 ThreadCache 是线程本地的,不需要加锁。
  • 接着,我们调用 GetOneSpan(spanLists[index], byte) 方法从对应的自由链表中获取一个 Span 对象,Span 对象负责管理一批相同大小的内存块。
  • 然后,我们从 Span 对象的自由链表中获取一批内存块,直到获取到 batchNum 个内存块或者链表尾部为止。
  • 接着,我们将不需要的部分还回去,更新 Span 对象的使用计数(引用计数用于后面的内存回收,后面需要根据引用计数是否为0来决定是否回收该内存块)。
  • 最后,我们解锁对应的自由链表,并返回实际获取到的内存块数量。

呼应上文,在 ThreadCache 中我们之所以需要接收参数 actualNum,是因为 CentralCache 中的 FetchRangeObj 方法返回的实际获取到的内存块数量可能少于请求的数量,因为 PageCache 中申请的是以页为单位的内存块,无法满足其提供给 CentralCache 的页数刚好满足 CentralCache 实际需要的总 Span 数量,又为了避免内存浪费,所以在 PageCache 中会采用向下取整的方案来折中。

然后,我们来看一下 GetOneSpan 方法的实现:

Span* CentralCache::GetOneSpan(SpanList &list, size_t byte)
{
    Span *it = list.Begin();
    //若找得到剩余的Span则直接返回
    while(it != list.End())
    {
        if(it->freelist != nullptr) return it;
        it = it->next;
    }

    //先释放桶锁,别的线程可能正在释放资源
    list.mtx.unlock();

    //需要向上层PageCache申请Page进行分割
    PageCache::GetInstance()->mtx.lock();
    Span *span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte));
    // 标记正在使用中
    span->isusing = true;
    // 设置小块内存大小
    span->objSize = byte;
    PageCache::GetInstance()->mtx.unlock();

    //对页切片
    std::byte *start = (std::byte*)(span->pageId << PAGE_SHIFT);
    size_t size = span->n << PAGE_SHIFT;
    std::byte *end = start + size;

    span->freelist = reinterpret_cast<Object *>(start);
    start += byte;
    Object *tail = span->freelist;
    // 将span切分成多个小块内存
    while(start < end)
    {
        tail->next = reinterpret_cast<Object *>(start);
        tail = tail->next;
        start += byte;
    }
    tail->next = nullptr;
    //回去之前上锁
    list.mtx.lock();
    
    list.PushFront(span);
    return span;
}

这段代码逻辑如下:

  • 首先,我们遍历对应的自由链表 list,尝试找到一个有可用内存块的 Span 对象。
    • 如果找到了,则直接返回该 Span 对象,意味着我们可以直接从该 Span 中分配内存块,无需向上层申请新的内存。
    • 如果没有找到,则说明当前自由链表中没有可用的 Span 对象,我们需要向上层的 PageCache 申请新的内存页。
  • 然后,我们释放当前自由链表的锁,避免临界资源竞争,因为这个时候我们完全用不到该自由链表,其他线程可能正在释放资源。
  • 接着,我们加锁 PageCache,调用 PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(byte)) 方法向 PageCache 申请一批新的内存页,并返回一个新的 Span 对象。
  • 然后,我们标记该 Span 对象为正在使用中,并设置小块内存大小 objSize
  • 接着,我们对申请到的内存页进行切片,将其划分为多个小块内存,并将其插入到 Span 对象的自由链表中。
  • 最后,我们重新加锁对应的自由链表,将新的 Span 对象插入到自由链表的头部,并返回该 Span 对象。

我们来重点看一下 SizeClass::NumMovePage(byte) 方法的实现:

{
    size_t num = NumMoveSize(byte);
    size_t total_byte = num * byte;
    size_t page_num = total_byte >> PAGE_SHIFT;
    //page_num至少为1
    return std::max((size_t)1, page_num);
}

代码很简单,我们首先通过 NumMoveSize(byte) 方法计算出用户实际申请的内存大小对应的内存块数量 num,然后计算出总的内存大小 total_byte,最后通过右移 PAGE_SHIFT 位来计算出对应的页框数量 page_num。为了保证至少申请一个页框,我们返回 std::max((size_t)1, page_num)。之所以选择向下取整,是为了避免内存浪费,因为页框的大小通常比较大,如果我们向上取整,可能会导致申请过多的内存,浪费系统资源。

通过 CentralCache,我们可以高效地管理不同大小等级的内存块,协调 ThreadCachePageCache 之间的内存分配请求,提升整体内存管理的效率,满足高并发场景下的内存管理需求。

同样的,我们用一张流程图来总结 CentralCache 层的内存申请逻辑:

graph TD
    %% 样式定义
    classDef keyNode fill:#f9f,stroke:#333,stroke-width:2px;
    classDef decisionNode fill:#9ff,stroke:#333,stroke-width:2px;
    classDef lockNode fill:#ff9,stroke:#333,stroke-width:2px;
    
    %% 核心流程
    A["ThreadCache请求批量内存块<br/>(batch_num个byte大小块)"]:::keyNode --> B["计算对应SpanList索引<br/>index = SizeClass::Index(byte)"]
    B --> C["对spanLists[index]加锁<br/>(保证线程安全)"]:::lockNode
    C --> D["遍历spanLists[index]查找<br/>有可用内存的Span"]
    D --> E{"找到可用Span?"}:::decisionNode
    
    %% 找到可用Span分支
    E --  --> F["从该Span的freelist中<br/>获取actual_num个内存块<br/>(最多batch_num个)"]
    F --> G["更新Span的freelist(保留剩余块)"]
    G --> H["增加Span的useCount(引用计数)"]
    H --> I["对spanLists[index]解锁"]:::lockNode
    I --> J["返回获取到的内存块链<br/>(start~end)及actual_num"] --> K["CentralCache内存分配完成"]
    
    %% 未找到可用Span分支
    E --  --> L["先对spanLists[index]解锁"]:::lockNode
    L --> M["计算需要向PageCache申请的页数<br/>page_num = SizeClass::NumMovePage(byte)"]
    M --> N["对PageCache加锁"]:::lockNode
    N --> O["调用PageCache::NewSpan(page_num)<br/>获取新的内存页块"]
    O --> P["对新Span标记isusing=true<br/>并设置objSize=byte"]
    P --> Q["对PageCache解锁"]:::lockNode
    Q --> R["将新申请的内存页切分成<br/>多个byte大小的内存块<br/>(构建成freelist)"]
    R --> S["重新对spanLists[index]加锁"]:::lockNode
    S --> T["将新Span插入spanLists[index]头部"]
    T --> F
    
    %% 备注说明
    note["📌 关键细节:<br/>1. 加锁粒度:仅锁定当前size等级的SpanList,减少锁竞争<br/>2. Span管理:每个Span对应一批同大小内存块,通过useCount跟踪使用情况<br/>3. 内存切片:从PageCache获取的大块内存需切分成小块后再分配"]

PageCache 层(内存申请讲解)

CentralCache 发现自己的自由链表 SpanList 中没有可用的 Span 对象时,它就会向上层的 PageCache 请求一批新的内存页。PageCache 组件负责管理系统内存页的分配和回收,它通过直接调用操作系统的内存分配接口来获取大块的内存页,然后将其划分为多个 Span 对象,供 CentralCache 使用。

在讲解 PageCache 的核心方法之前,我们先来看一下 PageCache 组件的定义:

class PageCache
{
public:
    static PageCache *GetInstance()
    {
        return &_pageCache;
    }
    Span *NewSpan(size_t k);
    Span *MapAddrToSpan(void *addr);
    void ReleaseSpanToPageCache(Span *span);
private:
    PageCache() = default;
    PageCache(const PageCache&) = delete;
    //防止new/delete误操作
    void* operator new(size_t) = delete;
    void* operator new[](size_t) = delete;
    void operator delete(void*) = delete;
    void operator delete[](void*) = delete;
private:
    static PageCache _pageCache;
public:
    std::mutex mtx;
    // 用于存放不同大小Span的空闲链表
    SpanList _spanLists[NPAGES + 1];
    // std::unordered_map<PAGE_ID, Span*> PageIdToSpan;
    TCMalloc_PageMap3<PAGE_BYTE - PAGE_SHIFT> PageIdToSpan;
};

inline PageCache PageCache::_pageCache;

可以看到,PageCache 组件同样是通过链式结构来管理 Span 对象的,因此我们也设计了多个 SpanList 来存放不同大小等级的 Span 对象。同时,PageCache 还维护了一个 PageIdToSpan 映射表,用于将页编号映射到对应的 Span 对象,方便后续进行内存回收。这里我们需要重点讲解一下 PageIdToSpan,它是通过 TCMalloc_PageMap3 这个组件来实现的,TCMalloc_PageMap3 组件是一个高效的页编号到数据指针的映射表,它通过多级索引结构来实现高效的映射和查找操作,而有关 TCMalloc_PageMap3 组件的具体实现这里就不展开讲解了,感兴趣的读者可以参考相关资料进行深入了解。博主也是直接搬过来用的。为什么需要它呢?我想这个还是放在内存释放部分来讲解会更合适一些。这里我们需要先记住,PageIdToSpan 用于将页编号映射到对应的 Span 对象,方便后续进行内存回收。因此我们在申请内存的时候,也需要维护 PageIdToSpan 映射表。

NewSpan 方法的实现

Span *PageCache::NewSpan(size_t k)
{
    // assert(k >= 1 && k <= NPAGES);
    // 如果请求的页数大于NPAGES,直接向系统申请大块内存
    if (k > NPAGES)
    {
        void *ptr = VirtualAlloc(0, k << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
        if (ptr == nullptr)
            throw std::bad_alloc();
        // 将申请到的内存封装成Span返回,因为 Span 的数量可能非常多,而对象又很单一,因此直接存放在定长内存池中进行管理
        Span *newSpan = GetSpanPool().New();
        newSpan->n = k;
        newSpan->pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
        // PageIdToSpan[newSpan->pageId] = newSpan;
        PageIdToSpan.EnsureSet(newSpan->pageId, newSpan);
        return newSpan;
    }
    // 检查当前Span下有没有可用的PAGE
    if (!_spanLists[k].Empty())
    {
        Span *kspan = _spanLists[k].PopFront();
        // 建立返回的kspan其PageId与自身映射的地址关系
        if (!PageIdToSpan.Ensure(kspan->pageId, kspan->n)) {
            assert(false);
        }
        for (PAGE_ID i = 0; i < kspan->n; ++i)
        {
            PageIdToSpan.set(kspan->pageId + i, kspan);
        }
        return kspan;
    }
    // 否则去后面查看是否有非空Span可使用来分割
    for (int i = k + 1; i <= NPAGES; ++i)
    {
        if (_spanLists[i].Empty())
            continue;
        // 发现非空Span,可以将其分割为两份Span
        Span *span = _spanLists[i].PopFront();
        Span *kspan = GetSpanPool().New();
        kspan->pageId = span->pageId;
        kspan->n = k;
        span->pageId += k;
        span->n -= k;
        _spanLists[i - k].PushFront(span);

        // 存放Span的首尾PageId用于PageCache合并Span块
        PageIdToSpan.EnsureSet(span->pageId, span);
        PageIdToSpan.EnsureSet(span->pageId + span->n - 1, span);
        
        // 建立返回的kspan其PageId与自身映射的地址关系
        if (!PageIdToSpan.Ensure(kspan->pageId, kspan->n)) {
            assert(false);
        }
        for (PAGE_ID i = 0; i < kspan->n; ++i)
        {
            PageIdToSpan.set(kspan->pageId + i, kspan);
        }
        return kspan;
    }

    // 此时说明没有大块Span供以分割,此时需要向上层(系统)索要一块堆空间
    void *ptr = VirtualAlloc(0, NPAGES << PAGE_SHIFT, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
    if (ptr == nullptr)
        throw std::bad_alloc();
    // 将该内存分成两份
    Span *newSpan = GetSpanPool().New();
    newSpan->n = NPAGES;
    newSpan->pageId = (PAGE_ID)ptr >> PAGE_SHIFT;

    // 直接从newSpan中切割
    Span *kspan = GetSpanPool().New();
    kspan->pageId = newSpan->pageId;
    kspan->n = k;
    newSpan->pageId += k;
    newSpan->n -= k;

    // PageIdToSpan[newSpan->pageId] = newSpan;
    // PageIdToSpan[newSpan->pageId + newSpan->n - 1] = newSpan;
    PageIdToSpan.EnsureSet(newSpan->pageId + newSpan->n - 1, newSpan);
    PageIdToSpan.EnsureSet(newSpan->pageId, newSpan);

    if (!PageIdToSpan.Ensure(kspan->pageId, kspan->n)) {
        assert(false);
    }
    for (PAGE_ID i = 0; i < kspan->n; ++i)
    {
        PageIdToSpan.set(kspan->pageId + i, kspan);
    }
    _spanLists[newSpan->n].PushFront(newSpan);
    return kspan;
}

可以看到,在 PageCache 当中,我们需要频繁申请内存用于创建 Span 对象,这正是我们之前提到的对一个特定类型对象进行定长内存池管理的典型场景,因此我们同样使用了 GetSpanPool() 方法来获取一个定长内存池,用于管理 Span 对象的分配和回收:

// 全局内存池(延迟初始化,避免顺序问题)
inline FLMemPool<Span>& GetSpanPool() {
    static FLMemPool<Span> pool;
    return pool;
}

我们直接通过静态变量实现现代风格的单例模式(Meyer's Singleton),避免了顺序初始化的问题。

NewSpan 的逻辑如下:

  • 首先,我们检查请求的页数 k 是否大于 NPAGES(预设的最大页数)。
    • 如果是,则直接调用操作系统的内存分配接口 VirtualAlloc 申请一块大块内存,并将其封装成一个新的 Span 对象返回。
    • 如果不是,则继续检查当前大小等级 k 的 SpanList 是否有可用的 Span 对象。
  • 如果当前大小等级 k 的 SpanList 非空,则直接从中弹出一个 Span 对象返回,并在 PageIdToSpan 映射表中建立页编号到 Span 对象的映射关系。
  • 如果当前大小等级 k 的 SpanList 为空,则继续向后查找更大大小等级的 SpanList,尝试找到一个非空的 Span 对象进行分割。
    • 如果找到了一个非空的 Span 对象,则将其分割为两份,一份大小为 k 的 Span 对象返回给调用者,另一份剩余大小的 Span 对象重新插入到对应的 SpanList 中,并在 PageIdToSpan 映射表中建立页编号到 Span 对象的映射关系。
    • 如果没有找到非空的 Span 对象,则说明当前内存池中没有足够的内存可用,我们需要向操作系统申请一块大块内存。我们调用 VirtualAlloc 申请一块大小为 NPAGES 页的内存,并将其划分为两份,一份大小为 k 的 Span 对象返回给调用者,另一份剩余大小的 Span 对象插入到对应的 SpanList 中,并在 PageIdToSpan 映射表中建立页编号到 Span 对象的映射关系。

不论是分割还是新申请内存,我们都需要在 PageIdToSpan 映射表中建立页编号到 Span 对象的映射关系,方便后续进行内存回收。

通过 PageCache,我们可以高效地管理系统内存页的分配和回收,满足 CentralCache 对内存页的分配请求,提升整体内存管理的效率,满足高并发场景下的内存管理需求。

同样,我们用一张流程图来总结 PageCache 层的内存申请逻辑:

graph TD
    classDef keyNode fill:#f9f,stroke:#333,stroke-width:2px;
    classDef decisionNode fill:#9ff,stroke:#333,stroke-width:2px;
    
    A["CentralCache请求内存页<br/>(需求: 按NumMovePage计算的页数)"]:::keyNode --> B["PageCache加锁"]
    
    B --> C{"对应页大小的SpanList有可用Span?"}:::decisionNode
    C --  --> D["取出该Span并标记为使用中"]
    D --> E["解锁PageCache"] --> F["返回Span给CentralCache"] --> G["流程结束"]
    
    C --  --> H{"更大页大小的SpanList有可用Span?"}:::decisionNode
    H --  --> I["取出大Span并分裂为所需大小的小Span"]
    I --> J["剩余部分重新插入对应SpanList"] --> D
    
    H --  --> K["向操作系统申请连续物理页<br/>(VirtualAlloc)"]
    K --> L["创建新Span管理这些页<br/>(设置pageId、页数n等)"]
    L --> M["将新Span插入对应SpanList"] --> D

    note["📌 关键细节:<br/>1. 三级基数树:管理页映射关系,支持高效查询<br/>2. 页分裂:大Span按需拆分为小Span,减少内存浪费<br/>3. 全局锁:PageCache是全局唯一,操作需加锁保证线程安全<br/>4. 直接系统调用:当无可用Span时,直接向OS申请大块内存"]

这样一来,我们就完整地讲解了高并发内存池项目中三层内存申请架构的设计与实现,包括 ThreadCacheCentralCachePageCache 三个组件的内存申请逻辑。通过这种分层设计,我们不仅提升了内存分配的效率,还有效地降低了内存碎片化的问题,满足了高并发场景下的内存管理需求。

上面有一些地方我提到过会在内存释放部分进行讲解,所以接下来我们会继续深入探讨高并发内存池项目中的内存释放机制,了解如何高效地回收和管理内存资源。

内存释放机制讲解

在高并发内存池项目中,内存释放机制同样是一个关键的组成部分。有效的内存释放策略不仅可以减少内存泄漏,还能降低内存碎片化,提高内存利用率。接下来,我们将详细讲解 ThreadCacheCentralCachePageCache 三个组件的内存释放逻辑。

当用户释放一块内存时,首先会进入 ThreadCache 层。ThreadCache 会根据释放的内存块大小,将其插入到对应的自由链表中,供后续的内存分配请求使用。具体实现如下:

void ThreadCache::Deallocate(void *ptr_, size_t byte)
{
    assert(ptr_);
    Object *ptr = reinterpret_cast<Object *>(ptr_);
    assert(byte <= MAX_BYTES);
    size_t index = SizeClass::Index(byte);
    _freelists[index].Push(ptr);

    if(_freelists[index].Size() >= _freelists[index].MaxSize())
    {
        ListTooLong(_freelists[index], byte);
    }
}
  • 首先,我们通过 SizeClass::Index(byte) 方法获取对应内存大小等级的索引值 index。
  • 然后,我们将释放的内存块插入到对应的自由链表 _freelists[index] 中。
  • 接着,我们检查该自由链表的大小是否超过了预设的最大容量。
    • 如果超过了,则调用 ListTooLong 方法将多余的内存块释放回 CentralCache

假若我们正好释放了一块内存,而这块内存所在的自由链表已经满了,那么 ListTooLong 方法会将多余的内存块批量释放回 CentralCache,具体实现如下:

void ThreadCache::ListTooLong(FreeList &list, size_t byte)
{
    Object *start = nullptr, *end = nullptr;
    list.PopRange(start, end, list.MaxSize());
    CentralCache::GetInstance()->ReleaseListToSpans(start, byte);
}
  • 首先,我们从自由链表中弹出所有的内存块,保存在 startend 指针中。
  • 然后,我们调用 CentralCache::GetInstance()->ReleaseListToSpans(start, byte) 方法将这些内存块释放回 CentralCache

这样一来,我们就来到了 CentralCache 层的内存释放逻辑。CentralCache 会根据释放的内存块大小,将其插入到对应的 Span 对象的自由链表中,供后续的内存分配请求使用。具体实现如下:

void CentralCache::ReleaseListToSpans(Object *start, size_t byte)
{
    size_t index = SizeClass::Index(byte);
    spanLists[index].mtx.lock();
    while(start)
    {
        Object *next = start->next;
        //获取映射的*span并归还
        Span *span = PageCache::GetInstance()->MapAddrToSpan(start);
        start->next = span->freelist;
        span->freelist = start;
        span->useCount--;
        
        //若userCount为0,则说明所有内存块均已归还,此时可以将内存块交给上层
        if(span->useCount == 0)
        {
            spanLists[index].Erase(span);

            span->freelist = nullptr;
            span->next = nullptr;
            span->prev = nullptr;

            spanLists[index].mtx.unlock();
            PageCache::GetInstance()->mtx.lock();
            PageCache::GetInstance()->ReleaseSpanToPageCache(span);
            PageCache::GetInstance()->mtx.unlock();
            spanLists[index].mtx.lock();
        }

        start = next;
    }
    spanLists[index].mtx.unlock();
}

其中 MapAddrToSpan 方法的实现如下:

Span *PageCache::MapAddrToSpan(void *addr)
{
    PAGE_ID pageId = (PAGE_ID)addr >> PAGE_SHIFT;
    auto ret = (Span *)PageIdToSpan.get(pageId);
    assert(ret != nullptr);
    return ret;
}

该方法用于将内存地址映射到对应的 Span 对象,具体逻辑是通过计算页编号 pageId,然后在 PageIdToSpan 映射表中查找对应的 Span 对象并返回。这里利用内存地址和页大小的关系,可以高效地定位到对应的 Span 对象。

ReleaseListToSpans 的逻辑如下:

  • 首先,我们通过 SizeClass::Index(byte) 方法获取对应内存大小等级的索引值 index,并加锁对应的 SpanList
  • 然后,我们遍历释放的内存块链表,对于每一个内存块:
    • 我们通过 PageCache::GetInstance()->MapAddrToSpan(start) 方法获取该内存块对应的 Span 对象。
    • 然后,我们将该内存块插入到 Span 对象的自由链表中,并将 useCount 减一。
    • 接着,我们检查 useCount 是否为零。
      • 如果为零,则说明该 Span 对象中的所有内存块均已归还,此时我们可以将该 Span 对象释放回 PageCache。- 最后,我们解锁对应的 SpanList

可以看到,前面我们之所以对每一个被创建出来的 Span 对象都维护了一个 useCount,正是为了在内存释放时能够准确地判断该 Span 对象中的内存块是否全部被归还,从而决定是否将该 Span 对象释放回 PageCache

假若我们正好释放了一块内存,而这块内存所在的 Span 对象的 useCount 变为零,那么 CentralCache 会将该 Span 对象释放回 PageCache,具体实现如下:

void PageCache::ReleaseSpanToPageCache(Span *span)
{
    // 如果span的页数大于NPAGES,说明是直接向系统申请的大块内存,直接释放回系统
    if(span->n > NPAGES)
    {
        void *ptr = (void*)(span->pageId << PAGE_SHIFT);
        VirtualFree(ptr, 0, MEM_RELEASE);
        GetSpanPool().Delete(span);
        return;
    }
    // 否则进行合并操作
    // 查找span前面可以合并的Span
    while (true)
    {
        PAGE_ID prev = span->pageId - 1;
        auto prevspan = (Span *)PageIdToSpan.get(prev);

        // 如果找不到或者找到的页正在被使用或者找到的页大小合并后总大小大于NPAGES,则退出循环
        // 为什么不能超过 NPAGES?
        // 因为 PageCache 只管理最多 NPAGES 页的 Span,超过这个大小的 Span 应该直接释放回系统或者等到下一次申请时再处理
        if (prevspan == nullptr || prevspan->isusing || span->n + prevspan->n > NPAGES)
            break;

        // 否则开始合并
        span->pageId = prevspan->pageId;
        span->n += prevspan->n;

        // 删除被合并的span
        _spanLists[prevspan->n].Erase(prevspan);
        GetSpanPool().Delete(prevspan);
    }
    // 查找span后面可以合并的Span
    while (true)
    {
        PAGE_ID next = span->pageId + span->n;
        auto nextspan = (Span *)PageIdToSpan.get(next);

        // 如果找不到或者找到的页正在被使用或者找到的页大小合并后总大小大于NPAGES,则退出循环
        if (nextspan == nullptr || nextspan->isusing || span->n + nextspan->n > NPAGES)
            break;

        // 否则开始合并
        span->n += nextspan->n;

        // 删除被合并的span
        _spanLists[nextspan->n].Erase(nextspan);
        GetSpanPool().Delete(nextspan);
    }

    // 修改合并好的 span 的 状态
    span->isusing = false;
    _spanLists[span->n].PushFront(span);
    // PageIdToSpan[span->pageId] = span;
    // PageIdToSpan[span->pageId + span->n - 1] = span;
    PageIdToSpan.EnsureSet(span->pageId, span);
    PageIdToSpan.EnsureSet(span->pageId + span->n - 1, span);
}

这段代码逻辑如下:

  • 首先,我们检查 Span 对象的页数是否大于 NPAGES。
    • 如果是,则说明该 Span 对象是直接向系统申请的大块内存,我们直接调用操作系统的内存释放接口 VirtualFree 将其释放回系统,并将 Span 对象从内存池中删除。
    • 如果不是,则继续进行合并操作。
  • 然后,我们尝试查找并合并前面的相邻 Span 对象。
    • 我们通过计算前一个页编号 prev,在 PageIdToSpan 映射表中查找对应的 Span 对象 prevspan
    • 如果找不到 prevspan,或者 prevspan 正在被使用,或者合并后的总大小超过 NPAGES,则退出循环。
    • 否则,我们将 prevspan 合并到当前的 Span 对象中,并将 prevspan 从对应的 SpanList 中删除,并将其从内存池中删除。
  • 接着,我们尝试查找并合并后面的相邻 Span 对象,逻辑与前面的合并操作类似。
  • 最后,我们将合并后的 Span 对象标记为未使用状态,并将其插入到对应的 SpanList 中,同时在 PageIdToSpan 映射表中更新页编号到 Span 对象的映射关系。

为什么我们要给 PageCache 也维护一个 SpanList?想必到这里大家就彻底明白了,首先 PageCache 在分配 Span 对象时,如果发现没有合适的 Span 对象可用,它会尝试从更大大小等级的 SpanList 中分割出一个合适大小的 Span 对象;其次,在释放 Span 对象时,PageCache 也会尝试将相邻的空闲 Span 对象进行合并,以减少内存碎片化,提高内存利用率。再说根本一些,PageCache 维护 SpanList 是为了更高效地管理和复用内存页资源,提升整体内存管理的效率,避免频繁地向操作系统申请和释放内存页。

通过 PageCache,我们可以高效地管理系统内存页的分配和回收,满足 CentralCache 对内存页的释放请求,提升整体内存管理的效率,满足高并发场景下的内存管理需求。这样一来,所有被用户申请的内存都会有可能且最终被释放回操作系统,避免内存泄漏以及内存碎片化的问题。

这里,我们利用一个流程图来总结该内存释放的整体逻辑:

graph TD
    A["用户释放内存块"]:::keyNode --> B{"判断内存块大小"}:::decisionNode
    
    B -- 大内存(>256KB) --> C["直接归还给操作系统"] --> D["释放完成"]
    
    B -- 小内存(≤256KB) --> E["计算对齐后大小及索引"]
    E --> F["归还到ThreadCache对应自由链表"]
    F --> G{"链表大小是否超过阈值?"}:::decisionNode
    
    G --  --> H["批量回收至CentralCache"]
    H --> I["获取对应SpanList并加锁"]
    I --> J["更新Span的useCount"]
    J --> K{"useCount是否为0?"}:::decisionNode
    
    K --  --> L["从CentralCache移除Span"]
    L --> M["加锁PageCache"]
    M --> N["合并相邻空闲Span"]
    N --> O["归还合并后的Span至PageCache"]
    O --> P["解锁PageCache"] --> D
    
    K --  --> Q["解锁SpanList"] --> D
    
    G --  --> R["保留在ThreadCache"] --> D

    classDef keyNode fill:#f9f,stroke:#333,stroke-width:2px;
    classDef decisionNode fill:#9ff,stroke:#333,stroke-width:2px;

这样一来,我们就完整地讲解了高并发内存池项目中三层内存释放架构的设计与实现,包括 ThreadCacheCentralCachePageCache 三个组件的内存释放逻辑。通过这种分层设计,我们不仅提升了内存回收的效率,还有效地降低了内存碎片化的问题,满足了高并发场景下的内存管理需求。当然,我们的确将该项目的核心逻辑完成了,但同时我们也需要对该项目提供面向用户的接口,使得用户能够方便地使用该内存池进行内存分配和释放操作。接下来,我们将讲解如何设计和实现这些用户接口。

用户接口设计与实现

下面是我们为该高并发内存池项目设计的用户接口,包括 ConcurrentAllocConcurrentFree 两个函数,分别用于内存分配和释放操作:

#pragma once

#include "ThreadCache.hpp"
#include "PageCache.hpp"

inline void* ConcurrentAlloc(size_t byte)
{
    if(byte > MAX_BYTES)
    {
        // 计算需要的页数
        size_t alignbyte = SizeClass::RoundUp(byte);
        size_t pageNum = alignbyte >> PAGE_SHIFT;
        PageCache::GetInstance()->mtx.lock();
        Span *span = PageCache::GetInstance()->NewSpan(pageNum);
        span->isusing = true;
        span->objSize = alignbyte;
        PageCache::GetInstance()->mtx.unlock();
        void* ptr = (void*)(span->pageId << PAGE_SHIFT);
        return ptr;
    }
    if(thread_local_data_ == nullptr)
    {
        thread_local_data_ = new ThreadCache();
    }
    return thread_local_data_->Allocate(byte);
}

inline void ConcurrentFree(void *ptr)
{
    Span *span = PageCache::GetInstance()->MapAddrToSpan(ptr);
    size_t byte = span->objSize;
    if(byte > MAX_BYTES)
    {
        PageCache::GetInstance()->mtx.lock();
        PageCache::GetInstance()->ReleaseSpanToPageCache(span);
        PageCache::GetInstance()->mtx.unlock();
        return;
    }
    thread_local_data_->Deallocate(ptr, byte);
}

ConcurrentAlloc 函数对用户的内存分配请求又做了一层处理:

  • 首先,我们检查请求的内存大小是否大于 MAX_BYTES(256KB)。
    • 如果是,则说明用户请求的是大块内存,我们需要直接向 PageCache 申请一块或多块内存页。我们计算出对齐后的内存大小 alignbyte,然后计算出需要的页数 pageNum。接着,我们加锁 PageCache,调用 PageCache::GetInstance()->NewSpan(pageNum) 方法获取一个新的 Span 对象,并将其标记为正在使用中,并设置小块内存大小 objSize。最后,我们解锁 PageCache,并返回该 Span 对象对应的内存地址。
    • 如果不是,则说明用户请求的是小块内存,我们首先检查当前线程的本地缓存 thread_local_data_ 是否为空。
      • 如果为空,则说明当前线程还没有初始化本地缓存,我们创建一个新的 ThreadCache 对象并赋值给 thread_local_data_
      • 然后,我们调用 thread_local_data_->Allocate(byte) 方法从本地缓存中分配内存块,并返回该内存地址。

同样的,ConcurrentFree 函数对用户的内存释放请求也做了一层处理:

  • 首先,我们通过 PageCache::GetInstance()->MapAddrToSpan(ptr) 方法将内存地址映射到对应的 Span 对象,并获取该 Span 对象的小块内存大小 objSize
  • 然后,我们检查 objSize 是否大于 MAX_BYTES(256KB)。
    • 如果是,则说明用户释放的是大块内存,我们加锁 PageCache,调用 PageCache::GetInstance()->ReleaseSpanToPageCache(span) 方法将该 Span 对象释放回 PageCache,然后解锁 PageCache
    • 如果不是,则说明用户释放的是小块内存,我们调用 thread_local_data_->Deallocate(ptr, byte) 方法将该内存块释放回本地缓存。

这样一来,面向用户的接口就实现好了,用户只需要调用 ConcurrentAllocConcurrentFree 函数即可完成内存的分配和释放操作,而不需要关心底层的内存管理细节。

压力测试与性能评估

在完成高并发内存池的设计与实现后,我们需要对其进行压力测试与性能评估,以验证其在高并发场景下的表现。下面是我们设计的压力测试方案:

#include "Concurrent.hpp"
#include <atomic>
using namespace std;



// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&, k]() {
            std::vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    // v.push_back(malloc(16));
                    v.push_back(malloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    free(v[i]);
                }
                size_t end2 = clock();
                v.clear();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }

    for (auto& t : vthread)
    {
        t.join();
    }
    printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());

    printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, free_costtime.load());

    printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
        nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}


// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime = 0;
    std::atomic<size_t> free_costtime = 0;
    std::atomic<size_t> malloc_lock_costtime = 0;
    std::atomic<size_t> free_lock_costtime = 0;

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&]() {
            std::vector<void*> v;
            v.reserve(ntimes);

            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    // v.push_back(ConcurrentAlloc(16));
                    v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();
                // malloc_lock_costtime += g_total_elapsed_us;
                // g_total_elapsed_us = 0;
                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    {
                        Timer();
                        ConcurrentFree(v[i]);
                    }
                }
                size_t end2 = clock();
                v.clear();
                // free_lock_costtime += g_total_elapsed_us;
                // g_total_elapsed_us = 0;
                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }

    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());

    printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
        nworks, rounds, ntimes, free_costtime.load());
    // printf("%u个线程并发执行%u轮次,每轮次concurrent alloc lock %u次: 花费:%u ms\n",
    //     nworks, rounds, ntimes, malloc_lock_costtime.load() / 1000000);
    
    printf("Timer: %u ns\n",
        g_total_elapsed_us.load());

    printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
        nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

int main()
{
    size_t n = 1000;
    cout << "==========================================================" << endl;
    BenchmarkConcurrentMalloc(n, 4, 1024);
    cout << endl << endl;

    BenchmarkMalloc(n, 4, 1024);
    cout << "==========================================================" << endl;
    return 0;
}

首先粘贴一下我的电脑配置:

  • 操作系统:Microsoft Windows 11 家庭中文版(10.0.26220,64 位)
  • 处理器:13th Gen Intel(R) Core(TM) i9-13900H(14 核 / 20 线程,基准 2.60GHz)
  • 设备:ASUSTeK COMPUTER INC. ASUS TUF Gaming F15 FX507VV_FX507VV
  • 物理内存:16 GB
  • 编译器:GCC g++ 14.2.0(MinGW)
  • 构建类型:Debug

运行后结果如下:


PS D:\C++_Code\C++Project\HighConcurrencyMempool\High-concurrency-mempool\build> ./Benchmark.exe
==========================================================
4个线程并发执行1024轮次,每轮次concurrent alloc 1000次: 花费:37 ms
4个线程并发执行1024轮次,每轮次concurrent dealloc 1000次: 花费:511 ms
Timer: 142192000 ns
4个线程并发concurrent alloc&dealloc 4096000次,总计花费:548 ms


4个线程并发执行1024轮次,每轮次malloc 1000次: 花费:412 ms
4个线程并发执行1024轮次,每轮次free 1000次: 花费:190 ms
4个线程并发malloc&free 4096000次,总计花费:602 ms
==========================================================
PS D:\C++_Code\C++Project\HighConcurrencyMempool\High-concurrency-mempool\build> ./Benchmark.exe
==========================================================
4个线程并发执行1024轮次,每轮次concurrent alloc 1000次: 花费:13 ms
4个线程并发执行1024轮次,每轮次concurrent dealloc 1000次: 花费:508 ms
Timer: 138914000 ns
4个线程并发concurrent alloc&dealloc 4096000次,总计花费:521 ms


4个线程并发执行1024轮次,每轮次malloc 1000次: 花费:451 ms
4个线程并发执行1024轮次,每轮次free 1000次: 花费:239 ms
4个线程并发malloc&free 4096000次,总计花费:690 ms
==========================================================
PS D:\C++_Code\C++Project\HighConcurrencyMempool\High-concurrency-mempool\build> ./Benchmark.exe
==========================================================
4个线程并发执行1024轮次,每轮次concurrent alloc 1000次: 花费:40 ms
4个线程并发执行1024轮次,每轮次concurrent dealloc 1000次: 花费:474 ms
Timer: 140044000 ns
4个线程并发concurrent alloc&dealloc 4096000次,总计花费:514 ms


4个线程并发执行1024轮次,每轮次malloc 1000次: 花费:434 ms
4个线程并发执行1024轮次,每轮次free 1000次: 花费:223 ms
4个线程并发malloc&free 4096000次,总计花费:657 ms
==========================================================
PS D:\C++_Code\C++Project\HighConcurrencyMempool\High-concurrency-mempool\build> ./Benchmark.exe
ms
4个线程并发执行1024轮次,每轮次concurrent dealloc 1000次: 花费:488 ms
Timer: 133815000 ns
4个线程并发concurrent alloc&dealloc 4096000次,总计花费:512 ms


4个线程并发执行1024轮次,每轮次malloc 1000次: 花费:404 ms
4个线程并发执行1024轮次,每轮次free 1000次: 花费:208 ms
4个线程并发malloc&free 4096000次,总计花费:612 ms
==========================================================

计算下来,平均结果如下:

操作场景 测试内容 平均花费时间(ms)
并发分配 4个线程×1024轮次×每轮1000次concurrent alloc 30
并发释放 4个线程×1024轮次×每轮1000次concurrent dealloc 495.25
并发分配释放 4个线程并发concurrent alloc&dealloc 4096000次 523.75
标准库分配 4个线程×1024轮次×每轮1000次malloc 425.25
标准库释放 4个线程×1024轮次×每轮1000次free 215
标准库分配释放 4个线程并发malloc&free 4096000次 640.25

从测试结果可以看出,我们实现的高并发内存池在并发分配和释放内存方面表现优异,尤其是在分配操作上,显著优于标准库的 malloc。虽然在释放操作上花费时间较长,但整体来看,高并发内存池在分配和释放内存的总时间上仍然优于标准库的实现。