书接上回,我们聊了为什么要有异步编程模式以及异步编程的魅力。
本文主要聊一聊异步编程中的promise模式;promise模式广泛应用于前端开发
,所以故事先从前端开发讲起。
因为前端的ajax技术,与服务端通信皆是基于异步编程,需要处理各种回调;
回调本身并不可怕,但是如果需要对多个回调结果进行compose
, 那就比较麻烦了,会出现所谓的回调地狱;在异步编程方面,前端同学先痛起来了,所以,在这个方向上,前端同学确实走的也比较靠前,模式也比较成熟。
那么对于java而言,先不谈协程
的玩法,异步编程可选的模式就不多了,其中一个便是promise模式,以及接下来会进行介绍的reactor模式。
接下来将按照案例对promise模式进行安利。
(特别提示,本系列文章皆是采用kotlin编写,写过kotlin之后,真的很难切回java,由俭入奢易,由奢入俭难啊)
案例一
有一远程服务名曰“计算器”,提供加法和减法两个http接口
- 计算 n + m :/add?a=n&b=m
- 计算 n - m :/sub?a=n&b=m
基于上面的计算器服务,需要在我们的服务中开发一个接口,计算m + n - l
, 采用异步代码该如何写呢?此处基于vert.x
进行示范
override fun start(startPromise: Promise<Void>?) {
webClient = WebClient.create(vertx)
var eventBus = vertx.eventBus()
eventBus.consumer<JsonObject>("calc.promise"){ msg ->
var msgBody = msg.body()
var m = msgBody.getInteger("m", 0)
var n = msgBody.getInteger("n", 0)
var l = msgBody.getInteger("l", 0)
webClient.get(7777, "pi", "/add?a=$m&b=$n").send{
if (it.succeeded()) {
var `m + n` = it.result().bodyAsString().toInt()
// 此处是vertx的webClient,是一个异步的http client
// 7777是端口,pi是host(在下部署在树莓派上做的测试),后面是uri
webClient.get(7777, "pi", "/sub?a=$`m + n`&b=$l").send{
if (it.succeeded()) {
var `m + n - l` = it.result().bodyAsString().toInt()
replyHandler(`m + n - l`.toString())
} else {
replyHandler("calc error: m + n - l")
}
}
} else {
replyHandler("calc error: m + n")
}
}
}
}
上面是一段基于callback的写法,略微有一些回调地狱
的迷之缩进
那么如果采用promise模式,在本案例的场景下,是可以缓解回调地狱
的迷之缩进
,大家感受一下(此处同样也是使用了vert.x里的promise类):
override fun start(startPromise: Promise<Void>?) {
webClient = WebClient.create(vertx)
var eventBus = vertx.eventBus()
eventBus.consumer<JsonObject>("calc.promise"){ msg ->
var msgBody = msg.body()
var m = msgBody.getInteger("a", 0)
var n = msgBody.getInteger("b", 0)
var l = msgBody.getInteger("c", 0)
add(m, n).future().onSuccess {
sub(it, l).future().onSuccess {
msg.reply(it.toString())
}
}
}
}
fun add(a: Int, b: Int) : Promise<Int> {
var promise = Promise.promise<Int>()
webClient.get(7777, "pi", "/add?a=$a&b=$b").send{
if (it.succeeded()) {
var addResult = it.result().bodyAsString().toInt()
promise.complete(addResult)
} else {
promise.fail("calc failed $a add $b")
}
}
return promise
}
fun sub(a: Int, b: Int) : Promise<Int> {
var promise = Promise.promise<Int>()
webClient.get(7777, "pi", "/sub?a=$a&b=$b").send{
if (it.succeeded()) {
var addResult = it.result().bodyAsString().toInt()
promise.complete(addResult)
} else {
promise.fail("calc failed $a sub $b")
}
}
return promise
}
上面这种写法是基于promise模式的,虽然有所缓解,但主要功效还是在把一些代码抽离了,核心的计算逻辑,依然还是迷之缩进
的,莫着急,后面还有内容。
此处先仔细看下这两段代码,vertx原生提供的io.vertx.core.Promise
类,就是用来做promise模式的, 那么promise相比于callback,改变到底是啥?经过日夜揣摩,在下得出结论:
当调用一个异步方法时,promise 模式可以把针对该方法的执行结果的处理逻辑从入参(回调函数),变为返回值;这样带来的编程变化是:基于返回值的处理,更方便的支持链式写法,把缩进改为一条链
这个话题先聊到这里,稍微找一找promise的感觉,咱们接着案例往下聊
案例二
基于计算器服务,实现一个接口: a + ((b -c)+ d) -e -f + g
先看看基于回调怎么写:
//源码地址:https://github.com/HongkaiWen/vertx-kotlin/blob/promise/src/main/kotlin/com/github/hongkaiwen/reactor/vk/calc/CallbackVerticle.kt
class CallbackVerticle : AbstractVerticle(){
lateinit var webClient: WebClient
override fun start(startPromise: Promise<Void>?) {
webClient = WebClient.create(vertx)
var eventBus = vertx.eventBus()
eventBus.consumer<JsonObject>("calc.callback"){
var msgBody = it.body()
var a = msgBody.getInteger("a", 0)
var b = msgBody.getInteger("b", 0)
var c = msgBody.getInteger("c", 0)
var d = msgBody.getInteger("d", 0)
var e = msgBody.getInteger("e", 0)
var f = msgBody.getInteger("f", 0)
var g = msgBody.getInteger("g", 0)
calc(a, b, c, d, e, f, g){reply ->
it.reply(reply)
}
}
}
fun calc(a: Int, b: Int, c: Int, d: Int, e: Int, f: Int, g: Int, replyHandler: (String) -> Unit){
webClient.get(7777, "pi", "/sub?a=$b&b=$c").send{
if (it.succeeded()) {
var `b-c` = it.result().bodyAsString().toInt()
webClient.get(7777, "pi", "/add?a=$`b-c`&b=$d").send{
if (it.succeeded()) {
var `(b-c)+d` = it.result().bodyAsString().toInt()
webClient.get(7777, "pi", "/add?a=$a&b=$`(b-c)+d`").send{
if (it.succeeded()) {
var `a+(b-c)+d` = it.result().bodyAsString().toInt()
webClient.get(7777, "pi", "/sub?a=$`a+(b-c)+d`&b=$e").send{
if (it.succeeded()) {
var `a+(b-c)+d-e` = it.result().bodyAsString().toInt()
webClient.get(7777, "pi", "/sub?a=$`a+(b-c)+d-e`&b=$f").send{
if (it.succeeded()) {
var `a+(b-c)+d-e-f` = it.result().bodyAsString().toInt()
webClient.get(7777, "pi", "/add?a=$`a+(b-c)+d-e-f`&b=$g").send{
if (it.succeeded()) {
var `a+(b-c)+d-e-f+g` = it.result().bodyAsString().toInt()
replyHandler(`a+(b-c)+d-e-f+g`.toString())
} else {
replyHandler("calc error: a + (b - c) + d -e -f + g")
}
}
} else {
replyHandler("calc error: a + (b - c) + d -e -f")
}
}
} else {
replyHandler("calc error: a + (b - c) + d -e")
}
}
} else {
replyHandler("calc error: a + (b - c) + d")
}
}
} else {
replyHandler("calc error: (b - c) + d")
}
}
} else {
replyHandler("calc error: b - c")
}
}
}
}
上面这段代码,实在让人吐血,终于知道啥叫地狱
了, 针对上面的方法,基于promise来做改进,效果如何呢:
//源码地址:https://github.com/HongkaiWen/vertx-kotlin/blob/promise/src/main/kotlin/com/github/hongkaiwen/reactor/vk/calc/PromiseVerticle.kt
class PromiseVerticle : AbstractVerticle(){
lateinit var webClient: WebClient
override fun start(startPromise: Promise<Void>?) {
webClient = WebClient.create(vertx)
var eventBus = vertx.eventBus()
eventBus.consumer<JsonObject>("calc.promise"){ msg ->
var msgBody = msg.body()
var a = msgBody.getInteger("a", 0)
var b = msgBody.getInteger("b", 0)
var c = msgBody.getInteger("c", 0)
var d = msgBody.getInteger("d", 0)
var e = msgBody.getInteger("e", 0)
var f = msgBody.getInteger("f", 0)
var g = msgBody.getInteger("g", 0)
sub(b, c).future().onSuccess {
add(it, d).future().onSuccess {
add(a, it).future().onSuccess {
sub(it, e).future().onSuccess {
sub(it, f).future().onSuccess {
add(it, g).future().onSuccess {
msg.reply(it.toString())
}
}
}
}
}
}
}
}
fun add(a: Int, b: Int) : Promise<Int> {
var promise = Promise.promise<Int>()
webClient.get(7777, "pi", "/add?a=$a&b=$b").send{
if (it.succeeded()) {
var addResult = it.result().bodyAsString().toInt()
promise.complete(addResult)
} else {
promise.fail("calc failed $a add $b")
}
}
return promise
}
fun sub(a: Int, b: Int) : Promise<Int> {
var promise = Promise.promise<Int>()
webClient.get(7777, "pi", "/sub?a=$a&b=$b").send{
if (it.succeeded()) {
var addResult = it.result().bodyAsString().toInt()
promise.complete(addResult)
} else {
promise.fail("calc failed $a sub $b")
}
}
return promise
}
}
大地狱和小地狱的区别而已,因此就在思忖,貌似使用姿势不对,基于promise的代码依然是迷之缩进
的,除非我们可以把promise的处理逻辑做成链式的
JavaScript的promise
感觉上面的路子还是不对,所以我们来参考下前端同学是如何玩promise的;因为在下对JavaScript不感兴趣,所以不求甚解的从大神的博客 摘取一段,如下:
function multiply(input) {
return new Promise(function (resolve, reject) {
log('calculating ' + input + ' x ' + input + '...');
setTimeout(resolve, 500, input * input);
});
}
// 0.5秒后返回input+input的计算结果:
function add(input) {
return new Promise(function (resolve, reject) {
log('calculating ' + input + ' + ' + input + '...');
setTimeout(resolve, 500, input + input);
});
}
var p = new Promise(function (resolve, reject) {
log('start new Promise...');
resolve(123);
});
p.then(multiply)
.then(add)
.then(multiply)
.then(add)
.then(function (result) {
log('Got value: ' + result);
});
发现JavaScript里的promise是链式的(就应该这样嘛),那么上面的例子里的姿势一定是错了,问题在哪呢?
JavaScript里的用法,promise是通过then函数进行顺序链式条用的,那么这个then函数,输入是一个处理函数,返回的是一个新的promise才行,这样可以针对一环一环的处理作出管道的效果来。
因为我采用的vertx的promise来做研究,这个类本身处理promise的结果的方法 onSuccess
返回的是future本身,并不是一个新的promise的future,所以不好做链式调用
:
@Fluent
default Future<T> onSuccess(Handler<T> handler) {
return onComplete(ar -> {
if (ar.succeeded()) {
handler.handle(ar.result());
}
});
}
改进方案
仔细看了一圈io.vertx.core.Promise
的相关方法, 发现compose
正是我们的解药。
class PromiseLineVerticle3 : AbstractVerticle(){
lateinit var webClient: WebClient
override fun start(startPromise: Promise<Void>?) {
webClient = WebClient.create(vertx)
var eventBus = vertx.eventBus()
eventBus.consumer<JsonObject>("calc.promise.line3"){ msg ->
var msgBody = msg.body()
var a = msgBody.getInteger("a", 0)
var b = msgBody.getInteger("b", 0)
var c = msgBody.getInteger("c", 0)
var d = msgBody.getInteger("d", 0)
var e = msgBody.getInteger("e", 0)
var f = msgBody.getInteger("f", 0)
var g = msgBody.getInteger("g", 0)
//只要一个onFailure即可
(b asyncSub c)
.compose { it asyncAdd d }
.compose { it asyncAdd a }
.compose { it asyncSub e }
.compose { it asyncSub f }
.compose { it asyncAdd g }
.onSuccess { msg.reply(it.toString()) }
.onFailure{msg.fail(500, it.message)}
}
}
infix fun Int.asyncAdd(input : Int) : Future<Int> {
return calc(this, input, CalcOperator.add)
}
infix fun Int.asyncSub(input : Int) : Future<Int> {
return calc(this, input, CalcOperator.sub)
}
/**
* 所有异常必须被处理
*/
fun calc(a: Int, b: Int, operator: CalcOperator) : Future<Int> {
var promise = Promise.promise<Int>()
webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b").send{
if (it.succeeded()) {
try{
var addResult = it.result().bodyAsString().toInt()
promise.complete(addResult)
println("$a - $b = $addResult")
} catch (e: Exception) {
promise.fail(e)
}
} else {
it.cause().printStackTrace()
promise.fail("calc failed $a add $b")
}
}
return promise.future()
}
}
enum class CalcOperator {
add, sub
}
通过compose
函数代码清爽了许多。
特别要提一下的是,上面的写法it asyncAdd d
是kotlin的中缀表达式
,另外这个地方的it,java程序员一定会问怎么没有声明过就使用?对的,kotlin中的lambda如果只有一个参数,就可以默认用it代替,写法简单而且避免不知道该其起啥变量名。
基于CompletableFuture实现promise
因为笔者最近vert.x
用的多,各种例子都是基于vert.x的;但其实如果把java和promise一起做搜索的话,得到的结果一般可能是jdk的CompletableFuture实现;而且vert.x
社区也有roadmap准备向jdk的CompletionStage
靠拢(RFC),所以再写一个CompletableFuture的实现版本:
class PromiseLineVerticle4 : AbstractVerticle(){
lateinit var webClient: WebClient
override fun start(startPromise: Promise<Void>?) {
webClient = WebClient.create(vertx)
var eventBus = vertx.eventBus()
eventBus.consumer<JsonObject>("calc.promise.line4"){ msg ->
var msgBody = msg.body()
var a = msgBody.getInteger("a", 0)
var b = msgBody.getInteger("b", 0)
var c = msgBody.getInteger("c", 0)
var d = msgBody.getInteger("d", 0)
var e = msgBody.getInteger("e", 0)
var f = msgBody.getInteger("f", 0)
var g = msgBody.getInteger("g", 0)
(b asyncSub c)
.thenCompose { it asyncAdd d }
.thenCompose { it asyncAdd a }
.thenCompose { it asyncSub e }
.thenCompose { it asyncSub f }
.thenCompose { it asyncAdd g }
.thenAccept { msg.reply(it.toString()) }
.exceptionally {
msg.fail(500, it.message)
null
}
}
}
infix fun Int.asyncAdd(input : Int) : CompletableFuture<Int> {
return calc(this, input, CalcOperator.add)
}
infix fun Int.asyncSub(input : Int) : CompletableFuture<Int> {
return calc(this, input, CalcOperator.sub)
}
/**
* 所有异常必须被处理
*/
fun calc(a: Int, b: Int, operator: CalcOperator) : CompletableFuture<Int> {
var promise = CompletableFuture<Int>()
webClient.get(7777, "pi", "/${operator.name}?a=$a&b=$b")
.expect(ResponsePredicate.SC_OK).send{
if (it.succeeded()) {
try{
var addResult = it.result().bodyAsString().toInt()
promise.complete(addResult)
println("$a - $b = $addResult")
} catch (e: Exception) {
promise.completeExceptionally(e)
}
} else {
it.cause().printStackTrace()
promise.completeExceptionally(RuntimeException("calc failed $a add $b"))
}
}
return promise
}
}
其实无论是vert.x的promise还是jdk的CompletableFuture, 都是promise模式,写发生也比较类似,代码整洁度上也相似,当然笔者很乐于看到java生态圈能够对此形成标准,就和JS的ES6统一定义了前端的promise一样,免得很多口水。
实践上升理论
基于上面的实践,相信对promise是有一点感觉的了。那么需要实践上升理论了,理论参见:
https://en.wikipedia.org/wiki/Futures_and_promises
理论说明相关文章很多,笔者理解主要是状态机的维护,笔者就不再献丑了,后续如果得空在写一篇vert.x的promise源码分析的文章。
promise是异步编程的一个模式,在某些场景下可以解决回调地狱
的问题。
接下来的一篇文章讲来聊聊reactor
模式。
系列文章快速导航:
异步编程一:异步编程的魅力
异步编程二:promise模式
异步编程三:reactor模式
异步编程四:协程