STOMP是一个简单的可互操作的协议, 被用于通过中间服务器在客户端之间进行异步消息传递。它定义了一种在客户端与服务端进行消息传递的文本格式.
个人github地址:https://github.com/VincentStory/StompForAndroid
引用github地址:https://github.com/NaikSoftware/StompProtocolAndroid
1.引入类库
dependencies {
api "com.github.NaikSoftware:StompProtocolAndroid:1.6.4"
api "io.reactivex.rxjava2:rxjava:2.2.5"
}
2.初始化stompClient对象 并监听连接状态
stompClient = Stomp.over(OKHTTP, Const.address);
stompClient.lifecycle().subscribe(lifecycleEvent -> {
//关注lifecycleEvent的回调来决定是否重连
switch (lifecycleEvent.getType()) {
case OPENED:
Log.d(Const.TAG, "forlan debug stomp connection opened");
break;
case ERROR:
Log.e(Const.TAG, "forlan debug stomp connection error is ", lifecycleEvent.getException());
break;
case CLOSED:
Log.d(Const.TAG, "forlan debug stomp connection closed");
break;
}
});
3.发起连接 headers信息根据具体情况填入
ArrayList<StompHeader> headers = new ArrayList<>();
headers.add(new StompHeader("userId", "103"));
//这里必须添加headers 否则会报错 headers可以添加用户的认证相关信息
stompClient.connect(headers);
4.发起订阅信息
stompClient.topic(Const.broadcastResponse)
.subscribe(new DisposableSubscriber<StompMessage>() {
@Override
public void onNext(StompMessage stompMessage) {
Log.d(Const.TAG, "Received== " + stompMessage.toString());
Log.i(Const.TAG, "Receive: " + stompMessage.getPayload());
runOnUiThread(() -> {
try {
JSONObject jsonObject = new JSONObject(stompMessage.getPayload());
resultText.append(jsonObject.getString("response") + "\n");
} catch (JSONException e) {
e.printStackTrace();
}
});
}
@Override
public void onError(Throwable t) {
Log.e(Const.TAG, "Stomp topic error", t);
}
@Override
public void onComplete() {
Log.e(Const.TAG, "Stomp connection onComplete");
}
});
5.发送消息 由相对应的订阅渠道返回信息
stompClient.send(Const.broadcast).subscribe();
- 如果需要取消订阅,可以通过这种方式
private CompositeDisposable compositeDisposable;
private void resetSubscriptions() {
if (compositeDisposable != null) {
compositeDisposable.dispose();
}
compositeDisposable = new CompositeDisposable();
}
public void topicData() {
resetSubscriptions();
Disposable dispTopic = stompClient.topic(address)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(stompMessage -> {
Log.i(TAG, stompMessage.getPayload());
}, throwable -> {
Log.e(TAG, "Error on subscribe topic", throwable);
});
compositeDisposable.add(dispTopic);
}
//取消订阅
public void unSubcribe() {
compositeDisposable.dispose();
}
7.设置全局异常监听(如果不设置,连接出现异常时会出现The exception was not handled due to missing onError , java.lang.IllegalStateException: Not connected 并闪退)
RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
L.i("throwable==", throwable.getMessage());
}
});
评论区有这么久以来整理的来自各位同行遇到的一些问题,以及我自己的一些回复,希望对其他遇到同类问题的朋友有一些帮助。