Linux 线程池

Linux 线程池

七月 27, 2025 次阅读

传统线程池设计

1. 线程池概述

线程池是一种多线程处理形式,它维护一组线程等待分配可并发执行的任务。这种技术避免了频繁创建和销毁线程的开销,提高了系统性能,特别是在需要处理大量短时任务的场景中。

完整代码如下:

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_ 1;  // 防止头文件重复包含

#include <iostream>
#include <pthread.h>          // POSIX线程库
#include <unordered_map>      // 哈希表,用于存储线程信息
#include <queue>              // 任务队列

const size_t default_cap = 20; // 默认线程池容量

// 自动加解锁的RAII封装类
class LockGuard
{
public:
    // 构造函数自动加锁
    explicit LockGuard(pthread_mutex_t &mtx)
        : mtx_(mtx)
    {
        pthread_mutex_lock(&mtx_);
    }
    // 析构函数自动解锁
    ~LockGuard()
    {
        pthread_mutex_unlock(&mtx_);
    }

    // 禁用拷贝构造和赋值操作
    LockGuard(const LockGuard &) = delete;            
    LockGuard &operator=(const LockGuard &) = delete; 

private:
    pthread_mutex_t &mtx_;  // 引用管理的互斥锁
};

// 线程池模板类
template <class T>
class ThreadPool
{
private:
    // 检查任务队列是否为空
    bool IsEmptyQueue()
    {
        return tasks_.empty();
    }
    
    // 从任务队列头部取出一个任务
    T Pop()
    {
        T task = tasks_.front();
        tasks_.pop();
        return task;
    }

private:
    // 线程数据传递结构体
    struct thread_data
    {
        std::string name;    // 线程名称
        ThreadPool<T> *tp;   // 所属线程池指针
    };

public:
    // 获取线程池单例(线程安全)
    static ThreadPool<T> &GetInstance()
    {
        // C++11后静态局部变量初始化是线程安全的
        static ThreadPool<T> tp;
        return tp;
    }
    
    // 启动线程池
    void Start()
    {
        running_ = true;  // 设置运行标志
        
        // 创建指定数量的工作线程
        for (int i = 1; i <= cap_; ++i)
        {
            std::string name = "thread-" + std::to_string(i);  // 生成线程名
            pthread_t tid;
            
            // 创建线程数据
            thread_data *td = new thread_data{
                name : name,
                tp : this
            };
            
            // 创建线程
            pthread_create(&tid, nullptr, ThreadHandler, (void *)(td));
            
            // 记录线程信息
            threads_[tid] = name;
        }
    }
    
    // 向线程池添加任务
    void Push(const T &task)
    {
        {
            LockGuard lg(mtx_);  // 加锁保护
            tasks_.push(task);   // 任务入队
        }
        // 先释放锁后唤醒,提高并发效率
        pthread_cond_signal(&cond_);  // 唤醒一个等待线程
    }
    
    // 结束线程池
    void End()
    {
        {
            LockGuard lg(mtx_);
            running_ = false;  // 设置停止标志
        }
        
        // 广播唤醒所有等待线程
        pthread_cond_broadcast(&cond_);
        
        // 等待所有线程结束
        for (auto &[tid, name] : threads_)
        {
            pthread_join(tid, nullptr);
        }
        
        threads_.clear();  // 清空线程记录
    }
    
    // 线程处理函数(静态成员函数)
    static void *ThreadHandler(void *args)
    {
        // 获取线程数据
        thread_data *td = static_cast<thread_data *>(args);
        ThreadPool<T> *tp = td->tp;
        std::string thread_name = td->name;
        T task;
        
        while (true)
        {
            {
                LockGuard lg(tp->mtx_);  // 加锁
                
                // 等待条件:任务队列不为空或线程池停止
                while (tp->IsEmptyQueue())
                {
                    if (tp->running_)
                        pthread_cond_wait(&tp->cond_, &tp->mtx_);  // 等待任务
                    else
                    {
                        delete td;  // 清理线程数据
                        return nullptr;  // 线程退出
                    }
                }
                
                // 获取任务
                task = tp->Pop();
            }  // 自动解锁
            
            // 执行任务
            task();
            
            // 输出任务结果(假设T类型有GetResult方法)
            printf("%s: %s\n", thread_name.c_str(), task.GetResult().c_str());
        }
        
        delete td;  // 理论上不会执行到这里
        return nullptr;
    }

private:
    // 私有构造函数(单例模式)
    ThreadPool(size_t cap = default_cap)
        : cap_(cap),          // 线程容量
          running_(false)     // 初始状态为未运行
    {
        pthread_mutex_init(&mtx_, nullptr);    // 初始化互斥锁
        pthread_cond_init(&cond_, nullptr);    // 初始化条件变量
    }
    
    // 析构函数
    ~ThreadPool()
    {
        if (running_)
            End();  // 如果还在运行,先停止
        
        pthread_mutex_destroy(&mtx_);   // 销毁互斥锁
        pthread_cond_destroy(&cond_);   // 销毁条件变量
    }
    
    // 禁用拷贝构造和赋值操作
    ThreadPool(const ThreadPool<T> &) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

private:
    std::unordered_map<pthread_t, std::string> threads_;  // 线程ID与名称映射
    size_t cap_;                     // 线程池容量
    std::queue<T> tasks_;            // 任务队列
    pthread_mutex_t mtx_;            // 保护任务队列的互斥锁
    pthread_cond_t cond_;            // 任务通知条件变量
    bool running_;                   // 线程池运行标志
};

// 静态成员初始化
template <class T>
pthread_mutex_t lock_ = PTHREAD_MUTEX_INITIALIZER;

#endif

2. 核心实现分析

2.1 线程安全设计

  • 互斥锁保护:使用pthread_mutex_t保护共享资源(任务队列)
  • 条件变量pthread_cond_t实现线程间的任务通知机制
  • RAII锁LockGuard类实现自动加解锁,防止死锁
class LockGuard {
public:
    explicit LockGuard(pthread_mutex_t &mtx) : mtx_(mtx) {
        pthread_mutex_lock(&mtx_);
    }
    ~LockGuard() {
        pthread_mutex_unlock(&mtx_);
    }
    // 禁用拷贝构造和赋值
};

2.2 任务管理

  • 任务队列:使用std::queue存储待处理任务
  • 任务添加:Push()方法线程安全地添加任务
  • 任务获取:工作线程通过Pop()获取任务

2.3 线程管理

  • 线程创建:Start()方法创建指定数量的工作线程
  • 线程回收:End()方法安全终止所有线程
  • 线程命名:使用哈希表记录线程ID与名称的映射

3. 关键特性

  1. 单例模式:确保全局唯一线程池实例

    static ThreadPool<T> &GetInstance() {
        static ThreadPool<T> tp;
        return tp;
    }
  2. 优雅停机

    • 设置running_标志位
    • 广播条件变量唤醒所有线程
    • 等待所有线程结束
  3. 高效唤醒机制

    void Push(const T &task) {
        {
            LockGuard lg(mtx_);
            tasks_.push(task);
        }
        pthread_cond_signal(&cond_); // 在锁外唤醒
    }

4. 使用示例

我们仍然使用之前的任务类:

#ifndef _TASK_HPP_
#define _TASK_HPP_ 1

#include <iostream>

enum calc_error{
    division_by_zero_error = 1,
    operator_error,
};

class Task
{
public:
    Task(int num1 = 0, int num2 = 0, char op = '+')
    :num1_(num1),
    num2_(num2),
    op_(op){}

    void Run()
    {
        exitcode_ = 0;
        switch (op_)
        {
        case '+':
            result_ = num1_ + num2_;
            break;
        case '-':
            result_ = num1_ - num2_;
            break;
        case '*':
            result_ = num1_ * num2_;
            break;
        case '/':
            if(num2_ == 0){
                result_ = 0;
                exitcode_ = calc_error::division_by_zero_error;
            }
            else result_ = num1_ / num2_;
            break;
        case '%':
            if(num2_ == 0){
                result_ = 0;
                exitcode_ = calc_error::division_by_zero_error;
            }
            else result_ = num1_ % num2_;
            break;
        default:
            result_ = 0;
            exitcode_ = calc_error::operator_error;
            break;
        }
    }

    void operator ()()
    {
        Run();
    }

    std::string GetResult()
    {
        std::string ret = std::to_string(num1_);
        ret += op_;
        ret += std::to_string(num2_);
        ret += "=";
        ret += std::to_string(result_);
        ret += "[code: ";
        ret += std::to_string(exitcode_);
        ret += "]";

        return ret;
    }

    std::string GetTask()
    {
        std::string ret = std::to_string(num1_);
        ret += op_;
        ret += std::to_string(num2_);
        ret += "=?";
        return ret;
    }

private:
    int num1_, num2_;
    char op_;
    int result_;
    int exitcode_;
};

#endif

测试主函数如下:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <chrono>
#include <semaphore.h>
#include "Task.hpp"
#include "thread_pool.hpp"

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

using namespace std;

const int thread_num = 20;

const int total_task = 20;

sem_t sem_cons;
sem_t sem_pro;

char opers[] = {'+', '-', '*', '/', '%'};

struct thread_data{
    string name;
    sem_t sem;
};

class Timer
{
private:
    std::chrono::time_point<std::chrono::high_resolution_clock> start_time;

public:
    Timer()
    {
        start();
    }

    void start()
    {
        start_time = std::chrono::high_resolution_clock::now();
    }

    ~Timer()
    {
        std::cout << elapsed() << endl;
    }

    // 返回经过的毫秒数
    double elapsed() const
    {
        auto end_time = std::chrono::high_resolution_clock::now();
        return std::chrono::duration<double, std::milli>(end_time - start_time).count();
    }

    // 返回经过的秒数
    double elapsedSeconds() const
    {
        return elapsed() / 1000.0;
    }
};

void *Producer(void *args)
{
    thread_data *td = static_cast<thread_data *>(args);
    string name = td->name;
    while(true)
    {
        if(sem_trywait(&sem_pro)) break;
        usleep(10);
        int num1 = rand() % 10;
        int num2 = rand() % 10;
        char op = opers[rand() % 5];
        Task task{num1, num2, op};
        ThreadPool<Task>::GetInstance().Push(task);
        printf("%s: %s\n", name.c_str(), task.GetTask().c_str());
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr));
    vector<pthread_t> tids;
    sem_init(&sem_cons, 0, total_task);
    sem_init(&sem_pro, 0, total_task);
    ThreadPool<Task>::GetInstance().Start();
    for(int i = 1; i <= thread_num; ++i)
    {
        string name = "producer-" + to_string(i);
        thread_data *td = new thread_data{
            name: name
        };
        pthread_t tid;
        pthread_create(&tid, nullptr, Producer, (void*)(td));
        tids.push_back(tid);
    }
    {
        Timer time;
        for(auto tid: tids) pthread_join(tid, nullptr);
    }
    return 0;
}

同样有一个计时器,不过这里的计时器用处不是很大,因为并没有值得对比的其他类型的线程池,不过我们可以和之前的阻塞队列和环形队列相对比,因为它们都属于是一种生产者-消费者模型。

上面的测试用例是为了查看一下打印结果,验证该线程池是否能够正常工作,我们看一下打印结果:

╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
thread-1: 0*6=0[code: 0]
thread-1: 2+7=9[code: 0]
producer-1: 0*6=?
producer-9: 2+7=?
thread-3: 6+9=15[code: 0]
producer-10: 6+9=?
thread-4: 4/6=0[code: 0]
producer-11: 4/6=?
thread-5: 5+2=7[code: 0]
producer-12: 5+2=?
thread-6: 6*2=12[code: 0]
producer-10: 6*2=?
thread-8: 9-3=6[code: 0]
thread-7: 4%8=4[code: 0]
producer-1: 4%8=?
producer-9: 9-3=?
thread-9: 8-7=1[code: 0]
producer-10: 8-7=?
thread-10: 5*8=40[code: 0]
producer-11: 5*8=?
thread-11: 6/1=6[code: 0]
producer-12: 6/1=?
producer-1: 8*3=?
thread-12: 8*3=24[code: 0]
producer-9: 5%1=?
thread-13: 5%1=0[code: 0]
producer-15: 8*8=?
thread-19: 8*8=64[code: 0]
producer-10: 5+5=?
thread-14: 5+5=10[code: 0]
producer-1: 5%3=?
thread-15: 5%3=2[code: 0]
producer-11: 5*9=?
thread-16: 5*9=45[code: 0]
producer-13: 5+6=?
thread-17: 5+6=11[code: 0]
producer-14: 1+7=?
thread-18: 1+7=8[code: 0]
producer-2: 1/3=?
thread-20: 1/3=0[code: 0]
0.928079

接下来,我们和前面的阻塞队列和环形队列进行比较,我们修改一下测试用例,让生产者和消费者数量相等,任务数量也相等,然后我们比较一下它们的执行时间:

// 修改了任务总量
const int total_task = 100000;
// 将打印信息屏蔽掉了
// main.cc
// printf("%s: %s\n", name.c_str(), task.GetTask().c_str());
// thread.hpp
// printf("%s: %s\n", thread_name.c_str(), task.GetResult().c_str());

// Task.hpp
void Run()
{
    // 给任务同样增加延迟,统一变数
    usleep(10);
    ...
}

测试后结果如下:

# 第一次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
676.78
# 第二次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
905.2
# 第三次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
724.741
# 第四次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
813.893
# 第五次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
923.235

平均用时:(676.78 + 905.2 + 724.741 + 813.893 + 923.235)/ 5 = 811.8774

可以看到,效率肯定不如环形队列,因为环形队列极大减少了消费者和生产者之间的冲突,只有在队列为空为满的时候才会等待

既然如此,我们为何不试试利用环形队列来实现线程池呢?

说干就干,利用信号量来大幅度降低 PushPop 锁冲突的开销,我们来实现一个基于环形队列的线程池

基于环形队列(信号量)的线程池

代码实现

#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_ 1;  // 头文件保护宏

#include <iostream>
#include <pthread.h>          // POSIX线程库
#include <unordered_map>      // 线程管理容器
#include <queue>              // 任务队列(已弃用)
#include <semaphore.h>        // 信号量支持

const size_t default_cap = 20; // 默认线程数量

// RAII互斥锁封装类
class LockGuard {
public:
    explicit LockGuard(pthread_mutex_t &mtx)
        : push_mtx_(mtx) {
        pthread_mutex_lock(&push_mtx_);  // 构造时加锁
    }
    ~LockGuard() {
        pthread_mutex_unlock(&push_mtx_); // 析构时解锁
    }

    // 禁用拷贝构造和赋值
    LockGuard(const LockGuard &) = delete;            
    LockGuard &operator=(const LockGuard &) = delete; 

private:
    pthread_mutex_t &push_mtx_;  // 管理的互斥锁引用
};

template <class T>
class ThreadPool {
private:
    // 从环形队列取出任务
    T Pop() {
        T task = tasks_[pop_pos_];       // 获取当前读位置任务
        pop_pos_ = (pop_pos_ + 1) % task_cap_; // 环形递增读指针
        return task;
    }

    // 封装sem_wait(P操作)
    void P(sem_t &sem) {
        sem_wait(&sem);
    }

    // 封装sem_post(V操作)
    void V(sem_t &sem) {
        sem_post(&sem);
    }

private:
    // 线程上下文数据
    struct thread_data {
        std::string name;    // 线程名称
        ThreadPool<T> *tp;   // 所属线程池指针
    };

public:
    // 获取单例实例(线程安全)
    static ThreadPool<T> &GetInstance() {
        static ThreadPool<T> tp;  // C++11保证静态变量线程安全
        return tp;
    }

    // 启动线程池
    void Start() {
        running_ = true;
        for (int i = 1; i <= cap_; ++i) {
            std::string name = "thread-" + std::to_string(i);
            pthread_t tid;
            // 创建线程上下文数据
            thread_data *td = new thread_data{ name, this };
            pthread_create(&tid, nullptr, ThreadHandler, td);
            threads_[tid] = name;  // 记录线程信息
        }
    }

    // 提交任务到线程池
    void Push(const T &task) {
        P(push_sem_);  // 等待空闲槽位
        {
            LockGuard lg(push_mtx_);  // 保护写指针
            tasks_[push_pos_] = task; // 写入任务
            push_pos_ = (push_pos_ + 1) % task_cap_; // 环形递增写指针
        }
        V(pop_sem_);  // 通知有新任务
    }

    // 停止线程池
    void End() {
        {
            LockGuard pop_lg(pop_mtx_);
            running_ = false;  // 设置停止标志
        }
        // 等待所有线程退出
        for (auto &[tid, name] : threads_) {
            pthread_join(tid, nullptr);
        }
        threads_.clear();
    }

    // 线程处理函数(静态方法)
    static void *ThreadHandler(void *args) {
        thread_data *td = static_cast<thread_data *>(args);
        ThreadPool<T> *tp = td->tp;
        T task;
        int wait_ret = 0;

        while (true) {
            // 非阻塞式获取任务
            while (true) {
                wait_ret = sem_trywait(&tp->pop_sem_);  // 尝试获取任务
                if (wait_ret == 0) break;  // 成功获取
                
                if (errno == EAGAIN) {     // 无任务可用
                    if (!tp->running_) {    // 检查停止标志
                        delete td;
                        return nullptr;     // 安全退出
                    }
                    usleep(1000);          // 避免忙等待
                    continue;
                } else {                  // 其他错误
                    perror("sem_trywait");
                    break;
                }
            }

            // 处理任务
            {
                LockGuard lg(tp->pop_mtx_);  // 保护读指针
                task = tp->Pop();           // 取出任务
            }
            tp->V(tp->push_sem_);  // 释放一个空槽位
            task();                // 执行任务
        }

        delete td;
        return nullptr;
    }

private:
    // 私有构造函数(单例模式)
    ThreadPool(size_t cap = default_cap, size_t task_cap = 10)
        : cap_(cap),              // 线程数量
          task_cap_(task_cap),    // 环形队列容量
          running_(false),        // 初始状态
          push_pos_(0),           // 写指针初始化
          pop_pos_(0),            // 读指针初始化
          tasks_(task_cap)        // 预分配队列空间
    {
        pthread_mutex_init(&push_mtx_, nullptr);  // 写互斥锁初始化
        pthread_mutex_init(&pop_mtx_, nullptr);   // 读互斥锁初始化
        sem_init(&push_sem_, 0, task_cap);       // 空槽位信号量(初始=容量)
        sem_init(&pop_sem_, 0, 0);               // 任务信号量(初始=0)
    }

    ~ThreadPool() {
        if (running_) End();  // 确保资源释放
        
        pthread_mutex_destroy(&push_mtx_);
        pthread_mutex_destroy(&pop_mtx_);
        sem_destroy(&push_sem_);
        sem_destroy(&pop_sem_);
    }

    // 禁用拷贝构造和赋值
    ThreadPool(const ThreadPool<T> &) = delete;
    const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

private:
    // 线程管理
    std::unordered_map<pthread_t, std::string> threads_;  // 线程记录
    size_t cap_;          // 线程数量上限
    size_t task_cap_;     // 环形队列容量

    // 环形队列控制
    size_t push_pos_;     // 写位置指针
    size_t pop_pos_;      // 读位置指针
    std::vector<T> tasks_;// 环形队列容器

    // 同步机制
    pthread_mutex_t push_mtx_;  // 写指针互斥锁
    pthread_mutex_t pop_mtx_;   // 读指针互斥锁
    sem_t push_sem_;      // 空槽位信号量(生产者)
    sem_t pop_sem_;       // 任务信号量(消费者)

    bool running_;        // 运行状态标志
};

#endif

一、设计思想

  1. 生产者-消费者模型优化

    • 传统队列需要同时锁住头尾指针,而环形队列通过固定大小预分配内存 + 读写指针分离,实现:
      • 生产者只修改push_pos_(写指针)
      • 消费者只修改pop_pos_(读指针)
      • 读写操作完全解耦,无需竞争同一把锁
  2. 信号量控制流量

    • push_sem_:剩余空闲槽位数(初始=队列容量)
    • pop_sem_:待消费任务数(初始=0)
    • 通过P/V操作自动阻塞/唤醒线程,比条件变量更高效
  3. 无锁化尝试

    • 使用sem_trywait + usleep实现非阻塞等待,避免线程在sem_wait中永久阻塞
    • 单生产者/消费者场景下可完全去除互斥锁(当前实现保留锁是为兼容多生产者场景)

二、关键组件解析

1. 环形队列核心结构

size_t push_pos_;        // 写指针(生产者专用)
size_t pop_pos_;         // 读指针(消费者专用)
std::vector<T> tasks_;   // 预分配环形缓冲区
size_t task_cap_;        // 队列容量
  • 环形计算pos = (pos + 1) % task_cap_ 实现指针回绕
  • 内存预分配:避免动态内存申请的开销

2. 同步机制

sem_t push_sem_;         // 空闲槽位信号量
sem_t pop_sem_;          // 任务信号量
pthread_mutex_t push_mtx_;// 写互斥锁(多生产者时需要)
pthread_mutex_t pop_mtx_; // 读互斥锁(多消费者时需要)
  • 信号量初始化

    sem_init(&push_sem_, 0, task_cap_); // 初始有task_cap_个空位
    sem_init(&pop_sem_, 0, 0);          // 初始0个任务

3. 线程控制

bool running_;           // 线程池启停标志
unordered_map<pthread_t, string> threads_; // 线程记录表
  • 优雅停止
    • 设置running_=false
    • 通过sem_post唤醒所有阻塞线程
    • 线程检查到running_==false且队列空时退出

三、工作流程

生产者提交任务(Push)

sequenceDiagram
    participant Producer
    participant push_sem_
    participant Queue
    participant pop_sem_
    
    Producer->>push_sem_: P() 等待空槽
    activate push_sem_
    push_sem_-->>Producer: 获得空槽
    deactivate push_sem_
    
    Producer->>Queue: 加锁写入任务
    Producer->>pop_sem_: V() 通知有新任务

消费者处理任务(ThreadHandler)

sequenceDiagram
    participant Consumer
    participant pop_sem_
    participant Queue
    participant push_sem_
    
    loop 非阻塞等待
        Consumer->>pop_sem_: sem_trywait()
        alt 获取成功
            pop_sem_-->>Consumer: 获得任务
        else 无任务
            Consumer->>running_: 检查停止标志
        end
    end
    
    Consumer->>Queue: 加锁取出任务
    Consumer->>push_sem_: V() 释放空槽
    Consumer->>Task: 执行任务

四、性能优化点

  1. 锁粒度最小化

    • 写操作只锁push_mtx_(保护push_pos_
    • 读操作只锁pop_mtx_(保护pop_pos_
  2. 缓存友好设计

    • std::vector连续内存布局
    • 读写指针原子操作(可升级为atomic<size_t>
  3. 避免惊群效应

    • 精准的sem_post唤醒(传统条件变量会唤醒所有等待线程)

我们来看一下这个程序的效率:

# 第一次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
659.121
# 第二次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
624.264
# 第三次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
607.972
# 第四次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
551.506
# 第五次
╭─ljx@VM-16-15-debian ~/linux_review/thread  
╰─➤  ./main.o
610.404

平均用时:(659.121+624.264+607.972+551.506+610.404)/5 = 607.576ms

与传统的线程池相比,性能提升了不少