在开发一个rest应用的时候,一种很常见的情形是,要对服务器进行轮询和重试。当服务器正在进行某些任务的时候,我们需要(以一定的延时)查询服务器这个任务是否完成了,同时,当我们出错的时候,有时我们要进行重试。当我在想着如何利用RxJava来正确地实现服务器轮询的时候,我就想写一篇关于这个话题的文章了。最终,我在这个StackOverflow问题中找到了一个很好的解决方案。
在这篇文章中,我会解释用RxJava和Retrofit来实现这个功能是多么的容易。我假定你已经了解了RxJava,Retrofit的使用并且已经能够利用这些库来实现相应的应用架构。
在文章中我将用到的一些定义:
“Predicate” 一个被传到Observable的一些方法中的类,举例来说:Observable.filter(/在这里传进来predicate/), Observable.takeUntil(/在这里传进来predicate/)
“Child of Observable” 一个被链接(chained)在父Observable后的Observer。例如:
Observable
.filter(/*predicate here*/)
.takeUntil(/*predicate here*/)
.subscribe(/*subscriber here/)
takeUntil()返回的Observable是filter()返回的Observable的子元素(child)。作为参数传递给subscribe()的Subscriber是takeUntil()返回的Observable的子元素(child)。
所谓服务器轮询,也就是,当你需要等待服务器去完成某项任务时,你就要周期性地调用API接口来查询该项任务是否已经完成。
示例代码如下
/**
* 这个类用来映射(map)从服务器返回的json数据
*
*/
class ServerPollingResponse {
boolean isJobDone;
@Override
public String toString() {
return "isJobDone=" + isJobDone;
}
}
Subscription checkJobSubscription = mDataManager.pollServer(inputData)
.repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.v(TAG, "repeatWhen, call");
/**
* 这个方法只会被调用一次。
* 5 表示每次重复的调用(repeated call)会被延迟5s。
*/
return observable.delay(5, TimeUnit.SECONDS);
}
})
.takeUntil(new Func1<ServerPollingResponse, Boolean>() {
@Override
public Boolean call(ServerPollingResponse response) {
/** 在这里,我们可以检查服务器返回的数据是否正确,和决定我们是否应该
* 停止轮询。
* 当服务器的任务完成时,我们停止轮询。
* 换句话说,“当任务(job)完成时,我们不拿(take)了”
*/
Log.v(TAG, "takeUntil, call response " + response);
return response.isJobDone;
}
})
.filter(new Func1<ServerPollingResponse, Boolean>() {
@Override
public Boolean call(ServerPollingResponse response) {
/**
* 如果我们在这里返回“false”的话,那这个结果会被过滤掉(filter)
* 过滤(Filtering) 表示 onNext() 不会被调用.
* 但是 onComplete() 仍然会被传递.
*/
Log.v(TAG, "filter, call response " + response);
return response.isJobDone;
}
})
.subscribe(
new Subscriber<ServerPollingResponse>() {
@Override
public void onCompleted() {
Log.v(TAG, "onCompleted ");
}
@Override
public void onError(Throwable e) {
Log.v(TAG, "onError ");
}
@Override
public void onNext(ServerPollingResponse response) {
Log.v(TAG, "onNext response " + response);
//服务器轮询停止了,你可以做些其他事情。
}
}
);
代码看起来很多,但是很容易理解,并且利用了优雅的链式操作符。
假如服务器在三次请求后才返回 “isJobDone=true” , log打印如下:
repeatWhen, call
//在这里发起了api请求
filter, call response isJobDone=false
takeUntil, call response isJobDone=false
//在这里再次发起了api请求
filter, call response isJobDone=false
takeUntil, call response isJobDone=false
//在这里,第三次发起api请求
filter, call response isJobDone=true
onNext response isJobDone=true
takeUntil, call response isJobDone=true
onCompleted
在下一个部分中,我将解释为什么方法会那样被调用
发起http请求后,在所有call()方法中, filter()的predicate中的call()方法会第一个被调用。
如果我们在filter() 中返回“false”,表明我们对结果(result)不满意,不要把这个结果传给 Subscriber
我们返回“false”,表明这个结果(result)不会被传递到 filter()的子元素(child)Subscriber
对onNext()的调用会被传递到 takeUntil()的Observable中,然后它的predicate的call() 方法会被调用
我们看到“job 还没有被完成”,所以我们在takeUntil()的call()方法中返回“false”
这表示repeatWhen()的onNext()和onComplete()会被调用
当onComplete() 被调用时,他会触发一个延时5s的对原始Observable的重新订阅(resubscriber)。也就是会再次发起http请求,也是我们要实现的目的。
如果我们在filter()中返回“true”,表明我们对这个结果(result)是满意的,链式事件如下:
结果(result)被传递到了filter的子元素(child)Subscriber
然后这个结果(result)被传递到了takeUntil()的predicate的call()方法中
在 takeUntil() 中,我们也返回 “true”,因为“任务完成了(job done)”
因为我们返回了“true”,takeUntil()操作符会调用它的子元素(child)filter()的onComplete() .
filter() 调用它的子元素Subscriber
takeUntil()会马上取消订阅(unsubscribe),这也是为什么repeatWhen()的Observable的onNext()或者onComplete()不会被调用的原因,http请求也就不会被再次发起。
整个链是由takeUntil()操作符调用内部的unsubscribe()来终止的。
基本原理是一样的。我们只要向repeatWhen()的predicate中增加一些链式方法(chaning method)就行了。
private static final int COUNTER_START = 1;
private static final int ATTEMPTS = 5;
private static final int ORIGINAL_DELAY_IN_SECONDS = 10;
// 这是链接在repeatWhen的predicate的call方法中的新的function
repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call (Observable < ?extends Void > observable){
Log.v(TAG, "repeatWhen, call");
return observable.zipWith(Observable.range(COUNTER_START, ATTEMPTS), new Func2<Void, Integer, Integer>() {
@Override
public Integer call(Void aVoid, Integer attempt) {
Log.v(TAG, "zipWith, call, attempt " + attempt);
return attempt;
}
}).flatMap(new Func1<Integer, Observable<?>>() {
@Override
public Observable<?> call(Integer repeatAttempt) {
Log.v(TAG, "flatMap, call, repeatAttempt " + repeatAttempt);
// 增加等待时间
return Observable.timer(repeatAttempt * ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
}
});
}
})
在这里,每发起一次api请求前的延时时间是随着尝试次数的增长而乘法式地增加的。非常简单高效。
打印信息如下
repeatWhen, call
//这里是我们的第一次API请求
filter, call response isJobDone=false
takeUntil, response isJobDone=false
zipWith, call, attempt 1
flatMap, call, repeatAttempt 1
//等待10s后发起第二次请求
filter, call response isJobDone=false
takeUntil, response isJobDone=false
zipWith, call, attempt 2
flatMap, call, repeatAttempt 2
//等待20s后发起第三次请求
filter, call response isJobDone=true
onNext response isJobDone=true
takeUntil, response isJobDone=true
onCompleted
解释如下
我会略过前面讲过的东西,直接解释一下与zipWith() 和 flatMap()相关的东西。
当takeUntil() 完成它的工作以后,从repeatWhen() 返回的Observable会开始工作。这个Observable是zipWith() 和 flatMap()一起作用的结果(combined result)。
zipWith(parameter1, parameter2)会拿到repeatWhen()里的Observable所发射的值,也就是Void aVoid,还会拿到由它的第一个参数,也就是Observable.range(COUNTER_START, ATTEMPTS) 所发射的值,然后将这两个值传递给函数(function) call(Void, Integer)。在call() 方法中,我们可以利用这两个参数做一些操作,然后返回一个值(虽然在我们的例子中,是一个Integer,但是它也可以是其它任何类型,如果我们想返回其它类型的话,只需要改一下new Func2 < Void, Integer, / 改这个 / Integer>) )中的第三个泛型类型就行了),但是在这里,我们只要返回我们从Observable中获取的值就行了,这个值就是重复尝试的次数。
zipWith()中返回的值会被封装到一个发射(emit)这些值的Observable中。然后我们在 flatMap()中处理这些值。
flatMap()拿到这个值,并利用它们来生成一个计时器Observable(timer Observable)。计时器Observable(timer Observable) 会先等待指定的时间,然后交给链的后续部分来继续处理(passes control down the chain),也就是原来发起api请求的Observable。
我们可以忽略掉zipWith()而直接在repeatWhen()中使用 flatMap()
repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.v(TAG, "repeatWhen, call");
return observable.flatMap(new Func1<Void, Observable<?>>() {
@Override
public Observable<?> call(Void aVoid) {
if(mCounter > ATTEMPTS){
// 由我们自己终止
throw new RuntimeException();
}
return Observable.timer(mCounter++ * ORIGINAL_DELAY_IN_SECONDS, TimeUnit.SECONDS);
}
});
}
})
一般这种情况,我们需要用一个计数器,自己去控制请求(attempt),并且在需要时终止这一系列操作( terminate the sequence )。而在这里,我们利用了zipWith()操作符,让RxJava帮我们做了这一切。
众所周知,在Retrofit1 中每个网络错误都是交由onError()方法处理的
为了在失去网络连接或者当返回的http状态是除了200 OK以外的不正常状态时实现重试,我们需要使用 retryWhen() 而不是repeatWhen()。同时,zipWith() 的参数也要做一点变化。
retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
Log.v(TAG, "retryWhen, call");
return observable.zipWith(Observable.range(COUNTER_START, ATTEMPTS), new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attempt) {
Log.v(TAG, "zipWith, call, attempt " + attempt);
return attempt;
}
})
repeatWhen() 和 retryWhen()的主要区别就是repeatWhen()会在接收到 onNext()时重新subscribe,retryWhen()则在接收到 onError()时重新subscribe。
下面是相关的代码
retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
Log.v(TAG, "retryWhen, call");
return observable.compose(zipWithFlatMap());
}
}).repeatWhen(new Func1<Observable<? extends Void>, Observable<?>>() {
@Override
public Observable<?> call(Observable<? extends Void> observable) {
Log.v(TAG, "repeatWhen, call");
return observable.compose(zipWithFlatMap());
}
})
<T> Observable.Transformer<T, Long> zipWithFlatMap() {
return new Observable.Transformer<T, Long>() {
@Override
public Observable<Long> call(Observable<T> observable) {
return observable.zipWith(Observable.range(COUNTER_START, ATTEMPTS), new Func2<T, Integer, Integer>() {
@Override
public Integer call(T t, Integer repeatAttempt) {
Log.v(TAG, "zipWith, call, repeatAttempt " + repeatAttempt);
return repeatAttempt;
}
}).flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer repeatAttempt) {
Log.v(TAG, "flatMap, call, repeatAttempt " + repeatAttempt);
//增加等待时间
return Observable.timer(repeatAttempt * 5, TimeUnit.SECONDS);
}
});
}
};
}
你可能注意到了,我将zipWith()和flatMap() 封装到了单独的方法中,并且利用compose让它可以在repeatWhen()和retryWhen()里能够重复使用。现在,如果我们请求失败了,我们会去重试(retry),如果成功了,但是“任务(job)还没完成”,我们会重复一遍(repeat)。
RxJavas实在是太好用了!
为它喝彩吧 :)
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。