本文共 3222 字,大约阅读时间需要 10 分钟。
ceph中采用threadpool来增加并发性能ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option) : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)), ioprio_class(-1), ioprio_priority(-1), _num_threads(n),{}在其构造函数中会保存cephcontext ,并会设置threadpool的name,并规定threadpool中最大支持的thread在使用之前会调用start_threads来在threadpool中创建好最大支持的threadvoid ThreadPool::start_threads(){ assert(_lock.is_locked()); #循环创建thread while (_threads.size() < _num_threads) { #这里的workthread是thread的子类,这里会创建形参制定的_num_threads 个thread WorkThread *wt = new WorkThread(this); ldout(cct, 10) << "start_threads creating and starting " << wt << dendl; _threads.insert(wt); #为每个thread 设置优先级 int r = wt->set_ioprio(ioprio_class, ioprio_priority); if (r < 0) lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl; #开始工作 wt->create(thread_name.c_str()); }}wt->create 后就会进入到工作函数void ThreadPool::worker(WorkThread *wt){ _lock.Lock(); ldout(cct,10) << "worker start" << dendl; std::stringstream ss; ss << name << " thread " << (void *)pthread_self(); heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self()); while (!_stop) { // manage dynamic thread pool #检查是否要删掉一些多余的thread join_old_threads(); #如果threadpool中的线程已经超过最大数了,则会删除多余的thread。这里首先把要删除的thread添加到_old_threads 中,最后在join_old_threads 中删除 if (_threads.size() > _num_threads) { ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl; _threads.erase(wt); _old_threads.push_back(wt); break; } if (!_pause && !work_queues.empty()) { WorkQueue_* wq; int tries = work_queues.size(); bool did = false; while (tries--) { #表示即将要执行work的index next_work_queue %= work_queues.size(); #得到要工作的wq wq = work_queues[next_work_queue++]; #得到wq中保存的要工作的指针 void *item = wq->_void_dequeue(); if (item) { processing++; ldout(cct,12) << "worker wq " << wq->name << " start processing " << item << " (" << processing << " active)" << dendl; TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval); tp_handle.reset_tp_timeout(); _lock.Unlock(); #开始执行 wq->_void_process(item, tp_handle); _lock.Lock(); #结束执行,其中_void_process 和 _void_process_finish 都要有WorkQueue_的子类来实现 wq->_void_process_finish(item); processing--; ldout(cct,15) << "worker wq " << wq->name << " done processing " << item << " (" << processing << " active)" << dendl; if (_pause || _draining) _wait_cond.Signal(); did = true; break; } } if (did) continue; } ldout(cct,20) << "worker waiting" << dendl; cct->get_heartbeat_map()->reset_timeout( hb, cct->_conf->threadpool_default_timeout, 0); _cond.WaitInterval(_lock, utime_t( cct->_conf->threadpool_empty_queue_max_wait, 0)); }}最后来看看void ThreadPool::join_old_threads(){ assert(_lock.is_locked()); #_old_threads 不为空,说明有thread需要删除 while (!_old_threads.empty()) { ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl; #_old_threads 中的第一个thread ,等待其任务执行完成 _old_threads.front()->join(); #执行完成后删掉这个线程 delete _old_threads.front(); _old_threads.pop_front(); }}
转载地址:http://sjnmi.baihongyu.com/