在Linux系统编程中,线程池是一种常用的并发模式,它通过预先创建一组线程并复用它们来执行任务,避免了频繁创建和销毁线程的开销。本文将从零开始,带你实现一个完整的线程池,涵盖互斥锁、条件变量以及简单的日志系统,并提供详细注释,确保小白也能理解。
线程池的核心是一个任务队列和一组工作线程。主线程将任务添加到队列,工作线程不断从队列中取出任务执行。为了保证线程安全,队列的访问需要互斥锁保护,而条件变量则用于线程间的同步(如队列空时等待,有任务时唤醒)。
在Linux下,我们使用pthread库提供的pthread_mutex_t实现互斥,pthread_cond_t实现条件同步。当任务队列为空时,工作线程调用pthread_cond_wait阻塞;当有新任务加入时,主线程通过pthread_cond_signal唤醒一个等待线程。
日志可以帮助我们观察线程池运行状态。这里实现一个简单的日志宏,打印时间戳和消息到标准输出,用于调试。
#include #include #include #include #include #include // 日志宏:打印时间戳和消息#define LOG(level, msg) do { \ time_t now = time(NULL); \ char buf[32]; \ strftime(buf, sizeof(buf), \ "%Y-%m-%d %H:%M:%S", localtime(&now)); \ printf("[%s] [%s] %s\n", buf, level, msg); \} while(0)// 任务结构体typedef struct task { void (function)(void); // 任务函数 void* arg; // 函数参数 struct task* next; // 指向下一个任务} task_t;// 线程池结构体typedef struct threadpool { pthread_mutex_t lock; // 互斥锁 pthread_cond_t notify; // 条件变量 pthread_t* threads; // 线程数组 task_t* task_head; // 任务队列头 task_t* task_tail; // 任务队列尾 int thread_count; // 线程数 int shutdown; // 是否销毁线程池 int started; // 已启动线程数} threadpool_t;// 工作线程函数static void* worker(void* arg) { threadpool_t* pool = (threadpool_t)arg; task_t task; LOG("INFO", "Worker thread started"); while (1) { pthread_mutex_lock(&(pool->lock)); // 等待任务,直到有任务或线程池销毁 while (pool->task_head == NULL && !pool->shutdown) { pthread_cond_wait(&(pool->notify), &(pool->lock)); } if (pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); LOG("INFO", "Worker thread exiting"); pthread_exit(NULL); } // 取出任务 task = pool->task_head; pool->task_head = task->next; if (pool->task_head == NULL) { pool->task_tail = NULL; } pthread_mutex_unlock(&(pool->lock)); // 执行任务 ((task->function))(task->arg); free(task); // 释放任务内存 } return NULL;}// 创建线程池threadpool_t threadpool_create(int thread_count) { threadpool_t* pool = (threadpool_t)malloc(sizeof(threadpool_t)); if (pool == NULL) return NULL; // 初始化互斥锁和条件变量 pthread_mutex_init(&(pool->lock), NULL); pthread_cond_init(&(pool->notify), NULL); pool->threads = (pthread_t)malloc(sizeof(pthread_t) * thread_count); pool->task_head = NULL; pool->task_tail = NULL; pool->thread_count = thread_count; pool->shutdown = 0; pool->started = 0; // 创建线程 for (int i = 0; i < thread_count; i++) { if (pthread_create(&(pool->threads[i]), NULL, worker, (void*)pool) != 0) { // 创建失败,销毁已创建的线程 threadpool_destroy(pool, 0); return NULL; } pool->started++; } LOG("INFO", "Thread pool created"); return pool;}// 添加任务int threadpool_add(threadpool_t* pool, void (function)(void), void* arg) { pthread_mutex_lock(&(pool->lock)); // 创建新任务 task_t* task = (task_t*)malloc(sizeof(task_t)); if (task == NULL) { pthread_mutex_unlock(&(pool->lock)); return -1; } task->function = function; task->arg = arg; task->next = NULL; // 插入队列尾部 if (pool->task_head == NULL) { pool->task_head = task; pool->task_tail = task; } else { pool->task_tail->next = task; pool->task_tail = task; } // 唤醒一个工作线程 pthread_cond_signal(&(pool->notify)); pthread_mutex_unlock(&(pool->lock)); LOG("INFO", "Task added to queue"); return 0;}// 销毁线程池int threadpool_destroy(threadpool_t* pool, int graceful) { if (pool == NULL) return -1; pthread_mutex_lock(&(pool->lock)); pool->shutdown = (graceful ? 0 : 1); // graceful为0表示立即销毁,1表示等待任务完成 // 唤醒所有等待线程 pthread_cond_broadcast(&(pool->notify)); pthread_mutex_unlock(&(pool->lock)); // 等待所有线程结束 for (int i = 0; i < pool->started; i++) { pthread_join(pool->threads[i], NULL); } // 清理资源 free(pool->threads); // 清空剩余任务 task_t* task = pool->task_head; while (task != NULL) { task_t* next = task->next; free(task); task = next; } pthread_mutex_destroy(&(pool->lock)); pthread_cond_destroy(&(pool->notify)); free(pool); LOG("INFO", "Thread pool destroyed"); return 0;}// 示例任务函数void example_task(void* arg) { int num = (int)arg; LOG("DEBUG", "Processing task with data: "); printf("Task data: %d\n", num); sleep(1); // 模拟工作}int main() { // 创建包含3个线程的线程池 threadpool_t* pool = threadpool_create(3); if (pool == NULL) { LOG("ERROR", "Failed to create thread pool"); return 1; } // 添加5个任务 int nums[5] = {1, 2, 3, 4, 5}; for (int i = 0; i < 5; i++) { threadpool_add(pool, example_task, (void*)&nums[i]); usleep(100); // 微调,避免参数覆盖(仅示例,实际应动态分配内存) } // 等待一段时间,让任务执行 sleep(6); // 销毁线程池(等待任务完成) threadpool_destroy(pool, 1); return 0;} 互斥锁 pthread_mutex_t保护任务队列的插入和取出操作,防止数据竞争。条件变量 pthread_cond_t配合互斥锁,使线程在队列空时休眠,避免忙等待。日志宏LOG打印时间戳和级别,便于观察。
使用gcc编译(需链接pthread):gcc -o threadpool threadpool.c -lpthread运行:./threadpool,观察日志输出和任务执行顺序。
本文通过一个完整的Linux线程池实现,展示了互斥锁、条件变量和简单日志系统的应用。掌握这些基础,你就能应对更复杂的并发场景。建议动手运行代码,加深理解。
— 教程结束,欢迎实践 —
本文由主机测评网于2026-02-28发表在主机测评网_免费VPS_免费云服务器_免费独立服务器,如有疑问,请联系我们。
本文链接:https://www.vpshk.cn/20260227709.html