关于android异步编程的学习

目的

android中,对异步编程进行对比,进而选择最合适的实现方式

模拟需求

分别从宜家、家乐福获取商品桌子的信息,获取到后两者进行比较,挑选出最合意的桌子。依赖关系如下图所示。

Dependency

UI效果图如下所示:

ui0 ui1

大概业务逻辑是这样:

  1. 在UI上分别显示:”正在请求宜家数据…”,”正在请求家乐福数据…”,”等待任务1和任务2…”;
  2. 开启异步任务,并发请求宜家数据和家乐福数据;
  3. 当宜家数据请求成功后,把商品信息显示在UI上;
  4. 当家乐福数据请求成功后,同时也把商品信息显示在UI上;
  5. 第3步和第4步没有先后顺序;
  6. 当第3步中的宜家数据和第4步中家乐福数据都请求下来后,开启新的异步任务,比较两家商品谁更好。同时在UI上显示:”开始比较”
  7. 有了比较结果后,把比较结果(哪家商品更好)显示到UI上。

以下分别用基础的线程、线程池、java8支持的completefuture、rxjava、协程来实现。
需要注意的是,当用户取消任务(从当前activity返回)时,我们要去调用cancel方法。ikea表示宜家,carrefour表示家乐福,goods表示商品(这里我们用买桌子来做比喻)。

博客所引用到的代码:https://github.com/sunhang/AsyncTaskDemo

线程

用最基础的线程Thread来实现。需要注意如下几点:

  • cancel时,需要调用Thread的interrupt方法,同时设置标记量canceled为true
  • 线程中捕获InterruptedException,检查标记量canceled
  • uiTask中判断ikeaGoods和carrefourGoods是否都具备了,否则不可以去调用betterGoods去做商品比较
  • canceled没有被标记为@Volatile,因为目前只在主线程中访问了canceled
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
override fun requestServer() {
view.displayIKEAGoods(Resource(Resource.LOADING))
view.displayCarrefourGoods(Resource(Resource.LOADING))
view.displayBetterGoods(Resource(Resource.WAITING))

var ikeaGoods: Goods? = null
var carrefourGoods: Goods? = null

val uiTask = { action: () -> Unit ->
mainThreadHandler.post {
if (canceled) return@post

action()
safeLet(ikeaGoods, carrefourGoods) { it0, it1 ->
betterGoods(it0, it1)
}
}
}

threads += Thread {
try {
val goods = backendWork.getGoodsFromIKEA()
uiTask {
ikeaGoods = goods
view.displayIKEAGoods(Resource(Resource.FINISH, goods))
}
} catch (e: InterruptedException) {
e.printStackTrace()
} finally {
val currentThread = Thread.currentThread()
mainThreadHandler.post {
threads -= currentThread
}
}
}.apply { start() }

threads += Thread {
try {
val goods = backendWork.getGoodsFromCarrefour()
uiTask {
carrefourGoods = goods
view.displayCarrefourGoods(Resource(Resource.FINISH, goods))
}
} catch (e: InterruptedException) {
e.printStackTrace()
} finally {
val currentThread = Thread.currentThread()
mainThreadHandler.post {
threads -= currentThread
}
}
}.apply { start() }
}

private fun betterGoods(ikeaGoods: Goods, carrefourGoods: Goods) {
view.displayBetterGoods(Resource(Resource.LOADING))

threads += Thread {
try {
val goods = backendWork.selectBetterOne(ikeaGoods, carrefourGoods)

mainThreadHandler.post {
if (canceled) return@post

view.displayBetterGoods(Resource(Resource.FINISH, goods))
}
} catch (e: InterruptedException) {
e.printStackTrace()
} finally {
val currentThread = Thread.currentThread()
mainThreadHandler.post {
threads -= currentThread
}
}
}.apply { start() }
}

cancel

1
2
3
4
override fun cancel() {
canceled = true
threads.forEach { it.interrupt() }
}

不过,android直接用Thread做异步任务的实现,已经很少了。它本身更容易出错,而且难度大一些,要去理清同步互斥,锁的操作等。

thread pool

android开发,经常要直接接触线程池的使用和设计。针对上边的业务需求,这个是使用线程池的实现版本。
我们用到了ikeaFuture.get()carrefourFuture.get(),用于模拟等待任务1和任务2。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
override fun requestServer() {
view.displayIKEAGoods(Resource(Resource.LOADING))
view.displayCarrefourGoods(Resource(Resource.LOADING))
view.displayBetterGoods(Resource(Resource.WAITING))

val ikeaFuture = backendExecutor.submit<Goods> {
backendWork.getGoodsFromIKEA().alsoPostToUI {
view.displayIKEAGoods(Resource(Resource.FINISH, it))
}
}

val carrefourFuture = backendExecutor.submit<Goods> {
backendWork.getGoodsFromCarrefour().alsoPostToUI {
view.displayCarrefourGoods(Resource(Resource.FINISH, it))
}
}

backendExecutor.submit<Goods> {
val ikeaGoods = ikeaFuture.get()
val carrefourGoods = carrefourFuture.get()

mainHandler.post {
view.displayBetterGoods(
Resource(
Resource.LOADING,
"start compare which one is better"
)
)
}

backendWork.selectBetterOne(ikeaGoods, carrefourGoods).alsoPostToUI {
view.displayBetterGoods(Resource(Resource.FINISH, it))
}
}
}

private fun Goods.alsoPostToUI(task: (Goods) -> Unit): Goods {
mainHandler.post {
task(this)
}

return this
}

cancel

需要调用线程池的shutdownNow,这里没有调用shutdown。shutdownNow不仅取消了等待队列中的任务,而且对正在执行的任务也会通知interrupt。

1
2
3
override fun cancel() {
backendExecutor.shutdownNow()
}

future

future一般是和Callable配合使用的,Callable是由另一线程执行的并返回结果值。当前线程是不知道结果值什么时候计算完成,它通过future的get来阻塞自己,当有结果值时被唤起。

异常处理

有4个方法值得建议:

  1. 使用ExecutorService.submit执行任务,利用返回的Future对象的get方法接收抛出的异常,然后进行处理;
  2. 重写ThreadPoolExecutor.afterExecute方法,处理传递到afterExecute方法中的异常;
  3. 为工作者线程设置UncaughtExceptionHandler,在uncaughtException方法中处理异常;
  4. 在我们提供的Runnable的run方法中捕获任务代码可能抛出的所有异常,包括未检测异常。

completefuture

java8开始支持CompletableFuture,不过要在android N及更高的版本才支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
override fun requestServer() {
view.displayIKEAGoods(Resource(Resource.LOADING))
view.displayCarrefourGoods(Resource(Resource.LOADING))
view.displayBetterGoods(Resource(Resource.WAITING))


val ikeaFuture = CompletableFuture.supplyAsync(Supplier {
backendWork.getGoodsFromIKEA()
}, backendExecutor).apply {
thenAcceptAsync(Consumer {
view.displayIKEAGoods(Resource(Resource.FINISH, it))
}, mainThreadExecutor)

futures += this
}

val carrefourFuture = CompletableFuture.supplyAsync(Supplier {
backendWork.getGoodsFromCarrefour()
}, backendExecutor).apply {
thenAcceptAsync(Consumer {
view.displayCarrefourGoods(Resource(Resource.FINISH, it))
}, mainThreadExecutor)

futures += this
}

futures += ikeaFuture.thenCombineAsync(
carrefourFuture,
BiFunction<Goods, Goods, Pair<Goods, Goods>> { g0, g1 ->
view.displayBetterGoods(Resource(Resource.LOADING))
Pair(g0, g1)
},
mainThreadExecutor
).thenApplyAsync(java.util.function.Function<Pair<Goods, Goods>, Goods> {
backendWork.selectBetterOne(it.first, it.second)
}, backendExecutor).thenAcceptAsync(Consumer<Goods> {
view.displayBetterGoods(Resource(Resource.FINISH, it))
}, mainThreadExecutor)
}

1
2
3
4
5
override fun cancel() {
futures.forEach {
it.cancel(true)
}
}

rxjava

如下是rxjava的实现版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
override fun requestServer() {
view.displayIKEAGoods(Resource(Resource.LOADING))
view.displayCarrefourGoods(Resource(Resource.LOADING))
view.displayBetterGoods(Resource(Resource.WAITING))

val ikeaObservable = goodsModel.getGoodsFromIKEAAsync()
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
view.displayIKEAGoods(Resource(Resource.FINISH, it))
}

val carrefourObservable = goodsModel.getGoodsFromCarrefourAsync()
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
view.displayCarrefourGoods(Resource(Resource.FINISH, it))
}

compositeDisposable += Observable.zip(
ikeaObservable,
carrefourObservable,
BiFunction { t1: Goods, t2: Goods ->
view.displayBetterGoods(Resource(Resource.LOADING))
Pair(t1, t2)
}).flatMap {
goodsModel.selectBetterOneAsync(it.first, it.second)
}.observeOn(AndroidSchedulers.mainThread())
.subscribe {
view.displayBetterGoods(Resource(Resource.FINISH, it))
}
}
1
2
3
override fun cancel() {
compositeDisposable.dispose()
}

操作符

rxjava的操作符非常多,主要参见http://reactivex.io/documentation/operators.html

协程

协程(Coroutines)是一种比线程更加轻量级的存在,正如一个进程可以拥有多个线程一样,一个线程可以拥有多个协程。进程切换和线程切换是系统级的,发生在操作系统内部。协程切换是用户级的,切换时机是用户自己的程序决定的。

如下是用协程实现的一个版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
override fun requestServer() {
uiScope.launch {
val deferredIKEAGoods = goodsModel.getGoodsFromIKEAAsync()
val deferredCarrefourGoods = goodsModel.getGoodsFromCarrefourAsync()

view.displayIKEAGoods(Resource(Resource.LOADING))
view.displayCarrefourGoods(Resource(Resource.LOADING))

launch {
val goods = deferredIKEAGoods.await()
view.displayIKEAGoods(Resource(Resource.FINISH, goods))
}

launch {
val goods = deferredCarrefourGoods.await()
view.displayCarrefourGoods(Resource(Resource.FINISH, goods))
}

view.displayBetterGoods(Resource(Resource.WAITING))

val ikeaGoods = deferredIKEAGoods.await()
val carrefourGoods = deferredCarrefourGoods.await()

view.displayBetterGoods(Resource(Resource.LOADING))

val betterGoods = goodsModel.selectBetterOneAsync(supervisorJob, ikeaGoods, carrefourGoods).await()
view.displayBetterGoods(Resource(Resource.FINISH, betterGoods))
}
}
1
2
3
override fun cancel() {
supervisorJob.cancel()
}

协程相关类的介绍

什么是协程上下文? CoroutineContext包含了协程运行时的一些信息。根据文档里的说明,CoroutineContext 的概念主要有3点:

  1. It is an indexed set of [Element] instances. 它是一个包含 Element 实例的索引集;
  2. An indexed set is a mix between a set and a map. 索引集是 set 和 map 的混合结构;
  3. Every element in this set has a unique [Key]. 这个集合中的每个元素都有一个唯一的 Key;

说的通俗一点,CoroutineContext 就是一个集合 Collection,这个集合既有 set 的特性又有 map 的特性,集合里的元素都是 Element 类型的,每个 Element 类型的元素都有一个类型为 Key 的键。具体看它的源码时,发现它内部即没有数组也没有链表,它这个数据结构是采用函数式风格来实现的(left-biased list的context)。

context、job、拦截器的类关系如下图所示。

拦截器是什么?拦截器也是一个上下文的实现,拦截器可以左右你的协程的执行,同时为了保证它的功能的正确性,协程上下文集合永远将它放在最后面,这真可谓是天选之子了。

kotlin对拦截器的定义如下

1
2
3
4
5
6
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>

public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}

调度器是什么?它本身是协程上下文的子类,同时实现了拦截器的接口, dispatch 方法会在拦截器的方法 interceptContinuation 中调用,进而实现协程的调度。

1
2
3
4
5
6
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
}

举例说明调度器的创建

1
2
3
4
5
6
7
suspend fun main() {
val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
GlobalScope.launch(myDispatcher) {
// ...
}.join()
// ...
}

我们可以通过主动关闭线程池或者调用myDispatcher.close()来结束它的生命周期

GlobleScope和CoroutineScope的区别是什么?
runblock和launch的区别是什么?

Flow是协程中的一个流式处理的类,具体可参考

小结

  • CompletableFuture虽然早已被java8支持,但是只能在android N及更高版本才可以使用;
  • 使用thread做异步的业务逻辑处理,虽然操作的粒度很细,但是要照顾的细节太多,还要理清锁和同步互斥,开发效率并不会提高;
  • 若使用线程池,对于任务的依赖关系只能用简单的future的get来处理,实现任务依赖的能力过于单薄;
  • rxjava的代码量相对就少很多,可以轻松进行线程切换,若条件允许,建议使用rxjava;
  • kotlin通过扩展包的方式支持了协程,可是在android上java还不支持协程。在纯kotlin项目中,相对rxjava,协程更轻量级,异步任务处理能力也挺强;
  • 但是若是在java+kotlin混合项目中,协程则不能充分应用到项目工程所有地方(仅在kotlin代码中使用)。此时为了工程保持接口统一,也可能需要放弃使用协程了。

参考