#include "threadpool.h"
// Creates the mutex that guards every access to m_taskQ.
TaskQueue::TaskQueue()
{
    pthread_mutex_init(&m_mutex, nullptr);
}

// Destroys the mutex created by the constructor.
TaskQueue::~TaskQueue()
{
    pthread_mutex_destroy(&m_mutex);
}

// Pushes `task` onto m_taskQ while holding m_mutex.
void TaskQueue::addTask(Task& task)
{
    pthread_mutex_lock(&m_mutex);
    m_taskQ.push(task);
    pthread_mutex_unlock(&m_mutex);
}

// Overload that builds a Task from a callback and its argument, then pushes
// it onto m_taskQ while holding m_mutex.
void TaskQueue::addTask(callback f, void* arg)
{
    pthread_mutex_lock(&m_mutex);
    m_taskQ.push(Task(f, arg));
    pthread_mutex_unlock(&m_mutex);
}

// Removes and returns the front task. When the queue is empty a
// default-constructed Task is returned instead.
Task TaskQueue::takeTask()
{
    Task next;

    pthread_mutex_lock(&m_mutex);
    const bool hasWork = !m_taskQ.empty();
    if (hasWork)
    {
        next = m_taskQ.front();
        m_taskQ.pop();
    }
    pthread_mutex_unlock(&m_mutex);

    return next;
}
// Builds the pool: allocates the task queue and a zeroed table of `max`
// thread-id slots, initializes the pool mutex and the notEmpty condition
// variable, then starts one manager thread and `min` worker threads.
//
// min: number of workers started immediately (also the floor used by the
//      manager and workers when shrinking the pool).
// max: size of the thread-id table and upper bound on live workers.
//
// NOTE(review): `min` is not checked against `max`; a caller passing
// min > max would index past the end of threadID in the start-up loop.
ThreadPool::ThreadPool(int min, int max)
{
    // Give the destructor well-defined values to test even when
    // construction bails out early.
    threadID = nullptr;
    managerID = 0;

    taskQ = new TaskQueue;
    do
    {
        if (taskQ == nullptr)
        {
            cout << "creat tackqueue error..." << endl;
            break;
        }

        threadID = new pthread_t[max];
        if (threadID == NULL)
        {
            cout << "creat threadpool error..." << endl;
            break;
        }
        // A zero slot means "free"; manager() and threadExit() rely on this.
        memset(threadID, 0, sizeof(pthread_t) * max);

        minNum = min;
        maxNum = max;
        busyNum = 0;
        liveNum = min;
        exitNum = 0;

        if (pthread_mutex_init(&mutexPool, NULL) != 0 ||
            pthread_cond_init(&notEmpty, NULL) != 0)
        {
            cout << "mutex or condition init fail..." << endl;
            break;
        }

        shutdown = false;

        if (pthread_create(&managerID, NULL, manager, this) != 0)
        {
            // Keep the sentinel so the destructor does not join a bogus id.
            managerID = 0;
        }

        for (int i = 0; i < minNum; ++i)
        {
            if (pthread_create(&threadID[i], NULL, worker, this) != 0)
            {
                // The slot content is unspecified after a failed create:
                // mark it free again and correct the live count. Workers
                // that already started read liveNum under the mutex.
                pthread_mutex_lock(&mutexPool);
                threadID[i] = 0;
                liveNum--;
                pthread_mutex_unlock(&mutexPool);
            }
        }
    } while (0);
}

// Shuts the pool down: raises the shutdown flag, waits for the manager,
// wakes every waiting worker and joins each worker still registered in
// threadID before releasing the memory and synchronization objects they use.
//
// NOTE(review): if mutex/condition initialization failed in the constructor
// the lock/destroy calls below operate on uninitialized objects; the class
// has no flag that would let this function detect that case.
ThreadPool::~ThreadPool()
{
    // Set the flag under the pool mutex so a worker that has just evaluated
    // its wait predicate cannot miss the broadcast issued below.
    pthread_mutex_lock(&mutexPool);
    shutdown = true;
    pthread_mutex_unlock(&mutexPool);

    // Once the manager is gone no new workers can be created.
    if (managerID != 0)
    {
        pthread_join(managerID, NULL);
    }

    if (threadID)
    {
        // Copy the registered ids under the lock: exiting workers clear
        // their own slot, so the table must not be walked while joining.
        pthread_t* pending = new pthread_t[maxNum];
        int pendingCount = 0;

        pthread_mutex_lock(&mutexPool);
        for (int i = 0; i < maxNum; ++i)
        {
            if (threadID[i] != 0)
            {
                pending[pendingCount++] = threadID[i];
            }
        }
        pthread_cond_broadcast(&notEmpty);
        pthread_mutex_unlock(&mutexPool);

        // Workers must be finished before the state they use is released.
        for (int i = 0; i < pendingCount; ++i)
        {
            pthread_join(pending[i], NULL);
        }

        delete[] pending;
        delete[] threadID;
    }

    if (taskQ)
    {
        delete taskQ;
    }

    pthread_mutex_destroy(&mutexPool);
    pthread_cond_destroy(&notEmpty);
}
// Queues `task` for execution and wakes one waiting worker.
// The task is silently dropped when the pool is shutting down.
//
// The whole operation runs under mutexPool, the mutex workers hold while
// testing "queue empty" before waiting on notEmpty. That way a task cannot
// be enqueued and signalled in the gap between a worker's check and its
// wait. Lock order is mutexPool first, then the queue's own mutex inside
// taskQ->addTask(), matching the order used by worker().
void ThreadPool::addTask(Task task)
{
    pthread_mutex_lock(&mutexPool);
    if (shutdown)
    {
        pthread_mutex_unlock(&mutexPool);
        return;
    }

    taskQ->addTask(task);
    pthread_cond_signal(&notEmpty);
    pthread_mutex_unlock(&mutexPool);
}
int ThreadPool::getBusyNum(){ pthread_mutex_lock(&mutexPool); int busyNum = this->busyNum; pthread_mutex_unlock(&mutexPool); return busyNum; }
int ThreadPool::getLiveNum(){ pthread_mutex_lock(&mutexPool); int liveNum = this->liveNum; pthread_mutex_unlock(&mutexPool); return liveNum; }
void* ThreadPool::worker(void* arg){ ThreadPool* pool = static_cast<ThreadPool*> (arg); while(true){ pthread_mutex_lock(&pool->mutexPool); while(pool->taskQ->taskNumber() == 0 && !pool->shutdown){ pthread_cond_wait(&pool->notEmpty,&pool->mutexPool); if(pool->exitNum > 0){ pool->exitNum--; if(pool->liveNum > pool->minNum){ pool->liveNum--; pthread_mutex_unlock(&pool->mutexPool); pool->threadExit(); } } } if(pool->shutdown){ pthread_mutex_unlock(&pool->mutexPool); pool->threadExit(); }
Task task = pool->taskQ->takeTask(); pool->busyNum++; pthread_mutex_unlock(&pool->mutexPool); cout<<"thread"<<pthread_self()<<"start working...\n"; task.function(task.arg); arg = nullptr;
pthread_mutex_lock(&pool->mutexPool); pool->busyNum--; pthread_mutex_unlock(&pool->mutexPool); } }
void* ThreadPool::manager(void* arg){ ThreadPool* pool = static_cast<ThreadPool*>(arg); while(!pool->shutdown){ sleep(3); pthread_mutex_lock(&pool->mutexPool); int queueSize = pool->taskQ->taskNumber(); int liveNum = pool->liveNum; int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexPool); if(queueSize > liveNum && liveNum < pool->maxNum){ pthread_mutex_lock(&pool->mutexPool); int counter = 0; for(int i = 0;i < pool->maxNum && i < pool->NUMBER && pool->liveNum < pool->maxNum;++i){ if(pool->threadID[i] == 0){ pthread_create(&pool->threadID[i],NULL,worker,pool); counter++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexPool); } if(busyNum * 2 < liveNum && liveNum > pool->minNum){ pthread_mutex_lock(&pool->mutexPool); pool->exitNum = pool->NUMBER; pthread_mutex_unlock(&pool->mutexPool); for(int i = 0;i < pool->NUMBER;++i){ pthread_cond_signal(&pool->notEmpty); } } } }
void ThreadPool::threadExit(){ ptrdiff_t tid = pthread_self(); for(int i = 0;i < maxNum;++i){ if(threadID[i] == tid){ threadID[i] = 0; cout<<"threadExit() called,"<<tid<<"exiting...\n"; break; } } pthread_exit(NULL); }
|