博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
线程池excute方法执行底层过程
阅读量:2166 次
发布时间:2019-05-01

本文共 4161 字,大约阅读时间需要 13 分钟。

线程池执行过程

调用execute(task)方法底层执行步骤

  • 1.首先检查线程池的运行状态和工作线程数量,如果工作线程总数(从ctl变量中获取线程并统计)少于核心线程数,则会创建一个新的线程来执行给的的任务,通过调用addWorker来执行任务.

  • 2.如果线程处于运行状态且工作队列能够入队新的任务,则使用double-check机制再次判断是否处于运行状态及是否能够出队任务,如果不成立,则使用拒绝策略拒绝新提交的任务,如果double-check的工作线程数量为0,则添加worker线程。

    -3.如果线程池处于shutdown或饱和状态,则创建带有firstTask的worker线程失败,则拒绝新进任务。

  • 4.对应部分源码如下

    public void execute(Runnable command) {			if (command == null)				throw new NullPointerException();	int c = ctl.get();			if (workerCountOf(c) < corePoolSize) {				if (addWorker(command, true))					return;				c = ctl.get();			}			if (isRunning(c) && workQueue.offer(command)) {				int recheck = ctl.get();				if (! isRunning(recheck) && remove(command))					reject(command);				else if (workerCountOf(recheck) == 0)					addWorker(null, false);			}			else if (!addWorker(command, false))				reject(command);		}	}	````

线程池addWorker执行过程

  • 1.boolean addWorker(Runnable firstTask, boolean core),检查是否可以根据当前线程池和给定的绑定添加来添加一个新的worker线程。返回true表示接收了任务,否则表示拒绝任务
    • 具体任务是,获取线程池线程及整体,判断线程池工作队列是否为空,如果为空,则返回false(创建worker失败);如果队列不为空,则判断线程是否大于等于线程池容量或大于核心线程数量或最大线程数量,则返回false,否则进行线程池数量自增1,进行woker创建
    • 部分源码如下,判断线程池状态及工作队列是否为空
      retry:		for (;;) {			int c = ctl.get();			int rs = runStateOf(c);			// Check if queue empty only if necessary.			if (rs >= SHUTDOWN &&				! (rs == SHUTDOWN &&				   firstTask == null &&				   ! workQueue.isEmpty()))				return false;			for (;;) {				int wc = workerCountOf(c);				if (wc >= CAPACITY ||					wc >= (core ? corePoolSize : maximumPoolSize))					return false;				if (compareAndIncrementWorkerCount(c))					break retry;				c = ctl.get();  // Re-read ctl				if (runStateOf(c) != rs)					continue retry;				// else CAS failed due to workerCount change; retry inner loop			}		}
  • 2.判断线程池状态、工作队列后,进行worker线程创建.首先,worker线程创建成功后,会获取worker对应的Thread。然后,获取线程池对应的重入锁资源,判断线程池工作状态,如果是处于SHUTDOWN或处于SHUTDOWN且提交任务为空,则进一步判断worker的线程状态是否存活,若存活则抛出异常;如果线程池状态处于RUUNNING且提交firstTask不为空,则将新建worker提交到对应的HashSet在,最后是否所资源
    • 源码如下
      boolean workerStarted = false;		boolean workerAdded = false;		Worker w = null;		try {			w = new Worker(firstTask);			final Thread t = w.thread;			if (t != null) {				final ReentrantLock mainLock = this.mainLock;				mainLock.lock();				try {					// Recheck while holding lock.					// Back out on ThreadFactory failure or if					// shut down before lock acquired.					int rs = runStateOf(ctl.get());					if (rs < SHUTDOWN ||						(rs == SHUTDOWN && firstTask == null)) {						if (t.isAlive()) // precheck that t is startable							throw new IllegalThreadStateException();						workers.add(w);						int s = workers.size();						if (s > largestPoolSize)							largestPoolSize = s;						workerAdded = true;					}				} finally {					mainLock.unlock();				}
  • 3.worker运行机制,实际执行方法,ThreadPoolExecutor#runWorker,最终执行我们最开始创建的Runnable对象的run方法。执行任务时,Java充分为各阶段预留好hook,方便自定义业务进行定制,如beforeExecute()【自定义执行run方法之前的预处理工作】,afterExecute()【执行run方法后的收尾工作】
    • 源码如下
      final void runWorker(Worker w) {//获取当前线程		Thread wt = Thread.currentThread();//获取worker对象的firstTask		Runnable task = w.firstTask;		w.firstTask = null;//释放worker对象锁资源,运行中断,提高并发量		w.unlock(); // allow interrupts		boolean completedAbruptly = true;		try {//判断任务是否为空,不空则进行锁资源获取,执行具体任务			while (task != null || (task = getTask()) != null) {				w.lock();				// If pool is stopping, ensure thread is interrupted;				// if not, ensure thread is not interrupted.  This				// requires a recheck in second case to deal with				// shutdownNow race while clearing interrupt//如果线程池处于停止状态,判断是否处于中断;如果未停止,则判断当前线程是否处于中断,并且需要再次检查当清理中断时处理shutdownNow race 				if ((runStateAtLeast(ctl.get(), STOP) ||					 (Thread.interrupted() &&					  runStateAtLeast(ctl.get(), STOP))) &&					!wt.isInterrupted())					wt.interrupt();				try {					beforeExecute(wt, task);					Throwable thrown = null;					try {						task.run();					} catch (RuntimeException x) {						thrown = x; throw x;					} catch (Error x) {						thrown = x; throw x;					} catch (Throwable x) {						thrown = x; throw new Error(x);					} finally {						afterExecute(task, thrown);					}				} finally {					task = null;					w.completedTasks++;					w.unlock();				}			}			completedAbruptly = false;		} finally {			processWorkerExit(w, completedAbruptly);		}	}

转载地址:http://phjzb.baihongyu.com/

你可能感兴趣的文章
(PAT 1154) Vertex Coloring (图的广度优先遍历)
查看>>
(PAT 1115) Counting Nodes in a BST (二叉查找树-统计指定层元素个数)
查看>>
(PAT 1143) Lowest Common Ancestor (二叉查找树的LCA)
查看>>
(PAT 1061) Dating (字符串处理)
查看>>
(PAT 1118) Birds in Forest (并查集)
查看>>
数据结构 拓扑排序
查看>>
(PAT 1040) Longest Symmetric String (DP-最长回文子串)
查看>>
(PAT 1145) Hashing - Average Search Time (哈希表冲突处理)
查看>>
(1129) Recommendation System 排序
查看>>
PAT1090 Highest Price in Supply Chain 树DFS
查看>>
(PAT 1096) Consecutive Factors (质因子分解)
查看>>
(PAT 1019) General Palindromic Number (进制转换)
查看>>
(PAT 1073) Scientific Notation (字符串模拟题)
查看>>
(PAT 1080) Graduate Admission (排序)
查看>>
Play on Words UVA - 10129 (欧拉路径)
查看>>
mininet+floodlight搭建sdn环境并创建简答topo
查看>>
【linux】nohup和&的作用
查看>>
Set、WeakSet、Map以及WeakMap结构基本知识点
查看>>
【NLP学习笔记】(一)Gensim基本使用方法
查看>>
【NLP学习笔记】(二)gensim使用之Topics and Transformations
查看>>