-
Notifications
You must be signed in to change notification settings - Fork 60
AnchorTask 1.0.0 原理说明
徐俊 edited this page Feb 22, 2021
·
1 revision
AnchorTask,锚点任务,它的实现原理是构建一个有向无环图,拓扑排序之后,如果任务 B 依赖任务 A,那么 A 一定排在任务 B 之前。
了解原理之前,请必须先了解有向无环图和多线程的一些基本知识,不然,下文,你基本是看不懂的。
- 前置任务:任务 3 依赖于任务 0,1,那么任务 3 的前置任务是任务 0, 1
- 子任务:任务 0 执行完之后,任务 3 才能执行,那么称呼任务 3 为 任务 0 的子任务
这里我们采用 BFS 方法实现,算法思想大概是这样的
- 建立入度表,入度为 0 的节点先入队
- 当队列不为空,进行循环判断
- 节点出队,添加到结果 list 当中
- 将该节点的邻居入度减 1
- 若邻居课程入度为 0,加入队列
- 若结果 list 与所有节点数量相等,则证明不存在环。否则,存在环
这里要解决的主要有三个问题
- 首先我们要解决一个问题,它有哪些前置任务,这个可以用 list 存储,代表它依赖的任务 list。当它所依赖的任务 list 没有执行完毕,当前任务需要等待。
- 当前任务执行完毕之后,所有依赖它的子任务需要感知到。我们可以用一个 map 来存储这种关系,key 是当前任务名称
taskName
,value 是依赖于当前任务的集合(list) - 多线程当中,等待和唤醒功能,有多种方式可以实现。wait、notify 机制,ReentrantLock Condition 机制,CountDownLatch 机制。这里我们选择 CountDownLatch 机制,因为 CountDownLatch 有点类似于计数器,特别适合这种场景。
首先,我们定义一个 IAnchorTask 接口,主要有一个方法
-
isRunOnMainThread(): Boolean
表示是否在主线程运行,默认值是 false -
priority(): Int
方法 表示线程的优先级别,默认值是 Process.THREAD_PRIORITY_FOREGROUND -
needWait()
表示当我们调用AnchorTaskDispatcher await
时,是否需要等待,return true,表示需要等待改任务执行结束,AnchorTaskDispatcher await
方法才能继续往下执行。 -
fun run()
方法,表示任务执行的时候
interface IAnchorTask : IAnchorCallBack {
/**
* 是否在主线程执行
*/
fun isRunOnMainThread(): Boolean
/**
* 任务优先级别
*/
@IntRange(
from = Process.THREAD_PRIORITY_FOREGROUND.toLong(),
to = Process.THREAD_PRIORITY_LOWEST.toLong()
)
fun priority(): Int
/**
* 调用 await 方法,是否需要等待改任务执行完成
* true 不需要
* false 需要
*/
fun needWait(): Boolean
/**
* 任务被执行的时候回调
*/
fun run()
}
它有一个实现类 AnchorTask,增加了 await 和 countdown 方法
-
await
方法,调用它,当前任务会等待 -
countdown()
方法,如果当前计数器值 > 0,会减一,否则,什么也不操作 -
afterTask()
方法,方法参数是 taskName: String,表示添加前置任务
abstract class AnchorTask(private val name: String) : IAnchorTask {
companion object {
const val TAG = "AnchorTask"
}
private lateinit var countDownLatch: CountDownLatch
val dependList: MutableList<String> = ArrayList()
private fun getListSize() = getDependsTaskList()?.size ?: 0
override fun getTaskName(): String {
return name
}
override fun priority(): Int {
return Process.THREAD_PRIORITY_FOREGROUND
}
override fun needWait(): Boolean {
return true
}
fun afterTask(taskName: String) {
dependList.add(taskName)
}
/**
* self call,await
*/
fun await() {
tryToInitCountDown()
countDownLatch.await()
}
private fun tryToInitCountDown() {
if (!this::countDownLatch.isInitialized) {
countDownLatch = CountDownLatch(dependList.size)
}
}
/**
* parent call, countDown
*/
fun countdown() {
tryToInitCountDown()
countDownLatch.countDown()
}
override fun isRunOnMainThread(): Boolean {
return false
}
fun getDependsTaskList(): List<String>? {
return dependList
}
}
无环图的拓扑排序,这里采用的是 BFS 算法。具体的可以见 AnchorTaskUtils#getSortResult
方法,它有三个参数
- list 存储所有的任务
-
taskMap: MutableMap<String, AnchorTask>
存储所有的任务,key 是taskName
,value 是 AnchorTask -
taskChildMap: MutableMap<String, ArrayList<AnchorTask>?>
,储存当前任务的子任务, key 是当前任务的taskName
,value 是 AnchorTask 的 list
算法思想
- 首先找出所有入度为 0 的队列,用 queue 变量存储
- 当队列不为空,进行循环判断。
- 从队列 pop 出,添加到结果队列
- 遍历当前任务的子任务,通知他们的入度减一(其实是遍历 taskChildMap),如果入度为 0,添加到队列 queue 里面
- 当结果队列和 list size 不相等试,证明有环
@JvmStatic
fun getSortResult(
list: MutableList<AnchorTask>, taskMap: MutableMap<String, AnchorTask>,
taskChildMap: MutableMap<String, ArrayList<AnchorTask>?>
): MutableList<AnchorTask> {
val result = ArrayList<AnchorTask>()
// 入度为 0 的队列
val queue = ArrayDeque<AnchorTask>()
val taskIntegerHashMap = HashMap<String, Int>()
// 建立每个 task 的入度关系
list.forEach { anchorTask: AnchorTask ->
val taskName = anchorTask.getTaskName()
if (taskIntegerHashMap.containsKey(taskName)) {
throw AnchorTaskException("anchorTask is repeat, anchorTask is $anchorTask, list is $list")
}
val size = anchorTask.getDependsTaskList()?.size ?: 0
taskIntegerHashMap[taskName] = size
taskMap[taskName] = anchorTask
if (size == 0) {
queue.offer(anchorTask)
}
}
// 建立每个 task 的 children 关系
list.forEach { anchorTask: AnchorTask ->
anchorTask.getDependsTaskList()?.forEach { taskName: String ->
var list = taskChildMap[taskName]
if (list == null) {
list = ArrayList<AnchorTask>()
}
list.add(anchorTask)
taskChildMap[taskName] = list
}
}
taskChildMap.entries.iterator().forEach {
LogUtils.d("TAG","key is ${it.key}, value is ${it.value}")
}
// 使用 BFS 方法获得有向无环图的拓扑排序
while (!queue.isEmpty()) {
val anchorTask = queue.pop()
result.add(anchorTask)
val taskName = anchorTask.getTaskName()
taskChildMap[taskName]?.forEach { // 遍历所有依赖这个顶点的顶点,移除该顶点之后,如果入度为 0,加入到改队列当中
val key = it.getTaskName()
var result = taskIntegerHashMap[key] ?: 0
result--
if (result == 0) {
queue.offer(it)
}
taskIntegerHashMap[key] = result
}
}
// size 不相等,证明有环
if (list.size != result.size) {
throw AnchorTaskException("Ring appeared,Please check.list is $list, result is $result")
}
return result
}
AnchorProject 建造者模式的实现,我们主要看 build 方法。
- 获取有向无环图的拓扑排序
- 根据 task 是否在主线程,分别添加到 mainList 和 threadList
fun build(): AnchorProject {
val sortResult = AnchorTaskUtils.getSortResult(list, taskMap, taskChildMap)
LogUtils.d(TAG, "start: sortResult is $sortResult")
sortResult.forEach {
if (it.isRunOnMainThread()) {
mainList.add(it)
} else {
threadList.add(it)
}
}
countDownLatch = CountDownLatch(needWaitCount.get())
return AnchorProject(this)
}
这两个类 这个类很重要,有向无环图的拓扑排序和多线程的依赖唤醒,都是借助这个核心类完成的。
它主要有几个成员变量
// 存储所有的任务,key 是 taskName,value 是 AnchorTask
private val taskMap: MutableMap<String, AnchorTask> = builder.taskMap
// 储存当前任务的子任务, key 是当前任务的 taskName,value 是 AnchorTask 的 list
private val taskChildMap: MutableMap<String, ArrayList<AnchorTask>?> = builder.taskChildMap
//需要等待的任务总数,用于阻塞
private val countDownLatch: CountDownLatch = builder.countDownLatch
// 拓扑排序之后的主线程任务
private val mainList = builder.mainList
// 拓扑排序之后的子线程任务
private val threadList = builder.threadList
// 需要等待的任务的总数
private val totalTaskSize = builder.list.size
private val finishTask = AtomicInteger(0)
它有一个比较重要的方法 setNotifyChildren(anchorTask: AnchorTask)
,有一个方法参数 AnchorTask,它的作用是通知该任务的子任务,当前任务执行完毕,入度数减一。
/**
* 通知 child countdown,当前的阻塞任务书也需要 countdown
*/
fun setNotifyChildren(anchorTask: AnchorTask) {
taskChildMap[anchorTask.getTaskName()]?.forEach {
taskMap[it.getTaskName()]?.countdown()
}
if (anchorTask.needWait()) {
countDownLatch.countDown()
}
listeners.forEach {
it.onTaskFinish(anchorTask.getTaskName())
}
finishTask.incrementAndGet()
if (finishTask.get() == totalTaskSize) {
executeMonitor.recordProjectFinish()
ThreadUtils.runOnUiThread(Runnable {
onGetMonitorRecordCallback?.onGetProjectExecuteTime(executeMonitor.projectCostTime)
onGetMonitorRecordCallback?.onGetTaskExecuteRecord(executeMonitor.executeTimeMap)
})
listeners.forEach {
it.onProjectFinish()
}
}
}
接下来看一下 start 方法
fun start(): AnchorProject {
executeMonitor.recordProjectStart()
this.listeners.forEach {
it.onProjectStart()
}
this.threadList.forEach {
threadPoolExecutor.execute(AnchorTaskRunnable(this, anchorTask = it))
}
this.mainList.forEach {
AnchorTaskRunnable(this, anchorTask = it).run()
}
return this
}
它会根据拓扑排序的排序结果,执行相应的任务。可以看到在执行任务的时候,我们使用 AnchorTaskRunnable
包裹起来
class AnchorTaskRunnable(
private val anchorProject: AnchorProject,
private val anchorTask: AnchorTask
) : Runnable {
override fun run() {
Process.setThreadPriority(anchorTask.priority())
// 前置任务没有执行完毕的话,等待,执行完毕的话,往下走
anchorTask.await()
anchorTask.onStart()
// 执行任务
val startTime = SystemClock.elapsedRealtime()
anchorTask.run()
val executeTime = SystemClock.elapsedRealtime() - startTime
anchorProject.record(anchorTask.getTaskName(), executeTime)
anchorTask.onFinish()
// 通知子任务,当前任务执行完毕了,相应的计数器要减一。
anchorProject.setNotifyChildren(anchorTask)
}
}
AnchorTaskRunnable 有点类似于装饰者模式,多线程依赖的执行关系在这里都得到体现,只有几行代码
- 前置任务没有执行完毕的话,等待,执行完毕的话,往下走
- 执行任务
- 通知子任务,当前任务执行完毕了,相应的计数器(入度数)要减一。
AnchorTask 的原理不复杂,本质是有向无环图与多线程知识的结合。
- 根据 BFS 构建出有向无环图,并得到它的拓扑排序
- 在多线程执行过程中,我们是通过任务的子任务关系和 CounDownLatch 确保先后执行关系的
- 前置任务没有执行完毕的话,等待,执行完毕的话,往下走
- 执行任务
- 通知子任务,当前任务执行完毕了,相应的计数器(入度数)要减一。