前言 本文介绍线程池的作用、线程池的应用场景、线程池的工作原理、代码实现线程池以及与nginx的线程池对比分析。线程池的作用 为什么会有线程池,到底解决了什么问题减少线程的创建与销毁(线程的角度)异步解耦的作用(设计的角度)线程池的异步处理使用场景 以日志为例,在写日志loginfo(xxx),与日志落盘,是两码事,它们两之间应该是异步的。那么异步解耦就是将日志当作一个任务task,将这个任务抛给线程池去处理,由线程池去负责日志落盘。对于应用程序而言,就可以提升落盘的效率。 以nginx为例,一秒几万的请求,速度很快。如果在其中加一个日志,那么qps一下子就掉下来了,因为每请求一次就需要落盘一次,那么整个服务器的性能就下降。我们可以引入一个线程池,把日志这个任务抛给线程池,对于主循环来说,就只抛任务即可,这样就可以大大提升主线程的效率。这就是线程池异步解耦的作用 不仅仅是日志落盘,还有很多地方都可以用线程池,比较耗时的操作如数据库操作,io处理等,都可以用线程池。 线程池有必要将线程与cpu做亲和性吗?在注重cpu处理能力的时候,可以做黏合;如果注重的是异步解耦,那么这里更加注重的是任务,没必要将线程和cpu做绑定。线程池工作原理线程池应该提供哪些api 我们在使用线程池的时候,是当作一个组件去使用。所以在使用组件的时候,我们首先想到的是线程池应该提供哪些api。线程池的初始化(创建)initcreate往池里面抛任务pushtask线程池的销毁deinitdestroy 这三个api是最核心的api,其他可扩展的api都是可有可无的,而这三个api是一定要有的。线程池的三个组件 想象去银行营业厅的场景。柜员:为客户提供服务;客户:是来办业务的,对于柜员来说,这些人就是任务。那么这两个形象就构建出了pthread和task。 那么这个公示牌(xxx号来几号柜台办理业务),是谁的属性呢?告示牌的作用是管理客户和柜员有秩序工作,它不隶属于柜员,也不隶属于客户,它是一个管理工具。柜员pthread客户task告示牌管理柜员和客户有秩序的工作(不会出现一个任务同时被多个线程处理的情况) 这么这就自然而然的形成了3个组件,那么它们都应该有什么属性呢:对于柜员来说:工号id,停止工作标识符flag对于客户来说:如果办理取款需要带银行卡,如果办贷款需要带凭证等等,所以需要一个任务func(),以及对应任务的参数arg对于告示牌来说:如果没有客户,那么柜员就需要在工作中等待客户的到来,所以第一个需要条件等待cond,既然要管理有秩序的工作,肯定需要mutex来保证临界资源 下面将柜员称为执行队列,客户称为任务队列,告示牌称为池管理组件。 错误理解:要使用线程就从线程池里面拿一个线程出来使用,用完再返回给线程池。这种理解是连接池的概念。而线程池是多个线程去任务队列取任务,竞争任务。 所以线程的核心就是下面的伪代码:while(1){gettask();taskfunc();} 相关视频推荐 150行代码,带你手写线程池,自行准备linux环境 cpu密集型和io密集型的线程池在开源框架中的应用 学习地址:CCLinux服务器开发后台架构师【零声教育】学习视频教程腾讯课堂 需要CCLinux服务器架构师学习资料加qun812855908获取(资料包括CC,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCPIP,协程,DPDK,ffmpeg等),免费分享 代码实现线程池的任务队列、执行队列、池管理组件的定义与添加删除 执行队列typedefstructNWORKER{structNTHREADPOLLstructNWORKERstructNWORKER}任务队列typedefstructNTASK{void(taskfunc)(voidarg);structNTASKstructNTASK}池管理组件typedefstructNTHREADPOLL{} 头插法defineLLADD(item,list)do{itemprevNULL;if(list!NULL){}listitem}while(0)defineLLREMOVE(item,list)do{if(itemprev!NULL){}if(itemnext!NULL){}if(listitem){}itemprevitemnextNULL;}while(0)三个api 创建其实就是创建threadpollt结构体,然后按照给定的宏创建线程和worker。 push就是给task队列增加一个任务,然后用signal通知cond。 销毁将所有线程的termination置1,然后广播cond即可。intthreadpollcreate(threadpolltthreadpoll,intthreadnum){if(threadnum1)threadnum1;memset(threadpoll,0,sizeof(threadpollt));initcondpthreadcondtblankcondPTHREADCONDINITIALIZER;memcpy(threadpollcond,blankcond,sizeof(pthreadcondt));initmutexpthreadmutextblankmutexPTHREADMUTEXINITIALIZER;memcpy(threadpollmutex,blankmutex,sizeof(pthreadmutext));onethreadoneworkerintidx0;for(idx0;idx){workertworkermalloc(sizeof(workert));if(workerNULL){perror(workermallocerr);}memset(worker,0,sizeof(workert));intretpthreadcreate(workerid,NULL,threadcallback,worker);if(ret){perror(pthreadcreateerr);free(worker);}LLADD(worker,threadpollworkers);}}intthreadpollpushtask(threadpolltthreadpoll,taskttask){pthreadmutexlock(threadpollmutex);LLADD(task,threadpolltasks);pthreadcondsignal(threadpollcond);pthreadmutexunlock(threadpollmutex);}intthreaddestroy(threadpolltthreadpoll){workertworkerNULL;for(worker!NULL;workerworkernext){workertermination1;}pthreadmutexlock(threadpollmutex);pthreadcondbroadcast(threadpollcond);pthreadmutexunlock(threadpollmutex);}线程的回调函数 线程要做的就是取任务,执行任务。取任务从任务队列里面取。tasktgettask(workertworker){while(1){pthreadmutexlock(workerthreadpollmutex);while(workerthreadpollworkersNULL){if(workertermination)pthreadcondwait(workerthreadpollcond,workerthreadpollmutex);}if(workertermination){pthreadmutexunlock(workerthreadpollmutex);returnNULL;}if(task){LLREMOVE(task,workerthreadpolltasks);}pthreadmutexunlock(workerthreadpollmutex);if(task!NULL){}}};voidthreadcallback(voidarg){workertworker(workert)while(1){taskttaskgettask(worker);if(taskNULL){free(worker);pthreadexit(threadtermination);}tasktaskfunc(task);}}测试代码 这里我们创建了1000个task,开了10个thread。记住task以及task的参数,由task的func来销毁。Createdby68725on2022725。includepthread。hincludememory。hincludemalloc。hincludestdio。hincludeunistd。h头插法defineLLADD(item,list)do{itemprevNULL;if(list!NULL){}}while(0)defineLLREMOVE(item,list)do{if(itemprev!NULL){}if(itemnext!NULL){}if(listitem){}itemprevitemnextNULL;}while(0)执行队列typedefstructNWORKER{structNTHREADPOLLstructNWORKERstructNWORKER}任务队列typedefstructNTASK{void(taskfunc)(voidarg);structNTASKstructNTASK}池管理组件typedefstructNTHREADPOLL{}tasktgettask(workertworker){while(1){pthreadmutexlock(workerthreadpollmutex);while(workerthreadpollworkersNULL){if(workertermination)pthreadcondwait(workerthreadpollcond,workerthreadpollmutex);}if(workertermination){pthreadmutexunlock(workerthreadpollmutex);returnNULL;}if(task){LLREMOVE(task,workerthreadpolltasks);}pthreadmutexunlock(workerthreadpollmutex);if(task!NULL){}}};voidthreadcallback(voidarg){workertworker(workert)while(1){taskttaskgettask(worker);if(taskNULL){free(worker);pthreadexit(threadtermination);}tasktaskfunc(task);}}intthreadpollcreate(threadpolltthreadpoll,intthreadnum){if(threadnum1)threadnum1;memset(threadpoll,0,sizeof(threadpollt));initcondpthreadcondtblankcondPTHREADCONDINITIALIZER;memcpy(threadpollcond,blankcond,sizeof(pthreadcondt));initmutexpthreadmutextblankmutexPTHREADMUTEXINITIALIZER;memcpy(threadpollmutex,blankmutex,sizeof(pthreadmutext));onethreadoneworkerintidx0;for(idx0;idx){workertworkermalloc(sizeof(workert));if(workerNULL){perror(workermallocerr);}memset(worker,0,sizeof(workert));intretpthreadcreate(workerid,NULL,threadcallback,worker);if(ret){perror(pthreadcreateerr);free(worker);}LLADD(worker,threadpollworkers);}}intthreadpollpushtask(threadpolltthreadpoll,taskttask){pthreadmutexlock(threadpollmutex);LLADD(task,threadpolltasks);pthreadcondsignal(threadpollcond);pthreadmutexunlock(threadpollmutex);}intthreaddestroy(threadpolltthreadpoll){workertworkerNULL;for(worker!NULL;workerworkernext){workertermination1;}pthreadmutexlock(threadpollmutex);pthreadcondbroadcast(threadpollcond);pthreadmutexunlock(threadpollmutex);}voidcounter(taskttask){intidx(int)printf(idx:dpthreadid:llu,idx,pthreadself());free(taskuserdata);free(task);}defineTHREADCOUNT10defineTASKCOUNT1000intmain(){threadpolltthreadpoll{0};intretthreadpollcreate(threadpoll,THREADCOUNT);if(ret!THREADCOUNT){threaddestroy(threadpoll);}inti0;for(i0;iTASKCOUNT;i){createtasktaskttask(taskt)malloc(sizeof(taskt));if(taskNULL){perror(taskmallocerr);exit(1);}taskuserdatamalloc(sizeof(int));(int)pushtaskthreadpollpushtask(threadpoll,task);}getchar();threaddestroy(threadpoll);}nginx线程池实现对比分祈线程池初始化对比 cond初始化,mutex初始化,创建线程 线程回调函数对比 取任务,执行任务 push任务对比 nginx是将任务插到尾部,我们做的是插到头部 线程数量的抉择 线程到底初始化多少呢?如果是计算密集型就不用太多的线程,如果是任务密集型可以多几个。以下是经验值,不一定一定按照这个来。计算密集型:强计算,计算时间较长,线程数量与cpu核心数成比例即可,如1:1。任务密集型:处理任务,io操作。可以开多一点,如cpu核心数的2倍。线程池的动态扩缩 随着任务越来越多,线程不够用怎么办?我们可以开一个监控线程,设nrunning线程总线程。当n上水位时,监控线程创建几个线程;当n下水位时,监控线程销毁几个线程。可以设置30和70。