新闻| 论坛| 博客| 在线研讨会
C/C++手撕线程池(线程池的封装和实现)
电子禅石| 2024-04-09 11:47:24 阅读:184 发布文章

本文使用的源码地址:

https://github.com/SCljh/thread_pool


线程池描述


池式结构

  在计算机体系结构中有许多池式结构:内存池、数据库连接池、请求池、消息队列、

对象池等等。


  池式结构解决的主要问题为缓冲问题,起到的是缓冲区的作用。


线程池

 通过使用线程池,我们可以有效降低多线程操作中任务申请和释放产生的性能消耗。

特别是当我们每个线程的任务处理比较快时,系统大部分性能消耗都花在了

pthread_create以及释放线程的过程中。

那既然是这样的话,何不在程序开始运行阶段提前创建好一堆线程,

等我们需要用的时候只要去这一堆线程中领一个线程,用完了再放回去,

等程序运行结束时统一释放这一堆线程呢?按照这个想法,线程池出现了。


线程池解决的问题

解决任务处理

阻塞IO

解决线程创建于销毁的成本问题

管理线程

  线程池应用之一:日志存储


  在服务器保存日志至磁盘上时,性能往往压在磁盘读写上,

而引入线程池利用异步解耦可以解决磁盘读写性能问题。


  线程池的主要作用:异步解耦

说了那么多线程池的优点,那接下来要做的就是手撕这诱人的玩意了。


朴实无华但不枯燥的代码(以c++为例)

本文主要讲解的是c++线程池的实现,C语言实现其实思想和c++是一致的,

具体的代码可见文章开头的链接。


线程池中比较关键的东西

  若想自己编写一个线程池框架,那么可以先关注线程池中比较关键的东西:


工作队列

任务队列

线程池的池

pthread_create中的回调函数

为什么说这些东西比较关键?因为这“大四元”基本上支撑起了整个线程池的框架。

而线程池的框架如下所示:

————————————————

线程池

 如图所示,我们将整个框架以及任务添加接口定义为线程池的“”,

那么在这个池子中重要的就是工作队列任务队列

以及决定工作队列中的thread到底应该工作还是休息的回调函数

  那么手撕线程池,我们就从这几个关键点入手。

工作队列

  worker队列,首先要有worker才有队列,我们首先定义worker结构体:

  可想而知,worker中要有create_pthread函数的id参数,

还需要有控制每一个worker live or die的标志terminate,

我们最好再设置一个标志表示这个worker是否在工作。

最后,我们要知道这个worker隶属于那个线程池(至于为什么下文将介绍)

struct NWORKER{ pthread_t threadid; //线程id bool terminate; //是否需要结束该worker的标志 int isWorking; //该worker是否在工作 ThreadPool *pool; //隶属于的线程池 }

任务队列

  任务队列就简单得多了,想想编程语言中的任务应该是什么?不就是函数嘛。

所以我们只需要定义一个函数该有的东西就行了。

struct NJOB{ void (*func)(void *arg); //任务函数 void *user_data; //函数参数 };

线程池本池

  对于一个线程池,任务队列和工作队列已经是必不可少的东西了,

那线程池还有需要哪些东西辅助它以达到它该有的功能呢?

  一说到线程,那处理好线程同步就是一个绕不开的话题,

那在线程池中我们需要处理的临界资源有哪些呢?

想想我们工作队列中的每个worker都在等待一个任务队列看其是否有任务到来,

所以很容易得出结论我们必须要在线程池中实现两把锁:

一把是用来控制对任务队列操作的互斥锁,

另一把是当任务队列有新任务时唤醒worker的条件锁。

  有了这两把锁,线程池中再加点必要的一些数字以及对线程池操作的函数,

那么这个类就写完了。实现代码如下:

class ThreadPool{ private: struct NWORKER{ pthread_t threadid; bool terminate; int isWorking; ThreadPool *pool; } *m_workers; struct NJOB{ void (*func)(void *arg); //任务函数 void *user_data; }; public: //线程池初始化 //numWorkers:线程数量 ThreadPool(int numWorkers, int max_jobs); //销毁线程池 ~ThreadPool(); //面向用户的添加任务 int pushJob(void (*func)(void *data), void *arg, int len); private: //向线程池中添加任务 bool _addJob(NJOB* job); //回调函数 static void* _run(void *arg); void _threadLoop(void *arg); private: std::list m_jobs_list; int m_max_jobs; //任务队列中的最大任务数 int m_sum_thread; //worker总数 int m_free_thread; //空闲worker数 pthread_cond_t m_jobs_cond; //线程条件等待 pthread_mutex_t m_jobs_mutex; //为任务加锁防止一个任务 被两个线程执行等其他情况 };

可以看到我们做了一些必要的封装,只给用户提供了构造函数、

析构函数以及添加任务的函数。这也是一个基本的线程池框架必要的接口。

回调函数

static?

  根据上方代码可以看见,回调函数为static函数。

我可不想在我使用使用回调函数的时候自动给我加上*this参数。

  首先回调函数是每个线程创建之后就开始执行的函数,

该函数作为pthread_create的第三个参数传入。我们来看看pthread_create的函数原型:

int pthread_create(pthread_t *tidp,const pthread_attr_t *attr, void *(*start_rtn)(void*),void *arg);

注意到,此处的我们传入的回调函数必须是接受一个void*参数,

且返回类型为void*的函数。如果我们将回调函数写成线程池的普通成员函数,

那么c++会在这个函数参数前默认加上一个*this参数,

**这也是为什么我们能在成员函数中使用当前对象中的一些属性。

**然而就是这个原因,若我们传入的回调函数指针为类的成员函数,

那c++编译器会破坏我们的函数结构(因为给我们加了一个形参),

导致pthread_create的第三个参数不符合要求而报错:

NON-STATIC

看吧,编译器不让我们用non-static的成员函数作为回调函数传入pthread_create中。

其实在c++中,大多数回调函数都有这个要求。


  那为什么static就可以呢?


这是因为static函数为类的静态函数,当类的成员函数被static修饰后,

调用该函数将不会默认传递*this指针,

这也是为什么static成员函数中不能使用对象的非static属性:

你*this指针都没传我上哪去找你的对象?

函数本身

  在运行回调函数的时候,我们又想用对象里的东西(比如锁),

编译器又不让我们用,那怎么办?别忘了虽然static函数没有*this指针,

但它却可以有一个*void的参数啊。有了这个*void,我们还怕少一个*this指针?

我们可以先写一个static函数,将需要的对象指针通过形参传到这个函数里,

再在这个函数中通过这个对象调用成员函数的方法,就能使用这个对象的成员属性了。


  就像这样:

//run为static函数 void* ThreadPool::_run(void *arg) { NWORKER *worker = (NWORKER *)arg; worker->pool->_threadLoop(arg); } //threadLoop为普通成员函数 void ThreadPool::_threadLoop(void *arg) { //在这里就能直接用当前ThreadPool对象的东西了 }

至于threadLoop的实现,由于线程是要一直存在的,

一个while(true)的循环肯定少不了了。这个循环中具体做什么呢:

不断检查任务队列中是否有job:


如果有,则取出这个job,并将该job从任务队列中删除,且执行job中的func函数。

如果没有,调用pthread_cond_wait函数等待job到来时被唤醒。

若当前worker的terminate为真,则退出循环结束线程。

注意在对job操作前别忘了加锁,函数实现如下:

void ThreadPool::_threadLoop(void *arg) { NWORKER *worker = (NWORKER*)arg; while (1){ //线程只有两个状态:执行\等待 //查看任务队列前先获取锁 pthread_mutex_lock(&m_jobs_mutex); //当前没有任务 while (m_jobs_list.size() == 0) { //检查worker是否需要结束生命 if (worker->terminate) break; //条件等待直到被唤醒 pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex); } //检查worker是否需要结束生命 if (worker->terminate){ pthread_mutex_unlock(&m_jobs_mutex); break; } //获取到job后将该job从任务队列移出,免得其他worker过来重复做这个任务 struct NJOB *job = m_jobs_list.front(); m_jobs_list.pop_front(); //对任务队列的操作结束,释放锁 pthread_mutex_unlock(&m_jobs_mutex); m_free_thread--; worker->isWorking = true; //执行job中的func job->func(job->user_data); worker->isWorking = false; free(job->user_data); free(job); } free(worker); pthread_exit(NULL); }



原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636




————————————————








原文链接:https://blog.csdn.net/ACMer_L/article/details/107578636


*博客内容为网友个人发布,仅代表博主个人观点,如有侵权请联系工作人员删除。

参与讨论
登录后参与讨论
属于自己的技术积累分享,成为嵌入式系统研发高手。
推荐文章
最近访客