博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ceph中的threadpool
阅读量:4215 次
发布时间:2019-05-26

本文共 3222 字,大约阅读时间需要 10 分钟。

ceph中采用threadpool来增加并发性能ThreadPool::ThreadPool(CephContext *cct_, string nm, string tn, int n, const char *option)  : cct(cct_), name(std::move(nm)), thread_name(std::move(tn)),    ioprio_class(-1),    ioprio_priority(-1),    _num_threads(n),{}在其构造函数中会保存cephcontext ,并会设置threadpool的name,并规定threadpool中最大支持的thread在使用之前会调用start_threads来在threadpool中创建好最大支持的threadvoid ThreadPool::start_threads(){  assert(_lock.is_locked());  #循环创建thread  while (_threads.size() < _num_threads) {  #这里的workthread是thread的子类,这里会创建形参制定的_num_threads 个thread    WorkThread *wt = new WorkThread(this);    ldout(cct, 10) << "start_threads creating and starting " << wt << dendl;    _threads.insert(wt);	#为每个thread 设置优先级    int r = wt->set_ioprio(ioprio_class, ioprio_priority);    if (r < 0)      lderr(cct) << " set_ioprio got " << cpp_strerror(r) << dendl;	#开始工作    wt->create(thread_name.c_str());  }}wt->create 后就会进入到工作函数void ThreadPool::worker(WorkThread *wt){  _lock.Lock();  ldout(cct,10) << "worker start" << dendl;    std::stringstream ss;  ss << name << " thread " << (void *)pthread_self();  heartbeat_handle_d *hb = cct->get_heartbeat_map()->add_worker(ss.str(), pthread_self());  while (!_stop) {    // manage dynamic thread pool	#检查是否要删掉一些多余的thread    join_old_threads();	#如果threadpool中的线程已经超过最大数了,则会删除多余的thread。这里首先把要删除的thread添加到_old_threads 中,最后在join_old_threads 中删除    if (_threads.size() > _num_threads) {      ldout(cct,1) << " worker shutting down; too many threads (" << _threads.size() << " > " << _num_threads << ")" << dendl;      _threads.erase(wt);      _old_threads.push_back(wt);      break;    }    if (!_pause && !work_queues.empty()) {      WorkQueue_* wq;      int tries = work_queues.size();      bool did = false;      while (tries--) {	  #表示即将要执行work的index	next_work_queue %= work_queues.size();	#得到要工作的wq	wq = work_queues[next_work_queue++];	#得到wq中保存的要工作的指针	void *item = wq->_void_dequeue();	if (item) {	  processing++;	  ldout(cct,12) << "worker wq " << wq->name << " start processing " << item			<< " (" << processing << " active)" << dendl;	  TPHandle tp_handle(cct, hb, wq->timeout_interval, wq->suicide_interval);	  tp_handle.reset_tp_timeout();	  _lock.Unlock();	  #开始执行	  wq->_void_process(item, tp_handle);	  _lock.Lock();	  #结束执行,其中_void_process 和 _void_process_finish 都要有WorkQueue_的子类来实现	  wq->_void_process_finish(item);	  processing--;	  ldout(cct,15) << "worker wq " << wq->name << " done processing " << item			<< " (" << processing << " active)" << dendl;	  if (_pause || _draining)	    _wait_cond.Signal();	  did = true;	  break;	}      }      if (did)	continue;    }    ldout(cct,20) << "worker waiting" << dendl;    cct->get_heartbeat_map()->reset_timeout(      hb,      cct->_conf->threadpool_default_timeout,      0);    _cond.WaitInterval(_lock,      utime_t(	cct->_conf->threadpool_empty_queue_max_wait, 0));  }}最后来看看void ThreadPool::join_old_threads(){  assert(_lock.is_locked());  #_old_threads 不为空,说明有thread需要删除  while (!_old_threads.empty()) {    ldout(cct, 10) << "join_old_threads joining and deleting " << _old_threads.front() << dendl;	#_old_threads 中的第一个thread ,等待其任务执行完成    _old_threads.front()->join();	#执行完成后删掉这个线程    delete _old_threads.front();    _old_threads.pop_front();  }}

转载地址:http://sjnmi.baihongyu.com/

你可能感兴趣的文章
【linux】.fuse_hiddenXXXX 文件是如何生成的?
查看>>
【LKM】整合多个LKM为1个
查看>>
【Windows C++】调用powershell上传指定目录下所有文件
查看>>
Java图形界面中单选按钮JRadioButton和按钮Button事件处理
查看>>
小练习 - 排序:冒泡、选择、快排
查看>>
SparkStreaming 如何保证消费Kafka的数据不丢失不重复
查看>>
Spark Shuffle及其调优
查看>>
数据仓库分层
查看>>
常见数据结构-TrieTree/线段树/TreeSet
查看>>
Hive数据倾斜
查看>>
TopK问题
查看>>
Hive调优
查看>>
HQL排查数据倾斜
查看>>
DAG以及任务调度
查看>>
LeetCode——DFS
查看>>
MapReduce Task数目划分
查看>>
ZooKeeper分布式锁
查看>>
3126 Prime Path
查看>>
app自动化测试---ADBInterface驱动安装失败问题:
查看>>
RobotFramework+Eclipse安装步骤
查看>>