前言
鸿蒙也支持Java语言开发,所以也可以使用RxJava,RxJava除了有众多操作符外,还有便捷的线程切换,例如子线程执行完耗时操作,在主线程中更新UI
Android中,与RxJava
配套的有RxAndroid
,提供了主线程Scheduler调度器,而鸿蒙也有主线程的概念,API也比较类似,按道理是可以按葫芦画瓢,实现一个RxHarmony
代码原理
要写一个RxRxHarmony库,必须先了解RxAndroid
,了解后,才能理解怎么实现。原理可以参考这篇文章,RxAndroid 源码分析
大概原理就是,RxAndroid使用Handler把异步任务发消息到主线程处理,实现线程切换。而RxHarmony则使用EventHandler发消息到主线程,原理基本一致
类结构
- RxHarmonyPlugins,工具类,提供一系列的static方法,外部可以调用进行配置,在特定时机,进行hook和处理
- HarmonySchedulers,线程调度器工厂
- EventHandlerScheduler,Harmony主线程调度器,内部通过EventHandler,发送消息到主线程,对要执行的异步任务进行处理,实现线程切换
- MainThreadDisposable,主线程Disposable实现类,一般配合RxBinding使用
RxHarmonyPlugins
/**
* 工具类,提供一系列的static方法,外部可以调用进行配置,在特定时机,进行hook和处理
*/
public final class RxHarmonyPlugins {
private static volatile Function<Callable<Scheduler>, Scheduler> onInitMainThreadHandler;
private static volatile Function<Scheduler, Scheduler> onMainThreadHandler;
/**
* 工具类,隐藏构造方法,如果被反射,抛出异常
*/
private RxHarmonyPlugins() {
throw new AssertionError("No instances.");
}
/**
* 初始化主线程调度器
*
* @param scheduler 默认的调度器,被Callable回调包裹
* @return 要被应用的主线程调度器
*/
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
//判空
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
//如果有通过setInitMainThreadSchedulerHandler()方法设置了,hook回调函数,则通过hook回调函数进行处理,再返回调度器
Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
if (f == null) {
//没有设置,判空后,再返回
return callRequireNonNull(scheduler);
}
//设置了,调用设置的回调函数,进行处理
return applyRequireNonNull(f, scheduler);
}
/**
* 设置主线程调度器hook回调函数
*/
public static void setMainThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
onMainThreadHandler = handler;
}
/**
* 处理传入的Scheduler调度器
*/
public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Scheduler, Scheduler> f = onMainThreadHandler;
if (f == null) {
return scheduler;
}
return apply(f, scheduler);
}
/**
* 获取hook回调函数,可能为null
*/
public static Function<Callable<Scheduler>, Scheduler> getInitMainThreadSchedulerHandler() {
return onInitMainThreadHandler;
}
/**
* 设置hook回调函数,可以对设置的调度器进行处理,再返回
*/
public static void setInitMainThreadSchedulerHandler(Function<Callable<Scheduler>, Scheduler> handler) {
onInitMainThreadHandler = handler;
}
/**
* 返回配置hook回调函数
*
* @return 返回hook回调函数,可能为null
*/
public static Function<Scheduler, Scheduler> getOnMainThreadSchedulerHandler() {
return onMainThreadHandler;
}
/**
* 重置所有配置
*/
public static void reset() {
setInitMainThreadSchedulerHandler(null);
setMainThreadSchedulerHandler(null);
}
/**
* 判空获取的Scheduler调度器,非null,则返回,为null则抛异常
*/
static Scheduler callRequireNonNull(Callable<Scheduler> s) {
try {
Scheduler scheduler = s.call();
if (scheduler == null) {
throw new NullPointerException("Scheduler Callable returned null");
}
return scheduler;
} catch (Throwable ex) {
throw Exceptions.propagate(ex);
}
}
/**
* 调用传入的Function回调函数,对Scheduler进行处理
*
* @param f 回调函数
* @param s Scheduler调度器
* @return 要应用的调度器
*/
static Scheduler applyRequireNonNull(Function<Callable<Scheduler>, Scheduler> f, Callable<Scheduler> s) {
Scheduler scheduler = apply(f, s);
if (scheduler == null) {
throw new NullPointerException("Scheduler Callable returned null");
}
return scheduler;
}
/**
* 调用回调函数对目标对象进行处理
*/
static <T, R> R apply(Function<T, R> f, T t) {
try {
return f.apply(t);
} catch (Throwable ex) {
throw Exceptions.propagate(ex);
}
}
}
HarmonySchedulers
/**
* HarmonyOS,线程调度器工厂
*/
public final class HarmonySchedulers {
/**
* 主线程调度器
*/
private static final Scheduler MAIN_THREAD =
RxHarmonyPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
/**
* 单例,保存主线程调度器
*/
private static final class MainHolder {
static final Scheduler DEFAULT = new EventHandlerScheduler(new EventHandlerScheduler.WithIdEventHandler(EventRunner.current()));
}
/**
* 工具类,隐藏构造方法,如果被反射,抛出异常
*/
private HarmonySchedulers() {
throw new AssertionError("No instances.");
}
/**
* 获取主线程调度器
*/
public static Scheduler mainThread() {
return RxHarmonyPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/**
* 可以指定EventRunner,类似Android中的Looper轮训器,所以调度器可以绑定在非主线程中
*/
public static Scheduler from(EventRunner eventRunner) {
//判空
if (eventRunner == null) throw new NullPointerException("looper == null");
//根据配置,创建对应的主线程调度器
return new EventHandlerScheduler(new EventHandlerScheduler.WithIdEventHandler(eventRunner));
}
}
- EventHandlerScheduler
/**
* Harmony主线程调度器,内部通过EventHandler,发送消息到主线程,对要执行的异步任务进行处理,实现线程切换
*/
public class EventHandlerScheduler extends Scheduler {
private final WithIdEventHandler eventHandler;
/**
* 构造方法,保存传入的EventHandler实例
*/
EventHandlerScheduler(WithIdEventHandler eventHandler) {
this.eventHandler = eventHandler;
}
/**
* 调度方法
*
* @param run 要执行的异步任务
* @param delay 延时时间
* @param unit 时间单位
* @return 返回Disposable实例,被对任务进行取消
*/
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//判空
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
//执行hook回调函数,如果没有设置,会返回传入的Runnable实例
run = RxJavaPlugins.onSchedule(run);
//对异步任务进行包装,ScheduledRunnable实现了Runnable和Disposable接口,可以对异步任务进行取消
ScheduledRunnable scheduled = new ScheduledRunnable(eventHandler, run);
//获取一个InnerEvent消息
InnerEvent innerEvent = InnerEvent.get(scheduled);
//发送消息到主线程
eventHandler.sendEvent(innerEvent, unit.toMillis(delay));
//返回任务,外部可以对该任务进行取消
return scheduled;
}
/**
* 创建Worker实例,会创建HandlerWorker实例并返回
*/
@NonNull
@Override
public Worker createWorker() {
return new EventHandlerWorker(eventHandler);
}
/**
* Worker子类
*/
private static final class EventHandlerWorker extends Worker {
private final WithIdEventHandler handler;
/**
* 是否被切断标志位
*/
private volatile boolean disposed;
/**
* 参数生成器,用于在 dispose() 方法中,移除该Worker调度的任务
*/
private static final AtomicLong paramsCreator = new AtomicLong();
/**
* 参数
*/
private final long params;
EventHandlerWorker(WithIdEventHandler eventHandler) {
this.handler = eventHandler;
params = paramsCreator.incrementAndGet();
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//判空
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
//如果被切断,直接返回
if (disposed) {
return Disposables.disposed();
}
//执行hook函数,如果没有设置,则会返回传入的Runnable对象
run = RxJavaPlugins.onSchedule(run);
//对异步任务进行包装
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//获取一个InnerEvent消息
InnerEvent innerEvent = InnerEvent.get(scheduled);
//保存异步任务到事件中,在EventHandler的processEvent()回调时,进行处理
innerEvent.object = this;
//保存Worker的参数到消息,在dispose()中,移除消息
innerEvent.param = params;
//发送消息到主线程进行执行
handler.sendEvent(innerEvent, unit.toMillis(delay));
//再次检查是否被切断,如果被切断,则取消任务,直接返回
if (disposed) {
handler.removeTask(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
//被切断了,记录标志位,移除任务
disposed = true;
//通过 param 参数移除该 Worker 调度,单未执行的 InnerEvent
handler.removeEvent(handler.id, params);
}
@Override
public boolean isDisposed() {
//返回切断标志位
return disposed;
}
}
/**
* 对异步任务进行包装,实现Disposable接口,提供可取消任务的功能
*/
private static final class ScheduledRunnable implements Runnable, Disposable {
private final EventHandler handler;
private final Runnable delegate;
/**
* 切断标志
*/
private volatile boolean disposed;
ScheduledRunnable(EventHandler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
//执行异步任务
delegate.run();
} catch (Throwable t) {
//如果出现异常,交给hook回调函数处理
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
//被切断,移除任务
handler.removeTask(this);
disposed = true;
}
@Override
public boolean isDisposed() {
//返回是否被切断
return disposed;
}
}
/**
* EventHandler 带一个消息Id
*/
static class WithIdEventHandler extends EventHandler {
private static final AtomicInteger idCreator = new AtomicInteger();
/**
* 每个EventHandler都配置一个id,通过这个EventHandler发送的事件,它的eventId都统一为这个id
*/
private final int id;
public WithIdEventHandler(EventRunner runner) throws IllegalArgumentException {
super(runner);
//生成Id
id = idCreator.incrementAndGet();
}
@Override
protected void processEvent(InnerEvent event) {
super.processEvent(event);
//非当前Handler的事件不处理
if (event.eventId != id) {
return;
}
Object obj = event.object;
if (obj instanceof Runnable) {
((Runnable) obj).run();
}
}
}
}
MainThreadDisposable
public abstract class MainThreadDisposable implements Disposable {
//工具方法,可以检查是否是主线程,非主线程会抛出一个IllegalStateException异常
public static void verifyMainThread() {
if (EventRunner.current() != EventRunner.getMainEventRunner()) {
throw new IllegalStateException(
"Expected to be called on the main thread but was " + Thread.currentThread().getName());
}
}
//原子变量,保证多线程安全
private final AtomicBoolean unsubscribed = new AtomicBoolean();
@Override
public final boolean isDisposed() {
//返回是否切换
return unsubscribed.get();
}
@Override
public final void dispose() {
//被切断,确保只能切断一次,原子标志只能设置一次,从false变true
if (unsubscribed.compareAndSet(false, true)) {
//主线程,直接切断,非主线程则通过主线程调度器发送任务进行切断
if (EventRunner.current() == EventRunner.getMainEventRunner()) {
onDispose();
} else {
//非主线程,通过调度器,在主线程中切断
HarmonySchedulers.mainThread().scheduleDirect(this::onDispose);
}
}
}
protected abstract void onDispose();
}