封装线程池相关函数

news/2024/10/6 12:33:36

image

thread_pool.c

线程要执行的任务

/*******************************************************************************************  @name      :  routine*	@function  :  线程要执行的任务*  @paramsv   : None*  @retval    :  None*  @author    :  Dazz*  @date      :  2024/6/2*  @version   :  None*  @note      :  该函数会在routine线程被取消的时候自动执行*** *****************************************************************************************/
void *routine(void *arg)
{
// 调试
#ifdef DEBUGprintf("[%u] is started.\n",(unsigned)pthread_self());
#endif// 把需要传递给线程任务的参数进行备份thread_pool *pool = (thread_pool *)arg;// 新建一个任务结点用来备份struct task *p;while (1){/**************************************************************************   pthread_cleanup_push是一个宏,需要配合pthread_cleanup_pop一起使用**   如在执行pthread_cleanup_pop之前,该线程被取消了,则会调用handler这个函数,**   并将pool->lock作为参数传递给handler******************************************************************************/pthread_cleanup_push(handler, (void *)&pool->lock);// 对互斥锁上锁pthread_mutex_lock(&pool->lock);/******************************任务量为0的情况********************************************************/// 当处于等待状态的线程数量为0且销毁线程池的标志为0时while (pool->waiting_tasks == 0 && !pool->shutdown){// 对该线程进行挂起pthread_cond_wait(&pool->cond, &pool->lock);}// 当处于等待状态的线程数量为0且销毁线程池的标志为1时if (pool->waiting_tasks == 0 && pool->shutdown == true){// 解锁互斥锁pthread_mutex_unlock(&pool->lock);// 结束该线程pthread_exit(NULL); // CANNOT use 'break';}/******************************任务量不为0的情况****************************************************/// 备份任务结点p = pool->task_list->next;// 让线程池里的当前任务指向下一个任务pool->task_list->next = p->next;// 令处于等待状态的线程数量减一pool->waiting_tasks--;//================================================//// 互斥锁解锁pthread_mutex_unlock(&pool->lock);// 不执行清理处理程序,参数为1则执行清理处理程序pthread_cleanup_pop(0);//================================================//// 设置线程为不可取消pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);// 执行p,即当前任务结点里需要执行的任务(p->do_task)(p->arg);// 设置线程为可取消pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);// 释放堆内存free(p);}// 结束线程pthread_exit(NULL);
}

初始化线池

/********************************************************  @name      :  init_pool*	@function  :  初始化线池*  @params*                @pool             : 线程池的地址*                @threads_number   : 线程池中活跃的线程的数量**  @retval    : 成功返回1,否则返回0*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number)
{// 以默认属性初始化互斥锁pthread_mutex_init(&pool->lock, NULL);// 以默认属性初始化条件量pthread_cond_init(&pool->cond, NULL);// 销毁标志设置为不销毁pool->shutdown = false;// 给链表的节点申请堆内存pool->task_list = (struct task *)malloc(sizeof(struct task));// 申请堆内存,用于存储创建出来的线程的IDpool->tids = (pthread_t *)malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);// 错误处理,对malloc进行错误处理if (pool->task_list == NULL || pool->tids == NULL){fprintf(stderr, "malloc for task_list or tids fail!,error: %d, %s\n", errno, strerror(errno));return false;}// 对任务链表中的节点的指针域进行初始化pool->task_list->next = NULL;// 设置线程池中线程数量的最大值pool->max_waiting_tasks = MAX_WAITING_TASKS;// 设置等待线程处理的任务的数量为0,说明现在没有任务pool->waiting_tasks = 0;// 设置线程池中活跃的线程的数量pool->active_threads = threads_number;// 循环创建活跃线程for (int i = 0; i < pool->active_threads; i++){// 创建线程  把线程的ID存储在申请的堆内存if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0){fprintf(stderr, " create threads fail!,error: %d, %s\n", errno, strerror(errno));return false;}// 用于调试
#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}return true;
}

向线程池的任务链表中添加任务

/********************************************************  @name      :  add_task*	@function  :  向线程池的任务链表中添加任务*  @params*                @pool         : 线程池的地址*                @do_task      : 需要执行的任务的函数*                @arg          :需要执行的任务的函数的参数**  @retval    : 如成功返回1,否则返回0*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *arg)
{// 给任务链表节点申请内存struct task *new_task = malloc(sizeof(struct task));if (new_task == NULL){fprintf(stderr, " malloc for new task fail!,error: %d, %s\n", errno, strerror(errno));return false;}new_task->do_task = do_task; // 任务函数指针指向传进来的do_tasknew_task->arg = arg;		 // 设置函数参数new_task->next = NULL;		 // 指针域设置为NULL//============ LOCK =============//// 互斥锁上锁pthread_mutex_lock(&pool->lock);//===============================//// 说明要处理的任务的数量大于能处理的任务数量if (pool->waiting_tasks >= MAX_WAITING_TASKS){pthread_mutex_unlock(&pool->lock);fprintf(stderr, "too many tasks.\n");free(new_task);return false;}// 备份任务链表struct task *tmp = pool->task_list;// 遍历链表,找到单向链表的尾节点while (tmp->next != NULL)tmp = tmp->next;// 把新的要处理的任务插入到链表的尾部  尾插tmp->next = new_task;// 要处理的任务的数量+1pool->waiting_tasks++;//=========== UNLOCK ============//pthread_mutex_unlock(&pool->lock);//===============================//// 调试
#ifdef DEBUGprintf("[%u][%s] ==> a new task has been added.\n",(unsigned)pthread_self(), __FUNCTION__);
#endif// 唤醒第一个处于阻塞队列中的线程pthread_cond_signal(&pool->cond);return true;
}

向线程池加入新线程

/********************************************************  @name      :  add_thread*	@function  :  向线程池加入新线程*  @params*                @pool                  : 线程池的地址*                @removing_threads      : 需要增加的线程数量**  @retval    : 返回正在活跃的线程数量*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
int add_thread(thread_pool *pool, unsigned additional_threads)
{// 判断需要添加的新线程的数量是否为0if (additional_threads == 0)return 0;// 计算线程池中总线程的数量unsigned total_threads =pool->active_threads + additional_threads;int i, actual_increment = 0;// 循环创建新线程for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++){// 创建新线程if (pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool) != 0){perror("add threads error");// 如需要加入的新线程池数量为0,则直接退出if (actual_increment == 0)return -1;break;}actual_increment++;
// 用于调试
#ifdef DEBUGprintf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}// 记录此时线程池中活跃线程的总数pool->active_threads += actual_increment;return actual_increment;
}

从线程池中删除线程

/********************************************************  @name      :  remove_thread*	@function  :  从线程池中删除线程*  @params*                @pool                  : 线程池的地址*                @removing_threads      : 需要删除的线程数量**  @retval    : 返回正在活跃的线程数量*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{// 如果需要删除的线程数量为0,则返回正在活跃的线程数量if (removing_threads == 0)return pool->active_threads;// 定义一个变量叫remaining_threads,remaining_threads的值为正在活跃的线程数量减去需要删除线程的数量int remaining_threads = pool->active_threads - removing_threads;// 如remaining_threads大于零,说明正在活跃的线程数量大于需要删除线程的数量// 如remaining_threads小于零,说明正在活跃的线程数量小于或等于需要删除线程的数量,并将remaining_threads赋值为1remaining_threads = remaining_threads > 0 ? remaining_threads : 1;int i;// 循环取消线程,直到正在活跃的线程数量为0for (i = pool->active_threads - 1; i > remaining_threads - 1; i--){// 如调用pthread_cancel函数失败,则退出循环errno = pthread_cancel(pool->tids[i]);if (errno != 0)break;#ifdef DEBUGprintf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",(unsigned)pthread_self(), __FUNCTION__,i, (unsigned)pool->tids[i]);
#endif}if (i == pool->active_threads - 1)return -1;// 返回现在的线程数量else{pool->active_threads = i + 1;return i + 1;}
}

释放线程池

/********************************************************  @name      :  destroy_pool*	@function  :  释放线程池*  @params*                @pool                  : 线程池的地址**  @retval    : 成功返回1,否则返回0*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
bool destroy_pool(thread_pool *pool)
{// 设置销毁标志位1pool->shutdown = true;// 唤醒所有线程pthread_cond_broadcast(&pool->cond);// 循环等待线程结束for (int i = 0; i < pool->active_threads; i++){// 等待线程结束并释放相关资源errno = pthread_join(pool->tids[i], NULL);if (errno != 0){printf("join tids[%d] error: %s\n", i, strerror(errno));}elseprintf("[%u] is joined\n", (unsigned)pool->tids[i]);}// 释放相关内存free(pool->task_list);free(pool->tids);free(pool);return true;
}

thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>#include <errno.h>
#include <pthread.h>#define MAX_WAITING_TASKS 1000 // 处于等待状态的线程数量最大为1000
#define MAX_ACTIVE_THREADS 20  // 活跃的线程数量// 任务结点  单向链表的节点,类型
struct task
{void *(*do_task)(void *arg); // 任务函数指针  指向线程要执行的任务  格式是固定的void *arg;					 // 需要传递给任务的参数,如果不需要,则NULLstruct task *next; // 指向下一个任务结点的指针
};// 线程池的管理结构体
typedef struct thread_pool
{pthread_mutex_t lock; // 互斥锁pthread_cond_t cond;  // 条件量bool shutdown; // 是否需要销毁线程池struct task *task_list; // 用于存储任务的链表pthread_t *tids; // 用于记录线程池中线程的IDunsigned max_waiting_tasks; // 线程池中线程的数量最大值unsigned waiting_tasks;		// 处于等待状态的线程数量unsigned active_threads;	// 正在活跃的线程数量
} thread_pool;/********************************************************  @name      :  init_pool*	@function  :  初始化线池*  @params*                @pool             : 线程池的地址*                @threads_number   : 线程池中活跃的线程的数量**  @retval    : 成功返回1,否则返回0*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
bool init_pool(thread_pool *pool, unsigned int threads_number);/********************************************************  @name      :  add_task*	@function  :  向线程池的任务链表中添加任务*  @params*                @pool         : 线程池的地址*                @do_task      : 需要执行的任务的函数*                @arg          :需要执行的任务的函数的参数**  @retval    : 如成功返回1,否则返回0*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);/********************************************************  @name      :  add_thread*	@function  :  向线程池加入新线程*  @params*                @pool                  : 线程池的地址*                @removing_threads      : 需要增加的线程数量**  @retval    : 返回正在活跃的线程数量*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
int add_thread(thread_pool *pool, unsigned int additional_threads_number);/********************************************************  @name      :  remove_thread*	@function  :  从线程池中删除线程*  @params*                @pool                  : 线程池的地址*                @removing_threads      : 需要删除的线程数量**  @retval    : 返回正在活跃的线程数量*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
int remove_thread(thread_pool *pool, unsigned int removing_threads_number);/********************************************************  @name      :  destroy_pool*	@function  :  释放线程池*  @params*                @pool                  : 线程池的地址**  @retval    : 成功返回1,否则返回0*  @author    : Dazz*  @date      : 2024/6/2*  @version   : None*  @note      : None*** *******************************************************/
bool destroy_pool(thread_pool *pool);/*******************************************************************************************  @name      :  routine*	@function  :  线程要执行的任务*  @paramsv   : None*  @retval    :  None*  @author    :  Dazz*  @date      :  2024/6/2*  @version   :  None*  @note      :  该函数会在routine线程被取消的时候自动执行*** *****************************************************************************************/
void *routine(void *arg);#endif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hjln.cn/news/43049.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

深度体验与测评openGauss 6.0.0新版本

openGauss 6.0.0版本在安装和使用方面都带来了很大的改进和优化。一站式交互安装功能极大地简化了安装流程,降低了用户的学习成本;性能优化和中文日志支持功能则进一步提升了数据库的稳定性和易用性。本文分享自华为云社区《openGauss 6.0.0新版本安装测评》,作者:马顺华。…

【IDEA 必备插件之一】这样注入 Bean 才爽

今天我们来介绍一款免费的IDEA生产力插件,它叫Bean Assistant。我们可以在插件市场搜索Bean Assistant来安装它。前言 不知道你们在平时的工作场景中是否经常遇到以下的情况。在一个方法中,需要调用某个实例接口的方法。我们经常会先在该类里面先注入这个接口的实例 Bean,然后…

文件系统(六):一文看懂linux ext4文件系统工作原理

liwen01 2024.06.09 前言 Linux系统中的ext2、ext3、ext4 文件系统,它们都有很强的向后和向前兼容性,可以在数据不丢失的情况下进行文件系统的升级。目前ext4是一个相对较成熟、稳定且高效的文件系统,适用于绝大部分规模和需求的Linux环境。 ext4它突出的特点有:数据分段管…

【译】Visual Studio 17.10 发布了新版扩展管理器

从 Visual Studio 17.10 开始提供新的扩展管理器作为默认预览功能。我们已将基本功能简化为现代风格 UI,以帮助您发现新的扩展并管理已安装的扩展。我们将更新的扩展管理器带给所有用户!在过去的一年里,我们已经将更新后的扩展管理器作为可选的预览功能提供,并一直期待您的…

手机上玩 PC 游戏的开源项目「GitHub 热点速览」

上周国产 3A 大作《黑神话:悟空》开启预售,同时公布游戏将于北京时间 2024.8.20 正式上线。这是一款由「游戏科学」开发的西游题材单机动作角色扮演游戏,它采用「虚幻引擎5」制作。该引擎并不是完全开源的,但它提供了部分源代码的访问权限。具体来说,就是 **GitHub 账号必…

Python 数据类型

Python 数据类型 Python 的数据类型大致可以分为两大类:基本数据类型和容器数据类型。基本数据类型通常指的是单一、不可分割的数据对象,而容器数据类型则用于存储多个数据对象的集合。如下图所示:基本数据类型 整型(int) 整型数据用于表示整数。Python 中的整型变量没有固…

[转帖]探索fio参数如何选择以及全方位对比HDD和SSD性能

文章目录 1. 磁盘I/O性能指标1.1 性能指标1.2 I/O 观测1.2.1 磁盘I/O 观测1.2.2 进程I/O观测 2. Fio 性能测试2.1 环境准备2.2 测试维度选择2.3 测试2.3.1 optane ssd和nvme ssd性能测试2.3.2 aep性能测试(intel persistent memory) 真正测试之前 我们需要清楚 评判磁盘I/O性能…

3. 使用Mybatis完成CRUD

前置工作准备创建Maven项目 , 引入依赖(mybatis依赖 ,mysql 驱动依赖 ,junit依赖 ,logback 依赖) 将xml文件放到类的根路径下 提供com.north.mybatis.utils.SqlSessionUtil工具类 创建测试用例:com.north.mybatis.CarMapperTest‍ 补充知识:什么是CRUD C: Create增 R: Re…