点击上方“C语言与CPP编程”,选择“关注/置顶/星标公众号”
【资料图】
干货福利,第一时间送达!
你好,我是飞宇。
线程池是一种多线程处理形式,大多用于高并发服务器上,它能合理有效地利用高并发服务器上的线程资源。在Unix网络编程中,线程与进程用于处理各项分支子功能,我们通常的操作是:/*工作线程*/void*threadpool_thread(void*threadpool){threadpool_t*pool=(threadpool_t*)threadpool;threadpool_task_ttask;while(true){pthread_mutex_lock(&(pool->lock));/*无任务则阻塞在“任务队列不为空”上,有任务则跳出*/while((pool->queue_size==0)&&(!pool->shutdown)){printf("thread0x%xiswaiting\n",(unsignedint)pthread_self());pthread_cond_wait(&(pool->queue_not_empty),&(pool->lock));/*判断是否需要清除线程,自杀功能*/if(pool->wait_exit_thr_num>0){pool->wait_exit_thr_num--;/*判断线程池中的线程数是否大于最小线程数,是则结束当前线程*/if(pool->live_thr_num>pool->min_thr_num){printf("thread0x%xisexiting\n",(unsignedint)pthread_self());pool->live_thr_num--;pthread_mutex_unlock(&(pool->lock));pthread_exit(NULL);//结束线程}}}/*线程池开关状态*/if(pool->shutdown)//关闭线程池{pthread_mutex_unlock(&(pool->lock));printf("thread0x%xisexiting\n",(unsignedint)pthread_self());pthread_exit(NULL);//线程自己结束自己}//否则该线程可以拿出任务task.function=pool->task_queue[pool->queue_front].function;//出队操作task.arg=pool->task_queue[pool->queue_front].arg;pool->queue_front=(pool->queue_front+1)%pool->queue_max_size;//环型结构pool->queue_size--;//通知可以添加新任务pthread_cond_broadcast(&(pool->queue_not_full));//释放线程锁pthread_mutex_unlock(&(pool->lock));//执行刚才取出的任务printf("thread0x%xstartworking\n",(unsignedint)pthread_self());pthread_mutex_lock(&(pool->thread_counter));//锁住忙线程变量pool->busy_thr_num++;pthread_mutex_unlock(&(pool->thread_counter));(*(task.function))(task.arg);//执行任务//任务结束处理printf("thread0x%xendworking\n",(unsignedint)pthread_self());pthread_mutex_lock(&(pool->thread_counter));pool->busy_thr_num--;pthread_mutex_unlock(&(pool->thread_counter));}pthread_exit(NULL);}
任务队列的存在形式与线程数组相似;在线程池初始化时根据传入的最大任务数开辟空间;当服务器前方后请求到来后,分类并打包消息成为任务,将任务放入任务队列并通知空闲线程来取;不同之处在于任务队列有明显的先后顺序,先进先出;而线程数组中的线程则是一个竞争关系去拿到互斥锁争取任务。
/*向线程池的任务队列中添加一个任务*/intthreadpool_add_task(threadpool_t*pool,void*(*function)(void*arg),void*arg){pthread_mutex_lock(&(pool->lock));/*如果队列满了,调用wait阻塞*/while((pool->queue_size==pool->queue_max_size)&&(!pool->shutdown))pthread_cond_wait(&(pool->queue_not_full),&(pool->lock));/*如果线程池处于关闭状态*/if(pool->shutdown){pthread_mutex_unlock(&(pool->lock));return-1;}/*清空工作线程的回调函数的参数arg*/if(pool->task_queue[pool->queue_rear].arg!=NULL){free(pool->task_queue[pool->queue_rear].arg);pool->task_queue[pool->queue_rear].arg=NULL;}/*添加任务到任务队列*/pool->task_queue[pool->queue_rear].function=function;pool->task_queue[pool->queue_rear].arg=arg;pool->queue_rear=(pool->queue_rear+1)%pool->queue_max_size;/*逻辑环*/pool->queue_size++;/*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/pthread_cond_signal(&(pool->queue_not_empty));pthread_mutex_unlock(&(pool->lock));return0;}
作为线程池的管理者,该线程的主要功能包括:
说到底,它就是一个单独的线程,定时的去检查,根据我们的一个维持平衡算法去增删线程。
/*管理线程*/void*admin_thread(void*threadpool){inti;threadpool_t*pool=(threadpool_t*)threadpool;while(!pool->shutdown){printf("admin-----------------\n");sleep(DEFAULT_TIME);/*隔一段时间再管理*/pthread_mutex_lock(&(pool->lock));/*加锁*/intqueue_size=pool->queue_size;/*任务数*/intlive_thr_num=pool->live_thr_num;/*存活的线程数*/pthread_mutex_unlock(&(pool->lock));/*解锁*/pthread_mutex_lock(&(pool->thread_counter));intbusy_thr_num=pool->busy_thr_num;/*忙线程数*/pthread_mutex_unlock(&(pool->thread_counter));printf("adminbusylive-%d--%d-\n",busy_thr_num,live_thr_num);/*创建新线程实际任务数量大于最小正在等待的任务数量,存活线程数小于最大线程数*/if(queue_size>=MIN_WAIT_TASK_NUM&&live_thr_num<=pool->max_thr_num){printf("adminadd-----------\n");pthread_mutex_lock(&(pool->lock));intadd=0;/*一次增加DEFAULT_THREAD_NUM个线程*/for(i=0;imax_thr_num&&addlive_thr_nummax_thr_num;i++){if(pool->threads[i]==0||!is_thread_alive(pool->threads[i])){pthread_create(&(pool->threads[i]),NULL,threadpool_thread,(void*)pool);add++;pool->live_thr_num++;printf("newthread-----------------------\n");}}pthread_mutex_unlock(&(pool->lock));}/*销毁多余的线程忙线程x2都小于存活线程,并且存活的大于最小线程数*/if((busy_thr_num*2)pool->min_thr_num){//printf("adminbusy--%d--%d----\n",busy_thr_num,live_thr_num);/*一次销毁DEFAULT_THREAD_NUM个线程*/pthread_mutex_lock(&(pool->lock));pool->wait_exit_thr_num=DEFAULT_THREAD_NUM;pthread_mutex_unlock(&(pool->lock));for(i=0;iqueue_not_empty));printf("admincler--\n");}}}returnNULL;/*线程是否存活*/intis_thread_alive(pthread_ttid){intkill_rc=pthread_kill(tid,0);//发送0号信号,测试是否存活if(kill_rc==ESRCH)//线程不存在{returnfalse;}returntrue;}
/*释放线程池*/intthreadpool_free(threadpool_t*pool){if(pool==NULL)return-1;if(pool->task_queue)free(pool->task_queue);if(pool->threads){free(pool->threads);pthread_mutex_lock(&(pool->lock));/*先锁住再销毁*/pthread_mutex_destroy(&(pool->lock));pthread_mutex_lock(&(pool->thread_counter));pthread_mutex_destroy(&(pool->thread_counter));pthread_cond_destroy(&(pool->queue_not_empty));pthread_cond_destroy(&(pool->queue_not_full));}free(pool);pool=NULL;return0;}
/*销毁线程池*/intthreadpool_destroy(threadpool_t*pool){inti;if(pool==NULL){return-1;}pool->shutdown=true;/*销毁管理者线程*/pthread_join(pool->admin_tid,NULL);//通知所有线程去自杀(在自己领任务的过程中)for(i=0;ilive_thr_num;i++){pthread_cond_broadcast(&(pool->queue_not_empty));}/*等待线程结束先是pthread_exit然后等待其结束*/for(i=0;ilive_thr_num;i++){pthread_join(pool->threads[i],NULL);}threadpool_free(pool);return0;}
/*线程池初始化,其管理者线程及工作线程都会启动*/threadpool_t*thp=threadpool_create(10,100,100);printf("threadpoolinit......\n");/*接收到任务后添加*/threadpool_add_task(thp,do_work,(void*)p);//....../*销毁*/threadpool_destroy(thp);
原文:https://blog.csdn.net/qq_36359022/article/details/78796784
你好,我是飞宇,本硕均于某中流985 CS就读,先后于百度搜索以及字节某电商部门担任Linux C/C++后端研发工程师。
同时,我也是知乎博主@韩飞宇,日常分享C/C++、计算机学习经验、工作体会,欢迎点击阅读原文围观我分享的学习经验
我组建了一些社群一起交流,群里有大牛也有小白,如果你有意可以一起进群交流。
欢迎你添加我的微信,我拉你进技术交流群。此外,我也会经常在微信上分享一些计算机学习经验以及工作体验,还有一些内推机会。
扫描上方二维码,加我微信
我的私人微信
点击阅读原文查看你错过的宝藏
标签:
Copyright © 2015-2022 北方财富网版权所有 备案号:京ICP备2021034106号-50 联系邮箱: 55 16 53 8@qq.com