Linux 线程池
七月 27, 2025
次阅读
传统线程池设计
1. 线程池概述
线程池是一种多线程处理形式,它维护一组线程等待分配可并发执行的任务。这种技术避免了频繁创建和销毁线程的开销,提高了系统性能,特别是在需要处理大量短时任务的场景中。
完整代码如下:
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_ 1; // 防止头文件重复包含
#include <iostream>
#include <pthread.h> // POSIX线程库
#include <unordered_map> // 哈希表,用于存储线程信息
#include <queue> // 任务队列
const size_t default_cap = 20; // 默认线程池容量
// 自动加解锁的RAII封装类
class LockGuard
{
public:
// 构造函数自动加锁
explicit LockGuard(pthread_mutex_t &mtx)
: mtx_(mtx)
{
pthread_mutex_lock(&mtx_);
}
// 析构函数自动解锁
~LockGuard()
{
pthread_mutex_unlock(&mtx_);
}
// 禁用拷贝构造和赋值操作
LockGuard(const LockGuard &) = delete;
LockGuard &operator=(const LockGuard &) = delete;
private:
pthread_mutex_t &mtx_; // 引用管理的互斥锁
};
// 线程池模板类
template <class T>
class ThreadPool
{
private:
// 检查任务队列是否为空
bool IsEmptyQueue()
{
return tasks_.empty();
}
// 从任务队列头部取出一个任务
T Pop()
{
T task = tasks_.front();
tasks_.pop();
return task;
}
private:
// 线程数据传递结构体
struct thread_data
{
std::string name; // 线程名称
ThreadPool<T> *tp; // 所属线程池指针
};
public:
// 获取线程池单例(线程安全)
static ThreadPool<T> &GetInstance()
{
// C++11后静态局部变量初始化是线程安全的
static ThreadPool<T> tp;
return tp;
}
// 启动线程池
void Start()
{
running_ = true; // 设置运行标志
// 创建指定数量的工作线程
for (int i = 1; i <= cap_; ++i)
{
std::string name = "thread-" + std::to_string(i); // 生成线程名
pthread_t tid;
// 创建线程数据
thread_data *td = new thread_data{
name : name,
tp : this
};
// 创建线程
pthread_create(&tid, nullptr, ThreadHandler, (void *)(td));
// 记录线程信息
threads_[tid] = name;
}
}
// 向线程池添加任务
void Push(const T &task)
{
{
LockGuard lg(mtx_); // 加锁保护
tasks_.push(task); // 任务入队
}
// 先释放锁后唤醒,提高并发效率
pthread_cond_signal(&cond_); // 唤醒一个等待线程
}
// 结束线程池
void End()
{
{
LockGuard lg(mtx_);
running_ = false; // 设置停止标志
}
// 广播唤醒所有等待线程
pthread_cond_broadcast(&cond_);
// 等待所有线程结束
for (auto &[tid, name] : threads_)
{
pthread_join(tid, nullptr);
}
threads_.clear(); // 清空线程记录
}
// 线程处理函数(静态成员函数)
static void *ThreadHandler(void *args)
{
// 获取线程数据
thread_data *td = static_cast<thread_data *>(args);
ThreadPool<T> *tp = td->tp;
std::string thread_name = td->name;
T task;
while (true)
{
{
LockGuard lg(tp->mtx_); // 加锁
// 等待条件:任务队列不为空或线程池停止
while (tp->IsEmptyQueue())
{
if (tp->running_)
pthread_cond_wait(&tp->cond_, &tp->mtx_); // 等待任务
else
{
delete td; // 清理线程数据
return nullptr; // 线程退出
}
}
// 获取任务
task = tp->Pop();
} // 自动解锁
// 执行任务
task();
// 输出任务结果(假设T类型有GetResult方法)
printf("%s: %s\n", thread_name.c_str(), task.GetResult().c_str());
}
delete td; // 理论上不会执行到这里
return nullptr;
}
private:
// 私有构造函数(单例模式)
ThreadPool(size_t cap = default_cap)
: cap_(cap), // 线程容量
running_(false) // 初始状态为未运行
{
pthread_mutex_init(&mtx_, nullptr); // 初始化互斥锁
pthread_cond_init(&cond_, nullptr); // 初始化条件变量
}
// 析构函数
~ThreadPool()
{
if (running_)
End(); // 如果还在运行,先停止
pthread_mutex_destroy(&mtx_); // 销毁互斥锁
pthread_cond_destroy(&cond_); // 销毁条件变量
}
// 禁用拷贝构造和赋值操作
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
private:
std::unordered_map<pthread_t, std::string> threads_; // 线程ID与名称映射
size_t cap_; // 线程池容量
std::queue<T> tasks_; // 任务队列
pthread_mutex_t mtx_; // 保护任务队列的互斥锁
pthread_cond_t cond_; // 任务通知条件变量
bool running_; // 线程池运行标志
};
// 静态成员初始化
template <class T>
pthread_mutex_t lock_ = PTHREAD_MUTEX_INITIALIZER;
#endif
2. 核心实现分析
2.1 线程安全设计
- 互斥锁保护:使用
pthread_mutex_t保护共享资源(任务队列) - 条件变量:
pthread_cond_t实现线程间的任务通知机制 - RAII锁:
LockGuard类实现自动加解锁,防止死锁
class LockGuard {
public:
explicit LockGuard(pthread_mutex_t &mtx) : mtx_(mtx) {
pthread_mutex_lock(&mtx_);
}
~LockGuard() {
pthread_mutex_unlock(&mtx_);
}
// 禁用拷贝构造和赋值
};
2.2 任务管理
- 任务队列:使用
std::queue存储待处理任务 - 任务添加:
Push()方法线程安全地添加任务 - 任务获取:工作线程通过
Pop()获取任务
2.3 线程管理
- 线程创建:
Start()方法创建指定数量的工作线程 - 线程回收:
End()方法安全终止所有线程 - 线程命名:使用哈希表记录线程ID与名称的映射
3. 关键特性
单例模式:确保全局唯一线程池实例
static ThreadPool<T> &GetInstance() { static ThreadPool<T> tp; return tp; }优雅停机:
- 设置
running_标志位 - 广播条件变量唤醒所有线程
- 等待所有线程结束
- 设置
高效唤醒机制:
void Push(const T &task) { { LockGuard lg(mtx_); tasks_.push(task); } pthread_cond_signal(&cond_); // 在锁外唤醒 }
4. 使用示例
我们仍然使用之前的任务类:
#ifndef _TASK_HPP_
#define _TASK_HPP_ 1
#include <iostream>
enum calc_error{
division_by_zero_error = 1,
operator_error,
};
class Task
{
public:
Task(int num1 = 0, int num2 = 0, char op = '+')
:num1_(num1),
num2_(num2),
op_(op){}
void Run()
{
exitcode_ = 0;
switch (op_)
{
case '+':
result_ = num1_ + num2_;
break;
case '-':
result_ = num1_ - num2_;
break;
case '*':
result_ = num1_ * num2_;
break;
case '/':
if(num2_ == 0){
result_ = 0;
exitcode_ = calc_error::division_by_zero_error;
}
else result_ = num1_ / num2_;
break;
case '%':
if(num2_ == 0){
result_ = 0;
exitcode_ = calc_error::division_by_zero_error;
}
else result_ = num1_ % num2_;
break;
default:
result_ = 0;
exitcode_ = calc_error::operator_error;
break;
}
}
void operator ()()
{
Run();
}
std::string GetResult()
{
std::string ret = std::to_string(num1_);
ret += op_;
ret += std::to_string(num2_);
ret += "=";
ret += std::to_string(result_);
ret += "[code: ";
ret += std::to_string(exitcode_);
ret += "]";
return ret;
}
std::string GetTask()
{
std::string ret = std::to_string(num1_);
ret += op_;
ret += std::to_string(num2_);
ret += "=?";
return ret;
}
private:
int num1_, num2_;
char op_;
int result_;
int exitcode_;
};
#endif
测试主函数如下:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <chrono>
#include <semaphore.h>
#include "Task.hpp"
#include "thread_pool.hpp"
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
using namespace std;
const int thread_num = 20;
const int total_task = 20;
sem_t sem_cons;
sem_t sem_pro;
char opers[] = {'+', '-', '*', '/', '%'};
struct thread_data{
string name;
sem_t sem;
};
class Timer
{
private:
std::chrono::time_point<std::chrono::high_resolution_clock> start_time;
public:
Timer()
{
start();
}
void start()
{
start_time = std::chrono::high_resolution_clock::now();
}
~Timer()
{
std::cout << elapsed() << endl;
}
// 返回经过的毫秒数
double elapsed() const
{
auto end_time = std::chrono::high_resolution_clock::now();
return std::chrono::duration<double, std::milli>(end_time - start_time).count();
}
// 返回经过的秒数
double elapsedSeconds() const
{
return elapsed() / 1000.0;
}
};
void *Producer(void *args)
{
thread_data *td = static_cast<thread_data *>(args);
string name = td->name;
while(true)
{
if(sem_trywait(&sem_pro)) break;
usleep(10);
int num1 = rand() % 10;
int num2 = rand() % 10;
char op = opers[rand() % 5];
Task task{num1, num2, op};
ThreadPool<Task>::GetInstance().Push(task);
printf("%s: %s\n", name.c_str(), task.GetTask().c_str());
}
return nullptr;
}
int main()
{
srand(time(nullptr));
vector<pthread_t> tids;
sem_init(&sem_cons, 0, total_task);
sem_init(&sem_pro, 0, total_task);
ThreadPool<Task>::GetInstance().Start();
for(int i = 1; i <= thread_num; ++i)
{
string name = "producer-" + to_string(i);
thread_data *td = new thread_data{
name: name
};
pthread_t tid;
pthread_create(&tid, nullptr, Producer, (void*)(td));
tids.push_back(tid);
}
{
Timer time;
for(auto tid: tids) pthread_join(tid, nullptr);
}
return 0;
}
同样有一个计时器,不过这里的计时器用处不是很大,因为并没有值得对比的其他类型的线程池,不过我们可以和之前的阻塞队列和环形队列相对比,因为它们都属于是一种生产者-消费者模型。
上面的测试用例是为了查看一下打印结果,验证该线程池是否能够正常工作,我们看一下打印结果:
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
thread-1: 0*6=0[code: 0]
thread-1: 2+7=9[code: 0]
producer-1: 0*6=?
producer-9: 2+7=?
thread-3: 6+9=15[code: 0]
producer-10: 6+9=?
thread-4: 4/6=0[code: 0]
producer-11: 4/6=?
thread-5: 5+2=7[code: 0]
producer-12: 5+2=?
thread-6: 6*2=12[code: 0]
producer-10: 6*2=?
thread-8: 9-3=6[code: 0]
thread-7: 4%8=4[code: 0]
producer-1: 4%8=?
producer-9: 9-3=?
thread-9: 8-7=1[code: 0]
producer-10: 8-7=?
thread-10: 5*8=40[code: 0]
producer-11: 5*8=?
thread-11: 6/1=6[code: 0]
producer-12: 6/1=?
producer-1: 8*3=?
thread-12: 8*3=24[code: 0]
producer-9: 5%1=?
thread-13: 5%1=0[code: 0]
producer-15: 8*8=?
thread-19: 8*8=64[code: 0]
producer-10: 5+5=?
thread-14: 5+5=10[code: 0]
producer-1: 5%3=?
thread-15: 5%3=2[code: 0]
producer-11: 5*9=?
thread-16: 5*9=45[code: 0]
producer-13: 5+6=?
thread-17: 5+6=11[code: 0]
producer-14: 1+7=?
thread-18: 1+7=8[code: 0]
producer-2: 1/3=?
thread-20: 1/3=0[code: 0]
0.928079
接下来,我们和前面的阻塞队列和环形队列进行比较,我们修改一下测试用例,让生产者和消费者数量相等,任务数量也相等,然后我们比较一下它们的执行时间:
// 修改了任务总量
const int total_task = 100000;
// 将打印信息屏蔽掉了
// main.cc
// printf("%s: %s\n", name.c_str(), task.GetTask().c_str());
// thread.hpp
// printf("%s: %s\n", thread_name.c_str(), task.GetResult().c_str());
// Task.hpp
void Run()
{
// 给任务同样增加延迟,统一变数
usleep(10);
...
}
测试后结果如下:
# 第一次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
676.78
# 第二次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
905.2
# 第三次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
724.741
# 第四次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
813.893
# 第五次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
923.235
平均用时:(676.78 + 905.2 + 724.741 + 813.893 + 923.235)/ 5 = 811.8774
可以看到,效率肯定不如环形队列,因为环形队列极大减少了消费者和生产者之间的冲突,只有在队列为空为满的时候才会等待
既然如此,我们为何不试试利用环形队列来实现线程池呢?
说干就干,利用信号量来大幅度降低 Push 和 Pop 锁冲突的开销,我们来实现一个基于环形队列的线程池
基于环形队列(信号量)的线程池
代码实现
#ifndef _THREAD_POOL_HPP_
#define _THREAD_POOL_HPP_ 1; // 头文件保护宏
#include <iostream>
#include <pthread.h> // POSIX线程库
#include <unordered_map> // 线程管理容器
#include <queue> // 任务队列(已弃用)
#include <semaphore.h> // 信号量支持
const size_t default_cap = 20; // 默认线程数量
// RAII互斥锁封装类
class LockGuard {
public:
explicit LockGuard(pthread_mutex_t &mtx)
: push_mtx_(mtx) {
pthread_mutex_lock(&push_mtx_); // 构造时加锁
}
~LockGuard() {
pthread_mutex_unlock(&push_mtx_); // 析构时解锁
}
// 禁用拷贝构造和赋值
LockGuard(const LockGuard &) = delete;
LockGuard &operator=(const LockGuard &) = delete;
private:
pthread_mutex_t &push_mtx_; // 管理的互斥锁引用
};
template <class T>
class ThreadPool {
private:
// 从环形队列取出任务
T Pop() {
T task = tasks_[pop_pos_]; // 获取当前读位置任务
pop_pos_ = (pop_pos_ + 1) % task_cap_; // 环形递增读指针
return task;
}
// 封装sem_wait(P操作)
void P(sem_t &sem) {
sem_wait(&sem);
}
// 封装sem_post(V操作)
void V(sem_t &sem) {
sem_post(&sem);
}
private:
// 线程上下文数据
struct thread_data {
std::string name; // 线程名称
ThreadPool<T> *tp; // 所属线程池指针
};
public:
// 获取单例实例(线程安全)
static ThreadPool<T> &GetInstance() {
static ThreadPool<T> tp; // C++11保证静态变量线程安全
return tp;
}
// 启动线程池
void Start() {
running_ = true;
for (int i = 1; i <= cap_; ++i) {
std::string name = "thread-" + std::to_string(i);
pthread_t tid;
// 创建线程上下文数据
thread_data *td = new thread_data{ name, this };
pthread_create(&tid, nullptr, ThreadHandler, td);
threads_[tid] = name; // 记录线程信息
}
}
// 提交任务到线程池
void Push(const T &task) {
P(push_sem_); // 等待空闲槽位
{
LockGuard lg(push_mtx_); // 保护写指针
tasks_[push_pos_] = task; // 写入任务
push_pos_ = (push_pos_ + 1) % task_cap_; // 环形递增写指针
}
V(pop_sem_); // 通知有新任务
}
// 停止线程池
void End() {
{
LockGuard pop_lg(pop_mtx_);
running_ = false; // 设置停止标志
}
// 等待所有线程退出
for (auto &[tid, name] : threads_) {
pthread_join(tid, nullptr);
}
threads_.clear();
}
// 线程处理函数(静态方法)
static void *ThreadHandler(void *args) {
thread_data *td = static_cast<thread_data *>(args);
ThreadPool<T> *tp = td->tp;
T task;
int wait_ret = 0;
while (true) {
// 非阻塞式获取任务
while (true) {
wait_ret = sem_trywait(&tp->pop_sem_); // 尝试获取任务
if (wait_ret == 0) break; // 成功获取
if (errno == EAGAIN) { // 无任务可用
if (!tp->running_) { // 检查停止标志
delete td;
return nullptr; // 安全退出
}
usleep(1000); // 避免忙等待
continue;
} else { // 其他错误
perror("sem_trywait");
break;
}
}
// 处理任务
{
LockGuard lg(tp->pop_mtx_); // 保护读指针
task = tp->Pop(); // 取出任务
}
tp->V(tp->push_sem_); // 释放一个空槽位
task(); // 执行任务
}
delete td;
return nullptr;
}
private:
// 私有构造函数(单例模式)
ThreadPool(size_t cap = default_cap, size_t task_cap = 10)
: cap_(cap), // 线程数量
task_cap_(task_cap), // 环形队列容量
running_(false), // 初始状态
push_pos_(0), // 写指针初始化
pop_pos_(0), // 读指针初始化
tasks_(task_cap) // 预分配队列空间
{
pthread_mutex_init(&push_mtx_, nullptr); // 写互斥锁初始化
pthread_mutex_init(&pop_mtx_, nullptr); // 读互斥锁初始化
sem_init(&push_sem_, 0, task_cap); // 空槽位信号量(初始=容量)
sem_init(&pop_sem_, 0, 0); // 任务信号量(初始=0)
}
~ThreadPool() {
if (running_) End(); // 确保资源释放
pthread_mutex_destroy(&push_mtx_);
pthread_mutex_destroy(&pop_mtx_);
sem_destroy(&push_sem_);
sem_destroy(&pop_sem_);
}
// 禁用拷贝构造和赋值
ThreadPool(const ThreadPool<T> &) = delete;
const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
private:
// 线程管理
std::unordered_map<pthread_t, std::string> threads_; // 线程记录
size_t cap_; // 线程数量上限
size_t task_cap_; // 环形队列容量
// 环形队列控制
size_t push_pos_; // 写位置指针
size_t pop_pos_; // 读位置指针
std::vector<T> tasks_;// 环形队列容器
// 同步机制
pthread_mutex_t push_mtx_; // 写指针互斥锁
pthread_mutex_t pop_mtx_; // 读指针互斥锁
sem_t push_sem_; // 空槽位信号量(生产者)
sem_t pop_sem_; // 任务信号量(消费者)
bool running_; // 运行状态标志
};
#endif
一、设计思想
生产者-消费者模型优化
- 传统队列需要同时锁住头尾指针,而环形队列通过固定大小预分配内存 + 读写指针分离,实现:
- 生产者只修改
push_pos_(写指针) - 消费者只修改
pop_pos_(读指针) - 读写操作完全解耦,无需竞争同一把锁
- 生产者只修改
- 传统队列需要同时锁住头尾指针,而环形队列通过固定大小预分配内存 + 读写指针分离,实现:
信号量控制流量
push_sem_:剩余空闲槽位数(初始=队列容量)pop_sem_:待消费任务数(初始=0)- 通过
P/V操作自动阻塞/唤醒线程,比条件变量更高效
无锁化尝试
- 使用
sem_trywait+usleep实现非阻塞等待,避免线程在sem_wait中永久阻塞 - 单生产者/消费者场景下可完全去除互斥锁(当前实现保留锁是为兼容多生产者场景)
- 使用
二、关键组件解析
1. 环形队列核心结构
size_t push_pos_; // 写指针(生产者专用)
size_t pop_pos_; // 读指针(消费者专用)
std::vector<T> tasks_; // 预分配环形缓冲区
size_t task_cap_; // 队列容量
- 环形计算:
pos = (pos + 1) % task_cap_实现指针回绕 - 内存预分配:避免动态内存申请的开销
2. 同步机制
sem_t push_sem_; // 空闲槽位信号量
sem_t pop_sem_; // 任务信号量
pthread_mutex_t push_mtx_;// 写互斥锁(多生产者时需要)
pthread_mutex_t pop_mtx_; // 读互斥锁(多消费者时需要)
信号量初始化:
sem_init(&push_sem_, 0, task_cap_); // 初始有task_cap_个空位 sem_init(&pop_sem_, 0, 0); // 初始0个任务
3. 线程控制
bool running_; // 线程池启停标志
unordered_map<pthread_t, string> threads_; // 线程记录表
- 优雅停止:
- 设置
running_=false - 通过
sem_post唤醒所有阻塞线程 - 线程检查到
running_==false且队列空时退出
- 设置
三、工作流程
生产者提交任务(Push)
sequenceDiagram
participant Producer
participant push_sem_
participant Queue
participant pop_sem_
Producer->>push_sem_: P() 等待空槽
activate push_sem_
push_sem_-->>Producer: 获得空槽
deactivate push_sem_
Producer->>Queue: 加锁写入任务
Producer->>pop_sem_: V() 通知有新任务
消费者处理任务(ThreadHandler)
sequenceDiagram
participant Consumer
participant pop_sem_
participant Queue
participant push_sem_
loop 非阻塞等待
Consumer->>pop_sem_: sem_trywait()
alt 获取成功
pop_sem_-->>Consumer: 获得任务
else 无任务
Consumer->>running_: 检查停止标志
end
end
Consumer->>Queue: 加锁取出任务
Consumer->>push_sem_: V() 释放空槽
Consumer->>Task: 执行任务
四、性能优化点
锁粒度最小化
- 写操作只锁
push_mtx_(保护push_pos_) - 读操作只锁
pop_mtx_(保护pop_pos_)
- 写操作只锁
缓存友好设计
std::vector连续内存布局- 读写指针原子操作(可升级为
atomic<size_t>)
避免惊群效应
- 精准的
sem_post唤醒(传统条件变量会唤醒所有等待线程)
- 精准的
我们来看一下这个程序的效率:
# 第一次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
659.121
# 第二次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
624.264
# 第三次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
607.972
# 第四次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
551.506
# 第五次
╭─ljx@VM-16-15-debian ~/linux_review/thread
╰─➤ ./main.o
610.404
平均用时:(659.121+624.264+607.972+551.506+610.404)/5 = 607.576ms
与传统的线程池相比,性能提升了不少
查看评论