Official website
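What is this thing? It does message push and, like other mobile push solutions, it has its own protocol format and algorithms.
First, an example of how it is used: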
public void testPubSub() throws Exception {
String methodName = Utility.getMethodName();
LoggingUtilities.banner(log, cclass, methodName);
IMqttClient client = null;
try {
String topicStr = "topic" + "_02";
String clientId = methodName;
client = clientFactory.createMqttClient(serverURI, clientId);
log.info("Assigning callback...");
MessageListener listener = new MessageListener();
client.setCallback(listener);
log.info("Connecting...(serverURI:" + serverURI + ", ClientId:" + clientId);
client.connect();
log.info("Subscribing to..." + topicStr);
client.subscribe(topicStr);
log.info("Publishing to..." + topicStr);
MqttTopic topic = client.getTopic(topicStr);
MqttMessage message = new MqttMessage("foo".getBytes());
topic.publish(message);
log.info("Checking msg");
MqttMessage msg = listener.getNextMessage();
Assert.assertNotNull(msg);
Assert.assertEquals("foo", msg.toString());
log.info("getTopic name");
String topicName = topic.getName();
log.info("topicName = " + topicName);
Assert.assertEquals(topicName, topicStr);
log.info("Disconnecting...");
client.disconnect();
}
finally {
if (client != null) {
log.info("Close...");
client.close();
}
}
}
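Next, a diagram:
Let's start with the external interface IMqttClient and the message quality levels of MqttMessage. All of your requests and receives go through this interface.
The MQTT protocol was originally proposed by IBM. There are many client libraries; Eclipse's (Paho) is just one of them.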
public interface IMqttClient { //extends IMqttAsyncClient {
public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException;
public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException;
public void disconnect(long quiesceTimeout) throws MqttException;
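// MQTT runs over TCP, and closing a TCP connection takes a four-way handshake; the forced disconnect keeps a stalled final step from timing out and leaking server resources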
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException;
public void subscribe(String[] topicFilters, int[] qos, IMqttMessageListener[] messageListeners) throws MqttException; // Subscribe to topics; topic filters have their own length and syntax restrictions
public void unsubscribe(String topicFilter) throws MqttException;
public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException; // Publish a message at one of the three QoS levels
public MqttTopic getTopic(String topic);
public boolean isConnected();
public String getClientId();
public String getServerURI();
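public IMqttDeliveryToken[] getPendingDeliveryTokens();// Every message has a token used to track it; cleanSession=true wipes the earlier state, so false is recommended
public void setManualAcks(boolean manualAcks); // When true, the application acknowledges each received message itself by calling messageArrivedComplete
public void messageArrivedComplete(int messageId, int qos) throws MqttException; // Tells the client that processing of a received message has finished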
public void close() throws MqttException;
}
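There are three quality-of-service (QoS) levels:
QoS 0: the message is delivered at most once. It is not persisted to disk and is not acknowledged across the network, so it suits messages that can be lost, such as internal notifications.
QoS 1: the message is delivered at least once. It can be retransmitted, is persisted, and is acknowledged across the network. A persistence mechanism (MqttClientPersistence) must be in place; otherwise the message cannot be redelivered after the client fails.
QoS 2: the message is delivered exactly once. It is persisted, and delivery is confirmed by a two-phase acknowledgement between client and server.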
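To make the three levels concrete, here is a small sketch in plain Java. The class `QosDemo`, the enum `QosLevel`, and its fields are hypothetical names chosen for illustration and are not part of the Paho API; only the integer values 0, 1, 2 and the delivery semantics come from the description above.

```java
/** Hypothetical model of the three MQTT QoS levels; not a Paho class. */
class QosDemo {
    enum QosLevel {
        AT_MOST_ONCE(0, false, 0),   // no acknowledgement, nothing persisted
        AT_LEAST_ONCE(1, true, 1),   // one acknowledgement (PUBACK), may arrive twice
        EXACTLY_ONCE(2, true, 2);    // two-phase acknowledgement (PUBREC, then PUBREL/PUBCOMP)

        final int value;          // integer passed as the qos argument
        final boolean persisted;  // whether the message is stored until delivery is confirmed
        final int ackPhases;      // number of acknowledgement phases on the network

        QosLevel(int value, boolean persisted, int ackPhases) {
            this.value = value;
            this.persisted = persisted;
            this.ackPhases = ackPhases;
        }

        /** Maps the integer form to a level, rejecting anything but 0, 1, 2. */
        static QosLevel fromInt(int qos) {
            for (QosLevel level : values()) {
                if (level.value == qos) {
                    return level;
                }
            }
            throw new IllegalArgumentException("Invalid QoS: " + qos);
        }
    }

    public static void main(String[] args) {
        for (QosLevel level : QosLevel.values()) {
            System.out.println(level.value + " " + level
                    + " persisted=" + level.persisted
                    + " ackPhases=" + level.ackPhases);
        }
    }
}
```

Rejecting unknown values up front mirrors the idea that a publish call should fail early rather than send a malformed QoS to the broker.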
You can guess the purpose of most of the interface methods above, so let's move on to the implementation class.
public class MqttClient implements IMqttClient { //), DestinationProvider {
//private static final String CLASS_NAME = MqttClient.class.getName();
//private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT,CLASS_NAME);
protected MqttAsyncClient aClient = null; // Delegate implementation to MqttAsyncClient
protected long timeToWait = -1; // How long each method should wait for action to complete
/*
* (non-Javadoc)
*
* @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#disconnectForcibly(long, long)
*/
public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException {
aClient.disconnectForcibly(quiesceTimeout, disconnectTimeout);
}
/*
* @see IMqttClient#subscribe(String[], int[])
*/
public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
tok.waitForCompletion(getTimeToWait());
int[] grantedQos = tok.getGrantedQos();
for (int i = 0; i < grantedQos.length; ++i) {
qos[i] = grantedQos[i];
}
if (grantedQos.length == 1 && qos[0] == 0x80) {
throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
}
}
/*
* @see IMqttClient#publishBlock(String, byte[], int, boolean)
*/
public void publish(String topic, byte[] payload,int qos, boolean retained) throws MqttException,
MqttPersistenceException {
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
message.setRetained(retained);
this.publish(topic, message);
}
public void setManualAcks(boolean manualAcks) {
aClient.setManualAcks(manualAcks);
}
public void messageArrivedComplete(int messageId, int qos) throws MqttException {
aClient.messageArrivedComplete(messageId, qos);
}
........
}
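This class is a delegate: the real work is done by MqttAsyncClient.
- 1. MqttAsyncClient is a lightweight client that connects to the server and runs non-blocking in the background. It is based on TCP and also supports SSL.
- 2. To make sure that messages with a QoS level are delivered across the network and survive a client restart, they must be stored until they have been delivered. MQTT provides its own persistence mechanism for this. MqttDefaultFilePersistence is the default; if null is passed, messages are kept only transiently in memory. You can also implement the MqttClientPersistence interface yourself.
- 3. MqttConnectOptions.setCleanSession(boolean) is the flag applied when connecting. If it is true, then after the client disconnects, the next connection discards the messages held in persistence. If it is false, the persistence mechanism is used to retransmit them.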
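The interaction between persistence and the clean-session flag can be sketched with the standard library alone. `SessionStoreDemo` and its methods are hypothetical and only mimic the behaviour described in points 2 and 3; the real Paho classes do considerably more.

```java
import java.util.Hashtable;

/** Hypothetical stand-in for a client's message store; not the Paho implementation. */
class SessionStoreDemo {
    // Unacknowledged messages, keyed by an illustrative message key.
    private final Hashtable<String, byte[]> pending = new Hashtable<>();

    /** Stores a message until the server confirms it. */
    void persist(String key, byte[] payload) {
        pending.put(key, payload);
    }

    /** Drops a message once its acknowledgement has arrived. */
    void acknowledge(String key) {
        pending.remove(key);
    }

    /** Simulates a reconnect and returns how many messages are left to retransmit. */
    int reconnect(boolean cleanSession) {
        if (cleanSession) {
            pending.clear();   // a clean session discards the earlier state
        }
        return pending.size();
    }

    public static void main(String[] args) {
        SessionStoreDemo store = new SessionStoreDemo();
        store.persist("m1", "foo".getBytes());
        store.persist("m2", "bar".getBytes());
        store.acknowledge("m1");
        System.out.println("cleanSession=false: " + store.reconnect(false));
        System.out.println("cleanSession=true: " + store.reconnect(true));
    }
}
```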
public class MqttAsyncClient implements IMqttAsyncClient { // DestinationProvider {
.......
public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
final String methodName = "MqttAsyncClient";
log.setResourceName(clientId);
if (clientId == null) { //Support empty client Id, 3.1.1 standard
throw new IllegalArgumentException("Null clientId");
}
// Count characters, surrogate pairs count as one character.
int clientIdLength = 0;
for (int i = 0; i < clientId.length() - 1; i++) {
if (Character_isHighSurrogate(clientId.charAt(i)))
i++;
clientIdLength++;
}
if ( clientIdLength > 65535) {
throw new IllegalArgumentException("ClientId longer than 65535 characters");
}
MqttConnectOptions.validateURI(serverURI);
this.serverURI = serverURI;
this.clientId = clientId; // 【: 1】
this.persistence = persistence;
if (this.persistence == null) {
this.persistence = new MemoryPersistence();
}
// @TRACE 101=<init> ClientID={0} ServerURI={1} PersistenceType={2}
log.fine(CLASS_NAME,methodName,"101",new Object[]{clientId,serverURI,persistence});
this.persistence.open(clientId, serverURI); // 【: 2】
this.comms = new ClientComms(this, this.persistence, pingSender); // 【: 3】
this.persistence.close();
this.topics = new Hashtable();
}
protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException { // 【: 4】
final String methodName = "createNetworkModules";
// @TRACE 116=URI={0}
log.fine(CLASS_NAME, methodName, "116", new Object[]{address});
NetworkModule[] networkModules = null;
String[] serverURIs = options.getServerURIs();
String[] array = null;
if (serverURIs == null) {
array = new String[]{address};
} else if (serverURIs.length == 0) {
array = new String[]{address};
} else {
array = serverURIs;
}
networkModules = new NetworkModule[array.length];
for (int i = 0; i < array.length; i++) {
networkModules[i] = createNetworkModule(array[i], options);
}
log.fine(CLASS_NAME, methodName, "108");
return networkModules;
}
private NetworkModule createNetworkModule(String address, MqttConnectOptions options) throws MqttException, MqttSecurityException {
final String methodName = "createNetworkModule";
// @TRACE 115=URI={0}
log.fine(CLASS_NAME,methodName, "115", new Object[] {address});
NetworkModule netModule;
String shortAddress;
String host;
int port;
SocketFactory factory = options.getSocketFactory();
int serverURIType = MqttConnectOptions.validateURI(address); // 【: 5】
switch (serverURIType) {
case MqttConnectOptions.URI_TYPE_TCP :
shortAddress = address.substring(6);
host = getHostName(shortAddress);
port = getPort(shortAddress, 1883);
if (factory == null) {
factory = SocketFactory.getDefault();
}
else if (factory instanceof SSLSocketFactory) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
netModule = new TCPNetworkModule(factory, host, port, clientId); // 【: 7】
((TCPNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
break;
case MqttConnectOptions.URI_TYPE_SSL:
shortAddress = address.substring(6);
host = getHostName(shortAddress);
port = getPort(shortAddress, 8883);
SSLSocketFactoryFactory factoryFactory = null;
if (factory == null) {
// try {
factoryFactory = new SSLSocketFactoryFactory();
Properties sslClientProps = options.getSSLProperties();
if (null != sslClientProps)
factoryFactory.initialize(sslClientProps, null);
factory = factoryFactory.createSocketFactory(null);
// }
// catch (MqttDirectException ex) {
// throw ExceptionHelper.createMqttException(ex.getCause());
// }
}
else if ((factory instanceof SSLSocketFactory) == false) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
// Create the network module...
netModule = new SSLNetworkModule((SSLSocketFactory) factory, host, port, clientId);
((SSLNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
// Ciphers suites need to be set, if they are available
if (factoryFactory != null) {
String[] enabledCiphers = factoryFactory.getEnabledCipherSuites(null);
if (enabledCiphers != null) {
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}
break;
case MqttConnectOptions.URI_TYPE_WS:
shortAddress = address.substring(5);
host = getHostName(shortAddress);
port = getPort(shortAddress, 80);
if (factory == null) {
factory = SocketFactory.getDefault();
}
else if (factory instanceof SSLSocketFactory) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
netModule = new WebSocketNetworkModule(factory, address, host, port, clientId);
((WebSocketNetworkModule)netModule).setConnectTimeout(options.getConnectionTimeout());
break;
case MqttConnectOptions.URI_TYPE_WSS:
shortAddress = address.substring(6);
host = getHostName(shortAddress);
port = getPort(shortAddress, 443);
SSLSocketFactoryFactory wSSFactoryFactory = null;
if (factory == null) {
wSSFactoryFactory = new SSLSocketFactoryFactory();
Properties sslClientProps = options.getSSLProperties();
if (null != sslClientProps)
wSSFactoryFactory.initialize(sslClientProps, null);
factory = wSSFactoryFactory.createSocketFactory(null);
}
else if ((factory instanceof SSLSocketFactory) == false) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH);
}
// Create the network module...
netModule = new WebSocketSecureNetworkModule((SSLSocketFactory) factory, address, host, port, clientId);
((WebSocketSecureNetworkModule)netModule).setSSLhandshakeTimeout(options.getConnectionTimeout());
// Ciphers suites need to be set, if they are available
if (wSSFactoryFactory != null) {
String[] enabledCiphers = wSSFactoryFactory.getEnabledCipherSuites(null);
if (enabledCiphers != null) {
((SSLNetworkModule) netModule).setEnabledCiphers(enabledCiphers);
}
}
break;
case MqttConnectOptions.URI_TYPE_LOCAL :
netModule = new LocalNetworkModule(address.substring(8));
break;
default:
// This shouldn't happen, as long as validateURI() has been called.
netModule = null;
}
return netModule;
}
.......
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)
throws MqttException, MqttSecurityException {
final String methodName = "connect";
if (comms.isConnected()) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
}
if (comms.isConnecting()) {
throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
}
if (comms.isDisconnecting()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
}
if (comms.isClosed()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
}
this.connOpts = options;
this.userContext = userContext;
final boolean automaticReconnect = options.isAutomaticReconnect();
// @TRACE 103=cleanSession={0} connectionTimeout={1} TimekeepAlive={2} userName={3} password={4} will={5} userContext={6} callback={7}
log.fine(CLASS_NAME,methodName, "103",
new Object[]{
Boolean.valueOf(options.isCleanSession()),
new Integer(options.getConnectionTimeout()),
new Integer(options.getKeepAliveInterval()),
options.getUserName(),
((null == options.getPassword())?"[null]":"[notnull]"),
((null == options.getWillMessage())?"[null]":"[notnull]"),
userContext,
callback });
comms.setNetworkModules(createNetworkModules(serverURI, options));
comms.setReconnectCallback(new MqttCallbackExtended() {
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
public void connectComplete(boolean reconnect, String serverURI) {
}
public void connectionLost(Throwable cause) {
if(automaticReconnect){
// Automatic reconnect is set so make sure comms is in resting state
comms.setRestingState(true);
reconnecting = true;
startReconnectCycle();
}
}
});
// Insert our own callback to iterate through the URIs till the connect succeeds
MqttToken userToken = new MqttToken(getClientId()); // 【: 8】....
ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options, userToken, userContext, callback, reconnecting);
userToken.setActionCallback(connectActionListener);
userToken.setUserContext(this);
// If we are using the MqttCallbackExtended, set it on the connectActionListener
if(this.mqttCallback instanceof MqttCallbackExtended){
connectActionListener.setMqttCallbackExtended((MqttCallbackExtended)this.mqttCallback);
}
comms.setNetworkModuleIndex(0);
connectActionListener.connect();
return userToken;
}
........
public IMqttToken disconnect(long quiesceTimeout, Object userContext, IMqttActionListener callback) throws MqttException {
final String methodName = "disconnect";
MqttToken token = new MqttToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
MqttDisconnect disconnect = new MqttDisconnect();
try {
comms.disconnect(disconnect, quiesceTimeout, token);
} catch (MqttException ex) {
throw ex;
}
return token;
}
public String getCurrentServerURI(){
return comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
}
protected MqttTopic getTopic(String topic) {
MqttTopic.validate(topic, false/*wildcards NOT allowed*/);
MqttTopic result = (MqttTopic)topics.get(topic);
if (result == null) {
result = new MqttTopic(topic, comms);
topics.put(topic,result);
}
return result;
}
public IMqttToken checkPing(Object userContext, IMqttActionListener callback) throws MqttException{ // 【: 9】
final String methodName = "ping";
MqttToken token;
token = comms.checkForActivity();
return token;
}
.........
public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback) throws MqttException {
final String methodName = "subscribe";
if (topicFilters.length != qos.length) {
throw new IllegalArgumentException();
}
// remove any message handlers for individual topics
for (int i = 0; i < topicFilters.length; ++i) {
this.comms.removeMessageListener(topicFilters[i]);
}
String subs = "";
for (int i=0;i<topicFilters.length;i++) {
if (i>0) {
subs+=", ";
}
subs+= "topic="+ topicFilters[i]+" qos="+qos[i];
//Check if the topic filter is valid before subscribing
MqttTopic.validate(topicFilters[i], true/*allow wildcards*/);
}
//@TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2}
log.fine(CLASS_NAME,methodName,"106",new Object[]{subs, userContext, callback});
MqttToken token = new MqttToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.internalTok.setTopics(topicFilters);
MqttSubscribe register = new MqttSubscribe(topicFilters, qos);
comms.sendNoWait(register, token);
//@TRACE 109=<
log.fine(CLASS_NAME,methodName,"109");
return token;
}
/* (non-Javadoc)
* @see org.eclipse.paho.client.mqttv3.IMqttAsyncClient#unsubscribe(java.lang.String, java.lang.Object, org.eclipse.paho.client.mqttv3.IMqttActionListener)
*/
public IMqttToken unsubscribe(String topicFilter, Object userContext, IMqttActionListener callback) throws MqttException {
return unsubscribe(new String[] {topicFilter}, userContext, callback);
}
public IMqttToken unsubscribe(String[] topicFilters, Object userContext, IMqttActionListener callback) throws MqttException {
final String methodName = "unsubscribe";
String subs = "";
for (int i=0;i<topicFilters.length;i++) {
if (i>0) {
subs+=", ";
}
subs+=topicFilters[i];
// Check if the topic filter is valid before unsubscribing
// Although we already checked when subscribing, but invalid
// topic filter is meanless for unsubscribing, just prohibit it
// to reduce unnecessary control packet send to broker.
MqttTopic.validate(topicFilters[i], true/*allow wildcards*/);
}
// remove message handlers from the list for this client
for (int i = 0; i < topicFilters.length; ++i) {
this.comms.removeMessageListener(topicFilters[i]);
}
MqttToken token = new MqttToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.internalTok.setTopics(topicFilters);
MqttUnsubscribe unregister = new MqttUnsubscribe(topicFilters);
comms.sendNoWait(unregister, token);
return token;
}
public void setCallback(MqttCallback callback) {
this.mqttCallback = callback;
comms.setCallback(callback);
}
public void setManualAcks(boolean manualAcks) {
comms.setManualAcks(manualAcks);
}
public void messageArrivedComplete(int messageId, int qos) throws MqttException {
comms.messageArrivedComplete(messageId, qos);
}
public static String generateClientId() { // 【: 10】
//length of nanoTime = 15, so total length = 19 < 65535(defined in spec)
return CLIENT_ID_PREFIX + System.nanoTime();
}
public IMqttDeliveryToken publish(String topic, MqttMessage message, Object userContext, IMqttActionListener callback) throws MqttException,
MqttPersistenceException {
final String methodName = "publish";
//Checks if a topic is valid when publishing a message.
MqttTopic.validate(topic, false/*wildcards NOT allowed*/);
MqttDeliveryToken token = new MqttDeliveryToken(getClientId());
token.setActionCallback(callback);
token.setUserContext(userContext);
token.setMessage(message);
token.internalTok.setTopics(new String[] {topic});
MqttPublish pubMsg = new MqttPublish(topic, message);
comms.sendNoWait(pubMsg, token);
//@TRACE 112=<
log.fine(CLASS_NAME,methodName,"112");
return token;
}
public void reconnect() throws MqttException {
final String methodName = "reconnect";
//@Trace 500=Attempting to reconnect client: {0}
log.fine(CLASS_NAME, methodName, "500", new Object[]{this.clientId});
// Some checks to make sure that we're not attempting to reconnect an already connected client
if (comms.isConnected()) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
}
if (comms.isConnecting()) {
throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
}
if (comms.isDisconnecting()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
}
if (comms.isClosed()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
}
// We don't want to spam the server
stopReconnectCycle();
attemptReconnect();
}
private void attemptReconnect(){
final String methodName = "attemptReconnect";
//@Trace 500=Attempting to reconnect client: {0}
log.fine(CLASS_NAME, methodName, "500", new Object[]{this.clientId});
try {
connect(this.connOpts, this.userContext,new IMqttActionListener() {
public void onSuccess(IMqttToken asyncActionToken) {
//@Trace 501=Automatic Reconnect Successful: {0}
log.fine(CLASS_NAME, methodName, "501", new Object[]{asyncActionToken.getClient().getClientId()});
comms.setRestingState(false);
stopReconnectCycle();
}
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
//@Trace 502=Automatic Reconnect failed, rescheduling: {0}
log.fine(CLASS_NAME, methodName, "502", new Object[]{asyncActionToken.getClient().getClientId()});
if(reconnectDelay < 128000){
reconnectDelay = reconnectDelay * 2;
}
rescheduleReconnectCycle(reconnectDelay);
}
});
} catch (MqttSecurityException ex) {
//@TRACE 804=exception
log.fine(CLASS_NAME,methodName,"804",null, ex);
} catch (MqttException ex) {
//@TRACE 804=exception
log.fine(CLASS_NAME,methodName,"804",null, ex);
}
}
private void startReconnectCycle(){
String methodName = "startReconnectCycle";
//@Trace 503=Start reconnect timer for client: {0}, delay: {1}
log.fine(CLASS_NAME, methodName, "503", new Object[]{this.clientId, new Long(reconnectDelay)});
reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
}
private void stopReconnectCycle(){
String methodName = "stopReconnectCycle";
//@Trace 504=Stop reconnect timer for client: {0}
log.fine(CLASS_NAME, methodName, "504", new Object[]{this.clientId});
reconnectTimer.cancel();
reconnectDelay = 1000; // Reset Delay Timer
}
private void rescheduleReconnectCycle(int delay){
String methodName = "rescheduleReconnectCycle";
//@Trace 505=Rescheduling reconnect timer for client: {0}, delay: {1}
log.fine(CLASS_NAME, methodName, "505", new Object[]{this.clientId, new Long(reconnectDelay)});
reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
}
private class ReconnectTask extends TimerTask {
private static final String methodName = "ReconnectTask.run";
public void run() {
//@Trace 506=Triggering Automatic Reconnect attempt.
log.fine(CLASS_NAME, methodName, "506");
attemptReconnect();
}
}
..........
}
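The reconnect logic in attemptReconnect and stopReconnectCycle above amounts to an exponential backoff: the delay starts at 1000 ms, doubles after each failure while it is below 128000 ms, and is reset to 1000 ms once the cycle stops. The sketch below isolates just that arithmetic; `ReconnectBackoffDemo` and its method names are hypothetical, and no timer or network is involved.

```java
/** Hypothetical isolation of the reconnect delay arithmetic; not a Paho class. */
class ReconnectBackoffDemo {
    private int reconnectDelay = 1000; // milliseconds, same starting value as the listing

    /** Called after a failed attempt; returns the delay before the next attempt. */
    int onFailure() {
        if (reconnectDelay < 128000) {
            reconnectDelay = reconnectDelay * 2;
        }
        return reconnectDelay;
    }

    /** Called after a successful reconnect; resets the delay. */
    void onSuccess() {
        reconnectDelay = 1000;
    }

    int currentDelay() {
        return reconnectDelay;
    }

    public static void main(String[] args) {
        ReconnectBackoffDemo backoff = new ReconnectBackoffDemo();
        for (int attempt = 1; attempt <= 9; attempt++) {
            System.out.println("attempt " + attempt + " failed, next delay = " + backoff.onFailure());
        }
        backoff.onSuccess();
        System.out.println("after success, delay = " + backoff.currentDelay());
    }
}
```

Capping the delay keeps a long outage from pushing retries arbitrarily far apart, while the doubling avoids hammering a server that is still down.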
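- 1. Starting from the MqttAsyncClient constructor: it needs a URL and an ID. The URL is checked by validateURI (see the source for details; it simply verifies that the scheme prefix is one of the supported ones). The ID can be produced with generateClientId, see (10).
- 2. persistence is the persistence strategy, found in the org.eclipse.paho.client.mqttv3.persist package; see MqttDefaultFilePersistence and MemoryPersistence. open performs the initialization: the former sets up a File directory, the latter creates a Hashtable. Every new MqttAsyncClient runs this open step in its constructor, followed by close once ClientComms has been created.
- 3. ClientComms is very important: it is the dispatcher where the business logic starts. It is introduced a little later.
- 4. For each URL, the client matches the URL to a network mode (5). In the example above the result is TCPNetworkModule (7).
- 5. Validate the URI and map it to its type: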
/**
* Validate a URI
* @param srvURI
* @return the URI type
*/
protected static int validateURI(String srvURI) {
try {
URI vURI = new URI(srvURI);
if (vURI.getScheme().equals("ws")){
return URI_TYPE_WS;
}
else if (vURI.getScheme().equals("wss")) {
return URI_TYPE_WSS;
}
if (!vURI.getPath().equals("")) {
throw new IllegalArgumentException(srvURI);
}
if (vURI.getScheme().equals("tcp")) {
return URI_TYPE_TCP;
}
else if (vURI.getScheme().equals("ssl")) {
return URI_TYPE_SSL;
}
else if (vURI.getScheme().equals("local")) {
return URI_TYPE_LOCAL;
}
else {
throw new IllegalArgumentException(srvURI);
}
} catch (URISyntaxException ex) {
throw new IllegalArgumentException(srvURI);
}
}
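Together with createNetworkModule, the validation above boils down to mapping a URI scheme to a network module and a default port (1883 for tcp, 8883 for ssl, 80 for ws, 443 for wss, as in the listing). A minimal sketch of that mapping using java.net.URI follows; `BrokerUriDemo` and its methods are hypothetical helpers, and unlike the listing it reads the port from the parsed URI instead of using substring.

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical helper that resolves the port for a broker URI; not a Paho class. */
class BrokerUriDemo {
    /** Default ports taken from the createNetworkModule listing. */
    static int defaultPort(String scheme) {
        switch (scheme) {
            case "tcp": return 1883;
            case "ssl": return 8883;
            case "ws":  return 80;
            case "wss": return 443;
            default: throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
    }

    /** Returns the explicit port if the URI has one, otherwise the scheme's default. */
    static int resolvePort(String serverUri) {
        try {
            URI uri = new URI(serverUri);
            String scheme = uri.getScheme();
            if (scheme == null) {
                throw new IllegalArgumentException(serverUri);
            }
            int fallback = defaultPort(scheme); // also rejects unknown schemes
            int port = uri.getPort();           // -1 when the URI names no port
            return port != -1 ? port : fallback;
        } catch (URISyntaxException ex) {
            throw new IllegalArgumentException(serverUri, ex);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolvePort("tcp://localhost"));
        System.out.println(resolvePort("ssl://localhost:9000"));
    }
}
```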
- 7. TCPNetworkModule: here is the underlying TCP implementation
/**
* A network module for connecting over TCP.
*/
public class TCPNetworkModule implements NetworkModule {
.......
public TCPNetworkModule(SocketFactory factory, String host, int port, String resourceContext) {
log.setResourceName(resourceContext);
this.factory = factory;
this.host = host;
this.port = port;
}
/**
* Starts the module, by creating a TCP socket to the server.
*/
public void start() throws IOException, MqttException {
final String methodName = "start";
try {
// InetAddress localAddr = InetAddress.getLocalHost();
// socket = factory.createSocket(host, port, localAddr, 0);
// @TRACE 252=connect to host {0} port {1} timeout {2}
log.fine(CLASS_NAME,methodName, "252", new Object[] {host, new Integer(port), new Long(conTimeout*1000)});
SocketAddress sockaddr = new InetSocketAddress(host, port);
socket = factory.createSocket();
socket.connect(sockaddr, conTimeout*1000);
// SetTcpNoDelay was originally set ot true disabling Nagle's algorithm.
// This should not be required.
// socket.setTcpNoDelay(true); // TCP_NODELAY on, which means we do not use Nagle's algorithm
}
catch (ConnectException ex) {
//@TRACE 250=Failed to create TCP socket
log.fine(CLASS_NAME,methodName,"250",null,ex);
throw new MqttException(MqttException.REASON_CODE_SERVER_CONNECT_ERROR, ex);
}
}
.......
}
- 8. MqttToken provides a mechanism for tracking asynchronous operations
public class MqttToken implements IMqttToken {
/**
* A reference to the the class that provides most of the implementation of the
* MqttToken. MQTT application programs must not use the internal class.
*/
public Token internalTok = null;
public MqttToken(String logContext) {
internalTok = new Token(logContext);
}
public void setActionCallback(IMqttActionListener listener) {
internalTok.setActionCallback(listener);
}
public IMqttActionListener getActionCallback() {
return internalTok.getActionCallback();
}
public void waitForCompletion() throws MqttException {
internalTok.waitForCompletion(-1);
}
public void waitForCompletion(long timeout) throws MqttException {
internalTok.waitForCompletion(timeout);
}
......
}
It is really a delegate: the implementation lives in the internal Token class. We only need to look at a few key methods.
public class Token {
public boolean checkResult() throws MqttException {
if ( getException() != null) {
throw getException();
}
return true;
}
public void waitForCompletion() throws MqttException {
waitForCompletion(-1);
}
public void waitForCompletion(long timeout) throws MqttException {
final String methodName = "waitForCompletion";
//@TRACE 407=key={0} wait max={1} token={2}
log.fine(CLASS_NAME,methodName, "407",new Object[]{getKey(), new Long(timeout), this});
MqttWireMessage resp = waitForResponse(timeout);
if (resp == null && !completed) {
//@TRACE 406=key={0} timed out token={1}
log.fine(CLASS_NAME,methodName, "406",new Object[]{getKey(), this});
exception = new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT);
throw exception;
}
checkResult();
}
protected MqttWireMessage waitForResponse() throws MqttException {
return waitForResponse(-1);
}
protected MqttWireMessage waitForResponse(long timeout) throws MqttException {
final String methodName = "waitForResponse";
synchronized (responseLock) {
//@TRACE 400=>key={0} timeout={1} sent={2} completed={3} hasException={4} response={5} token={6}
log.fine(CLASS_NAME, methodName, "400",new Object[]{getKey(), new Long(timeout),new Boolean(sent),new Boolean(completed),(exception==null)?"false":"true",response,this},exception);
while (!this.completed) {
if (this.exception == null) {
try {
//@TRACE 408=key={0} wait max={1}
log.fine(CLASS_NAME,methodName,"408",new Object[] {getKey(),new Long(timeout)});
if (timeout <= 0) {
responseLock.wait();
} else {
responseLock.wait(timeout);
}
} catch (InterruptedException e) {
exception = new MqttException(e);
}
}
if (!this.completed) {
if (this.exception != null) {
//@TRACE 401=failed with exception
log.fine(CLASS_NAME,methodName,"401",null,exception);
throw exception;
}
if (timeout > 0) {
// time up and still not completed
break;
}
}
}
}
//@TRACE 402=key={0} response={1}
log.fine(CLASS_NAME,methodName, "402",new Object[]{getKey(), this.response});
return this.response;
}
/**
* Mark the token as complete and ready for users to be notified.
* @param msg response message. Optional - there are no response messages for some flows
* @param ex if there was a problem store the exception in the token.
*/
protected void markComplete(MqttWireMessage msg, MqttException ex) {
final String methodName = "markComplete";
//@TRACE 404=>key={0} response={1} excep={2}
log.fine(CLASS_NAME,methodName,"404",new Object[]{getKey(),msg,ex});
synchronized(responseLock) {
// ACK means that everything was OK, so mark the message for garbage collection.
if (msg instanceof MqttAck) {
this.message = null;
}
this.pendingComplete = true;
this.response = msg;
this.exception = ex;
}
}
/**
* Notifies this token that a response message (an ACK or NACK) has been
* received.
*/
protected void notifyComplete() {
final String methodName = "notifyComplete";
//@TRACE 411=>key={0} response={1} excep={2}
log.fine(CLASS_NAME,methodName,"404",new Object[]{getKey(),this.response, this.exception});
synchronized (responseLock) {
// If pending complete is set then normally the token can be marked
// as complete and users notified. An abnormal error may have
// caused the client to shutdown beween pending complete being set
// and notifying the user. In this case - the action must be failed.
if (exception == null && pendingComplete) {
completed = true;
pendingComplete = false;
} else {
pendingComplete = false;
}
responseLock.notifyAll();
}
synchronized (sentLock) {
sent=true;
sentLock.notifyAll();
}
}
public void waitUntilSent() throws MqttException {
final String methodName = "waitUntilSent";
synchronized (sentLock) {
synchronized (responseLock) {
if (this.exception != null) {
throw this.exception;
}
}
while (!sent) {
try {
//@TRACE 409=wait key={0}
log.fine(CLASS_NAME,methodName, "409",new Object[]{getKey()});
sentLock.wait();
} catch (InterruptedException e) {
}
}
while (!sent) {
if (this.exception == null) {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_UNEXPECTED_ERROR);
}
throw this.exception;
}
}
}
protected void notifySent() {
final String methodName = "notifySent";
//@TRACE 403=> key={0}
log.fine(CLASS_NAME, methodName, "403",new Object[]{getKey()});
synchronized (responseLock) {
this.response = null;
this.completed = false;
}
synchronized (sentLock) {
sent = true;
sentLock.notifyAll();
}
}
}
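The key logic is in waitForResponse, which implements a producer/consumer pattern. When timeout is -1 the caller blocks indefinitely (this is not a deadlock as such): it stays in wait() until another thread calls notify, and the loop exits only once completed has been set. So the question is when completed gets set.
notifyComplete is called from the thread that handles the response; it sets completed and wakes the waiting caller, which otherwise keeps waiting.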
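The wait/notify handshake can be reduced to a few lines. The sketch below is a simplified, hypothetical `CompletionTokenDemo`, not the Paho Token: it uses a String as the response and, unlike the listing, tracks a deadline so that an early wake-up does not cut the timeout short.

```java
/** Hypothetical, simplified completion token built on wait/notifyAll; not the Paho Token. */
class CompletionTokenDemo {
    private final Object responseLock = new Object();
    private boolean completed = false;
    private String response = null;

    /** Blocks until completed. A timeout <= 0 waits indefinitely; returns null on timeout. */
    String waitForResponse(long timeoutMillis) {
        synchronized (responseLock) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (!completed) {
                try {
                    if (timeoutMillis <= 0) {
                        responseLock.wait();
                    } else {
                        long remaining = deadline - System.currentTimeMillis();
                        if (remaining <= 0) {
                            break; // time is up and still not completed
                        }
                        responseLock.wait(remaining);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return response;
        }
    }

    /** Called by the thread that received the response; wakes every waiter. */
    void notifyComplete(String resp) {
        synchronized (responseLock) {
            response = resp;
            completed = true;
            responseLock.notifyAll();
        }
    }

    /** Test helper: completes the token from a background thread after a delay. */
    static void completeLater(final CompletionTokenDemo token, final long delayMillis, final String resp) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            token.notifyComplete(resp);
        });
        worker.setDaemon(true);
        worker.start();
    }

    public static void main(String[] args) {
        CompletionTokenDemo token = new CompletionTokenDemo();
        completeLater(token, 100, "ack");
        System.out.println("response = " + token.waitForResponse(5000));
    }
}
```

The loop around wait() is what matters: the waiter rechecks completed after every wake-up, so a notification for some other reason never lets it return early with a stale result.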
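- 9. checkPing has to be called manually by the developer; it checks for activity through comms.checkForActivity() to keep the connection's heartbeat alive
- 10. generateClientId() generates a unique ID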
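As a rough illustration of point 10 and of the length check in the constructor, the sketch below generates an ID from a prefix plus System.nanoTime() and counts its length in Unicode code points, so a surrogate pair counts as one character. `ClientIdDemo` and the prefix "demo" are hypothetical. Note that the loop quoted in the constructor stops at length() - 1, so it appears to undercount by one for IDs that do not end in a surrogate pair; codePointCount avoids that.

```java
/** Hypothetical client ID helper; the prefix and class name are illustrative only. */
class ClientIdDemo {
    static final String PREFIX = "demo";   // assumption, stands in for CLIENT_ID_PREFIX
    static final int MAX_LENGTH = 65535;   // limit used in the constructor listing

    /** Builds an ID that is unlikely to collide within one JVM. */
    static String generate() {
        return PREFIX + System.nanoTime();
    }

    /** Counts characters so that a surrogate pair counts as one. */
    static int length(String clientId) {
        return clientId.codePointCount(0, clientId.length());
    }

    /** Rejects null IDs and IDs longer than the limit. */
    static void validate(String clientId) {
        if (clientId == null) {
            throw new IllegalArgumentException("Null clientId");
        }
        if (length(clientId) > MAX_LENGTH) {
            throw new IllegalArgumentException("ClientId longer than " + MAX_LENGTH + " characters");
        }
    }

    public static void main(String[] args) {
        String id = generate();
        validate(id);
        System.out.println(id + " has length " + length(id));
    }
}
```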
Next comes the sending and business logic inside ClientComms; everything so far was groundwork to establish the concepts.
public class ClientComms {
public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
this.conState = DISCONNECTED;
this.client = client;
this.persistence = persistence;
this.pingSender = pingSender;
this.pingSender.init(this);
this.tokenStore = new CommsTokenStore(getClient().getClientId()); // 【: 11】
this.callback = new CommsCallback(this); // 【: 12】
this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender); // 【: 13】
callback.setClientState(clientState);
log.setResourceName(getClient().getClientId());
}
/**
* Sends a message to the server. Does not check if connected this validation must be done
* by invoking routines.
* @param message
* @param token
* @throws MqttException
*/
void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
if (token.getClient() == null ) {
// Associate the client with the token - also marks it as in use.
token.internalTok.setClient(getClient());
} else {
// Token is already in use - cannot reuse
//@TRACE 213=fail: token in use: key={0} message={1} token={2}
log.fine(CLASS_NAME, methodName, "213", new Object[]{message.getKey(), message, token});
throw new MqttException(MqttException.REASON_CODE_TOKEN_INUSE);
}
try {
// Persist if needed and send the message
this.clientState.send(message, token); // 【: 14】
} catch(MqttException e) {
if (message instanceof MqttPublish) {
this.clientState.undo((MqttPublish)message);
}
throw e;
}
}
/**
* Sends a message to the broker if in connected state, but only waits for the message to be
* stored, before returning.
*/
public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttException { // 【: 15】
final String methodName = "sendNoWait";
if (isConnected() ||
(!isConnected() && message instanceof MqttConnect) ||
(isDisconnecting() && message instanceof MqttDisconnect)) {
if(disconnectedMessageBuffer != null && disconnectedMessageBuffer.getMessageCount() != 0){
this.clientState.persistBufferedMessage(message);
disconnectedMessageBuffer.putMessage(message, token);
} else {
this.internalSend(message, token);
}
} else if(disconnectedMessageBuffer != null && isResting()){
this.clientState.persistBufferedMessage(message);
disconnectedMessageBuffer.putMessage(message, token);
} else {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
/**
* Sends a connect message and waits for an ACK or NACK.
* Connecting is a special case which will also start up the
* network connection, receive thread, and keep alive thread.
*/
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
final String methodName = "connect";
synchronized (conLock) {
if (isDisconnected() && !closePending) {
conState = CONNECTING;
conOptions = options;
MqttConnect connect = new MqttConnect(client.getClientId(),
conOptions.getMqttVersion(),
conOptions.isCleanSession(),
conOptions.getKeepAliveInterval(),
conOptions.getUserName(),
conOptions.getPassword(),
conOptions.getWillMessage(),
conOptions.getWillDestination());
this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
this.clientState.setCleanSession(conOptions.isCleanSession());
this.clientState.setMaxInflight(conOptions.getMaxInflight());
tokenStore.open();
ConnectBG conbg = new ConnectBG(this, token, connect);
conbg.start();
}
else {
if (isClosed() || closePending) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
} else if (isConnecting()) {
throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
} else if (isDisconnecting()) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
} else {
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
}
}
}
}
public void connectComplete( MqttConnack cack, MqttException mex) throws MqttException {
final String methodName = "connectComplete";
int rc = cack.getReturnCode();
synchronized (conLock) {
if (rc == 0) {
conState = CONNECTED;
return;
}
}
throw mex;
}
// Tidy up. There may be tokens outstanding as the client was
// not disconnected/quiesced cleanly! Work out what tokens still
// need to be notified and waiters unblocked. Store the
// disconnect or connect token to notify after disconnect is
// complete.
private MqttToken handleOldTokens(MqttToken token, MqttException reason) { // [note 16]
MqttToken tokToNotifyLater = null;
try {
// First the token that was related to the disconnect / shutdown may
// not be in the token table - temporarily add it if not
if (token != null) {
if (tokenStore.getToken(token.internalTok.getKey())==null) {
tokenStore.saveToken(token, token.internalTok.getKey());
}
}
Vector toksToNot = clientState.resolveOldTokens(reason);
Enumeration toksToNotE = toksToNot.elements();
while(toksToNotE.hasMoreElements()) {
MqttToken tok = (MqttToken)toksToNotE.nextElement();
if (tok.internalTok.getKey().equals(MqttDisconnect.KEY) ||
tok.internalTok.getKey().equals(MqttConnect.KEY)) {
// It's a connect or disconnect token, so remember it and notify at the end of the disconnect routine
tokToNotifyLater = tok;
} else {
// notify waiters and callbacks of outstanding tokens
// that a problem has occurred and disconnect is in
// progress
callback.asyncOperationComplete(tok);
}
}
}catch(Exception ex) {
// Ignore as we are shutting down
}
return tokToNotifyLater;
}
public void messageArrivedComplete(int messageId, int qos) throws MqttException {
this.callback.messageArrivedComplete(messageId, qos);
}
// Kick off the connect processing in the background so that it does not block. For instance
// the socket could take time to create.
private class ConnectBG implements Runnable {
ClientComms clientComms = null;
Thread cBg = null;
MqttToken conToken;
MqttConnect conPacket;
ConnectBG(ClientComms cc, MqttToken cToken, MqttConnect cPacket) {
clientComms = cc;
conToken = cToken;
conPacket = cPacket;
cBg = new Thread(this, "MQTT Con: "+getClient().getClientId());
}
void start() {
cBg.start();
}
public void run() { // [note 17]
final String methodName = "connectBG:run";
MqttException mqttEx = null;
//@TRACE 220=>
log.fine(CLASS_NAME, methodName, "220");
try {
// Reset an exception on existing delivery tokens.
// This will have been set if disconnect occurred before delivery was
// fully processed.
MqttDeliveryToken[] toks = tokenStore.getOutstandingDelTokens();
for (int i=0; i<toks.length; i++) {
toks[i].internalTok.setException(null);
}
// Save the connect token in tokenStore as failure can occur before send
tokenStore.saveToken(conToken,conPacket);
// Connect to the server at the network level e.g. TCP socket and then
// start the background processing threads before sending the connect
// packet.
NetworkModule networkModule = networkModules[networkModuleIndex];
networkModule.start();
receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
receiver.start("MQTT Rec: "+getClient().getClientId());
sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
sender.start("MQTT Snd: "+getClient().getClientId());
callback.start("MQTT Call: "+getClient().getClientId());
internalSend(conPacket, conToken);
} catch (MqttException ex) {
mqttEx = ex;
} catch (Exception ex) {
mqttEx = ExceptionHelper.createMqttException(ex);
}
if (mqttEx != null) {
shutdownConnection(conToken, mqttEx);
}
}
}
/*
* Check and send a ping if needed and check for ping timeout.
* Need to send a ping if nothing has been sent or received
* in the last keepalive interval.
* Passes an IMqttActionListener to ClientState.checkForActivity
* so that the callbacks are attached as soon as the token is created
* (Bug 473928)
*/
public MqttToken checkForActivity(IMqttActionListener pingCallback){
MqttToken token = null;
try{
token = clientState.checkForActivity(pingCallback);
}catch(MqttException e){
handleRunException(e);
}catch(Exception e){
handleRunException(e);
}
return token;
}
......
}
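I have cut the reconnect and close code. It amounts to scheduling a Task and checking flags for whether the client is closed or connected, so you can read it yourself.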
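- 11. CommsTokenStore provides a mechanism for saving and tracking tokens across multiple threads. When a message is sent, its associated Token is saved. Anyone interested in the outcome can fetch the token with getToken and then either block in one of the token's wait methods or register a listener to be called back. Only requests that carry a message ID are stored under a unique per-message key; other messages (CONNECT and DISCONNECT, for example, via MqttConnect.KEY and MqttDisconnect.KEY) use a fixed key per message type.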
public class CommsTokenStore {
private Hashtable tokens;
protected void saveToken(MqttToken token, String key) {
final String methodName = "saveToken";
synchronized(tokens) {
//@TRACE 307=key={0} token={1}
log.fine(CLASS_NAME,methodName,"307",new Object[]{key,token.toString()});
token.internalTok.setKey(key);
this.tokens.put(key, token);
}
}
......
}
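To make the token idea concrete, here is a minimal sketch of a store in which the sending side saves a token under a message key and the receiving side completes it. It is not Paho's implementation: `SimpleToken`, `SimpleTokenStore` and all of their methods are hypothetical names chosen for illustration, and the real MqttToken carries far more state.

```java
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for a token: one pending operation
// that another thread can wait on.
class SimpleToken {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile String key;

    void setKey(String key) { this.key = key; }
    String getKey() { return key; }

    // Called by the receiving side once the matching ack has arrived.
    void markComplete() { done.countDown(); }

    // Blocks until the operation completes or the timeout expires.
    boolean waitForCompletion(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}

// Hypothetical store: the sender saves a token under the message key,
// the receiver looks it up by the same key and completes it.
class SimpleTokenStore {
    private final Hashtable<String, SimpleToken> tokens = new Hashtable<>();

    void saveToken(SimpleToken token, String key) {
        synchronized (tokens) {
            token.setKey(key);
            tokens.put(key, token);
        }
    }

    SimpleToken getToken(String key) { return tokens.get(key); }

    SimpleToken removeToken(String key) { return tokens.remove(key); }

    int count() { return tokens.size(); }
}
```

In use, the publishing thread would call `saveToken` and then `waitForCompletion`, while the receiver thread calls `removeToken(key).markComplete()` when the ack for that key arrives.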
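- 12. CommsCallback bridges the Receiver and the external API callbacks: it converts MQTT message objects and hands them to the application's callback. Covered in the next section.
- 13. ClientState is the core class. It holds the sliding in-flight window mechanism and the QoS control state. Covered below.
- 14. Every message has a corresponding Token.
- 15. Used to resend after a reconnect: messages sent while disconnected are placed in disconnectedMessageBuffer. Related to QoS.
- 16. Also part of the mechanism for ensuring delivery after a disconnect: outstanding tokens are resolved and their waiters notified.
- 17. This Task starts running as soon as the application calls connect. It starts CommsReceiver and CommsSender, then saves and sends the first message, the CONNECT.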
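The routing decision behind note 15 can be sketched as follows. This is a simplified model rather than Paho code: `OfflineBufferSketch`, its `wire` queue and `drainOne` are hypothetical, and the reading that new messages queue behind buffered ones so as not to overtake them is my interpretation of the `getMessageCount() != 0` check in sendNoWait.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the sendNoWait routing decision; not Paho code.
class OfflineBufferSketch {
    private final Deque<String> buffer = new ArrayDeque<>(); // stands in for disconnectedMessageBuffer
    private final Deque<String> wire = new ArrayDeque<>();   // stands in for the network
    private final boolean bufferingEnabled;
    private boolean connected = false;

    OfflineBufferSketch(boolean bufferingEnabled) {
        this.bufferingEnabled = bufferingEnabled;
    }

    void setConnected(boolean connected) { this.connected = connected; }

    void send(String message) {
        if (connected) {
            if (!buffer.isEmpty()) {
                // Older messages are still waiting: queue behind them.
                buffer.addLast(message);
            } else {
                wire.addLast(message);
            }
        } else if (bufferingEnabled) {
            buffer.addLast(message);
        } else {
            throw new IllegalStateException("client is not connected");
        }
    }

    // Moves one buffered message to the wire; returns false when nothing was moved.
    boolean drainOne() {
        if (!connected) return false;
        String next = buffer.pollFirst();
        if (next == null) return false;
        wire.addLast(next);
        return true;
    }

    Deque<String> sentSoFar() { return wire; }
    int buffered() { return buffer.size(); }
}
```

With buffering disabled the sketch rejects an offline send outright, which mirrors the REASON_CODE_CLIENT_NOT_CONNECTED branch above.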
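ClientState is too long to read in one go, so I will take it apart.
This class is the core of the client. It holds the state of pending and in-flight messages.
A message passes through several objects while it is being delivered:
- When the client restarts, the persisted messages are used for redelivery.
- When the client, or more specifically ClientState, is instantiated, messages are read from persistence as follows:
A QoS 2 PUBLISH or PUBREL goes into the outboundQoS2 hashtable.
A QoS 1 PUBLISH goes into the outboundQoS1 hashtable.
- On connect, messages are copied from the outbound hashtables into the pendingMessages or pendingFlows vector, ordered by message ID:
An initial PUBLISH goes onto the pendingMessages buffer (the queue of messages waiting to be sent).
A PUBREL goes onto the pendingFlows buffer (the queue of protocol flows).
- The sender thread takes messages from both pendingFlows and pendingMessages. A message removed from a pending buffer still has a copy in the matching outbound* hashtable.
- The receiver thread:
On a PUBACK for a QoS 1 message, it removes the message from persistence and from outboundQoS1.
On a PUBREC for a QoS 2 message, it sends a PUBREL, replaces the outboundQoS2 entry with the PUBREL, and updates persistence.
On a PUBCOMP for a QoS 2 message, it removes the message from persistence and from outboundQoS2.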
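The bookkeeping in the list above can be sketched as a small state table. This is an illustration under assumptions, not the Paho class: `OutboundQosSketch`, the string values stored in the maps, and the `"s-"` / `"sc-"` persistence key format are all hypothetical stand-ins, and a plain map replaces MqttClientPersistence.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the outbound QoS bookkeeping; not Paho code.
class OutboundQosSketch {
    // message ID -> the packet currently outstanding for that ID
    final Map<Integer, String> outboundQoS1 = new HashMap<>();
    final Map<Integer, String> outboundQoS2 = new HashMap<>();
    // Stand-in for the persistent store (key format is an assumption).
    final Map<String, String> persistence = new HashMap<>();

    void publish(int id, int qos, String payload) {
        if (qos == 1) {
            outboundQoS1.put(id, "PUBLISH");
            persistence.put("s-" + id, payload);
        } else if (qos == 2) {
            outboundQoS2.put(id, "PUBLISH");
            persistence.put("s-" + id, payload);
        }
        // QoS 0 is fire-and-forget: nothing is tracked or persisted.
    }

    // QoS 1 completes on PUBACK.
    void onPubAck(int id) {
        outboundQoS1.remove(id);
        persistence.remove("s-" + id);
    }

    // QoS 2, step 1: PUBREC arrives, so the outstanding packet becomes a PUBREL.
    void onPubRec(int id) {
        outboundQoS2.put(id, "PUBREL");
        persistence.put("sc-" + id, "PUBREL");
    }

    // QoS 2, step 2: PUBCOMP arrives, so the exchange is finished.
    void onPubComp(int id) {
        outboundQoS2.remove(id);
        persistence.remove("s-" + id);
        persistence.remove("sc-" + id);
    }
}
```

The point of keeping the entry in the outbound map until the final ack is that a reconnect can rebuild the pending queues from those maps without losing track of where each exchange stopped.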
public class ClientState {
private static final int MIN_MSG_ID = 1; // Lowest possible MQTT message ID to use
private static final int MAX_MSG_ID = 65535; // Highest possible MQTT message ID to use
private int nextMsgId = MIN_MSG_ID - 1; // The next available message ID to use
private Hashtable inUseMsgIds; // Used to store a set of in-use message IDs
volatile private Vector pendingMessages;
volatile private Vector pendingFlows;
private CommsTokenStore tokenStore;
private ClientComms clientComms = null;
private CommsCallback callback = null;
private long keepAlive;
private boolean cleanSession;
private MqttClientPersistence persistence;
private int maxInflight = 0;
private int actualInFlight = 0;
private int inFlightPubRels = 0;
private Object queueLock = new Object();
private Object quiesceLock = new Object();
private boolean quiescing = false;
private long lastOutboundActivity = 0;
private long lastInboundActivity = 0;
private long lastPing = 0;
private MqttWireMessage pingCommand;
private Object pingOutstandingLock = new Object();
private int pingOutstanding = 0;
private boolean connected = false;
private Hashtable outboundQoS2 = null;
private Hashtable outboundQoS1 = null;
private Hashtable outboundQoS0 = null;
private Hashtable inboundQoS2 = null;
private MqttPingSender pingSender = null;
protected ClientState(MqttClientPersistence persistence, CommsTokenStore tokenStore,
CommsCallback callback, ClientComms clientComms, MqttPingSender pingSender) throws MqttException {
log.setResourceName(clientComms.getClient().getClientId());
log.finer(CLASS_NAME, "<Init>", "" );
inUseMsgIds = new Hashtable();
pendingFlows = new Vector();
outboundQoS2 = new Hashtable();
outboundQoS1 = new Hashtable();
outboundQoS0 = new Hashtable();
inboundQoS2 = new Hashtable();
pingCommand = new MqttPingReq();
inFlightPubRels = 0;
actualInFlight = 0;
this.persistence = persistence;
this.callback = callback;
this.tokenStore = tokenStore;
this.clientComms = clientComms;
this.pingSender = pingSender;
restoreState();
}
That gives an overall picture of the class. Now I will pull out the individual methods.
/**
* Submits a message for delivery. This method will block until there is
* room in the inFlightWindow for the message. The message is put into
* persistence before returning.
*
* @param message the message to send
* @param token the token that can be used to track delivery of the message
* @throws MqttException
*/
public void send(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "send";
if (message.isMessageIdRequired() && (message.getMessageId() == 0)) {
message.setMessageId(getNextMessageId());
}
if (token != null ) {
try {
token.internalTok.setMessageID(message.getMessageId());
} catch (Exception e) {
}
}
if (message instanceof MqttPublish) {
synchronized (queueLock) {
if (actualInFlight >= this.maxInflight) {
throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
}
MqttMessage innerMessage = ((MqttPublish) message).getMessage();
switch(innerMessage.getQos()) {
case 2:
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
case 1:
outboundQoS1.put(new Integer(message.getMessageId()), message);
persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
break;
}
tokenStore.saveToken(token, message);
pendingMessages.addElement(message);
queueLock.notifyAll();
}
} else {
if (message instanceof MqttConnect) {
synchronized (queueLock) {
// Add the connect action at the head of the pending queue ensuring it jumps
// ahead of any of other pending actions.
tokenStore.saveToken(token, message);
pendingFlows.insertElementAt(message,0);
queueLock.notifyAll();
}
} else {
if (message instanceof MqttPingReq) {
this.pingCommand = message;
}
else if (message instanceof MqttPubRel) {
outboundQoS2.put(new Integer(message.getMessageId()), message);
persistence.put(getSendConfirmPersistenceKey(message), (MqttPubRel) message);
}
else if (message instanceof MqttPubComp) {
persistence.remove(getReceivedPersistenceKey(message));
}
synchronized (queueLock) {
if ( !(message instanceof MqttAck )) {
tokenStore.saveToken(token, message);
}
pendingFlows.addElement(message);
queueLock.notifyAll();
}
}
}
}
}
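The Javadoc says send blocks until there is room in the in-flight window, but the code shown here does not wait: once actualInFlight has reached maxInflight it throws REASON_CODE_MAX_INFLIGHT. Only QoS 1 and QoS 2 publishes are persisted before being queued. QoS 0 publishes are queued without persistence.
On the sending path the MqttConnect goes out first and the MqttPublish messages follow. The else branch handles everything that is not a PUBLISH: CONNECT, PINGREQ, PUBREL, PUBCOMP and the other control messages.
For a PUBLISH, send first checks the in-flight count against the window size, then switches on the QoS, persists the message, and adds it to the matching queue. CommsSender finally writes it out.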
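The window accounting described here can be sketched in isolation. This is a simplified model, not the Paho code: `InflightWindowSketch`, `takeForWire` and `onAck` are hypothetical names, and messages are plain strings. As in the listing, the counter only goes up when the sender takes a message, so `send` can accept more messages than the window size as long as the window itself is not yet full.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the in-flight window; not Paho code.
class InflightWindowSketch {
    private final int maxInflight;
    private int actualInFlight = 0;
    private final List<String> pendingMessages = new ArrayList<>();

    InflightWindowSketch(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    // Application side: reject when the window is already full, else queue.
    synchronized void send(String message) {
        if (actualInFlight >= maxInflight) {
            throw new IllegalStateException("too many publishes in flight");
        }
        pendingMessages.add(message);
    }

    // Sender side: take the next message only if the window has room.
    synchronized String takeForWire() {
        if (pendingMessages.isEmpty() || actualInFlight >= maxInflight) {
            return null;
        }
        actualInFlight++;
        return pendingMessages.remove(0);
    }

    // Receiver side: an ack frees one slot in the window.
    synchronized void onAck() {
        if (actualInFlight > 0) {
            actualInFlight--;
        }
    }
}
```

In Paho the window size comes from `MqttConnectOptions.getMaxInflight()`, which connect passes to `clientState.setMaxInflight` in the listing above.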
public class CommsSender implements Runnable {
private static final String CLASS_NAME = CommsSender.class.getName();
private static final Logger log = LoggerFactory.getLogger(LoggerFactory.MQTT_CLIENT_MSG_CAT, CLASS_NAME);
//Sends MQTT packets to the server on its own thread
private boolean running = false;
private Object lifecycle = new Object();
private ClientState clientState = null;
private MqttOutputStream out;
private ClientComms clientComms = null;
private CommsTokenStore tokenStore = null;
private Thread sendThread = null;
public CommsSender(ClientComms clientComms, ClientState clientState, CommsTokenStore tokenStore, OutputStream out) {
this.out = new MqttOutputStream(clientState, out);
this.clientComms = clientComms;
this.clientState = clientState;
this.tokenStore = tokenStore;
log.setResourceName(clientComms.getClient().getClientId());
}
public void run() {
final String methodName = "run";
MqttWireMessage message = null;
while (running && (out != null)) {
try {
message = clientState.get();
if (message != null) {
if (message instanceof MqttAck) {
out.write(message);
out.flush();
} else {
MqttToken token = tokenStore.getToken(message);
// While quiescing the tokenstore can be cleared so need
// to check for null for the case where clear occurs
// while trying to send a message.
if (token != null) {
synchronized (token) {
out.write(message);
try {
out.flush();
} catch (IOException ex) {
// The flush has been seen to fail on disconnect of a SSL socket
// as disconnect is in progress this should not be treated as an error
if (!(message instanceof MqttDisconnect)) {
throw ex;
}
}
clientState.notifySent(message);
}
}
}
} else { // null message
running = false;
}
} catch (MqttException me) {
handleRunException(message, me);
} catch (Exception ex) {
handleRunException(message, ex);
}
} // end while
Now look at clientState.get(). The get method blocks.
/**
* This returns the next piece of work, ie message, for the CommsSender
* to send over the network.
* Calls to this method block until either:
* - there is a message to be sent
* - the keepAlive interval is exceeded, which triggers a ping message
* to be returned
* - {@link #disconnected(MqttException, boolean)} is called
* @return the next message to send, or null if the client is disconnected
*/
protected MqttWireMessage get() throws MqttException {
final String methodName = "get";
MqttWireMessage result = null;
synchronized (queueLock) {
while (result == null) {
// If there is no work wait until there is work.
// If the inflight window is full and no flows are pending wait until space is freed.
// In both cases queueLock will be notified.
if ((pendingMessages.isEmpty() && pendingFlows.isEmpty()) ||
(pendingFlows.isEmpty() && actualInFlight >= this.maxInflight)) {
try {
queueLock.wait();
} catch (InterruptedException e) {
}
}
// Handle the case where not connected. This should only be the case if:
// - in the process of disconnecting / shutting down
// - in the process of connecting
if (!connected &&
(pendingFlows.isEmpty() || !((MqttWireMessage)pendingFlows.elementAt(0) instanceof MqttConnect))) {
return null;
}
// Check if there is a need to send a ping to keep the session alive.
// Note this check is done before processing messages. If not done first
// an app that only publishes QoS 0 messages will prevent keepalive processing
// from functioning.
// checkForActivity(); //Use pinger, don't check here
// Now process any queued flows or messages
if (!pendingFlows.isEmpty()) {
// Process the first "flow" in the queue
result = (MqttWireMessage)pendingFlows.remove(0);
if (result instanceof MqttPubRel) {
inFlightPubRels++;
//@TRACE 617=+1 inflightpubrels={0}
log.fine(CLASS_NAME,methodName,"617", new Object[]{new Integer(inFlightPubRels)});
}
checkQuiesceLock();
} else if (!pendingMessages.isEmpty()) {
// If the inflight window is full then messages are not
// processed until the inflight window has space.
if (actualInFlight < this.maxInflight) {
// The in flight window is not full so process the
// first message in the queue
result = (MqttWireMessage)pendingMessages.elementAt(0);
pendingMessages.removeElementAt(0);
actualInFlight++;
//@TRACE 623=+1 actualInFlight={0}
log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});
} else {
//@TRACE 622=inflight window full
log.fine(CLASS_NAME,methodName,"622");
}
}
}
}
return result;
}
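get always serves pendingFlows first. Only when pendingFlows is empty, and the in-flight window still has room, does it take from the pendingMessages queue. The Vector is used like a double-ended queue, and where a message lands depends on its type: the CONNECT sent on a connect or reconnect, for example, is inserted at the head of pendingFlows.
pendingMessages and pendingFlows are largely independent, and data is never copied from pendingFlows into pendingMessages. pendingFlows carries only the special protocol messages, which have their own response and callback handling (MqttConnect, for example), while pendingMessages is purely the buffer queue for publishes coming from the application.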
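The priority between the two queues can be sketched as below. To keep it deterministic the sketch is non-blocking: `poll` returns null at the point where the real get() would call `queueLock.wait()`. `TwoQueueSketch` and its methods are hypothetical names, and messages are plain strings.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the queue priority in get(); not Paho code.
class TwoQueueSketch {
    private final Deque<String> pendingFlows = new ArrayDeque<>();
    private final Deque<String> pendingMessages = new ArrayDeque<>();
    private final int maxInflight;
    private int actualInFlight = 0;

    TwoQueueSketch(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    // Ordinary protocol flows (PUBREL, acks, pings) join the tail.
    synchronized void addFlow(String flow) { pendingFlows.addLast(flow); }

    // A CONNECT jumps ahead of every other pending flow.
    synchronized void addConnect(String connect) { pendingFlows.addFirst(connect); }

    // Application publishes wait in their own queue.
    synchronized void addMessage(String message) { pendingMessages.addLast(message); }

    // Non-blocking variant of get(): null means "the real method would wait here".
    synchronized String poll() {
        if (!pendingFlows.isEmpty()) {
            return pendingFlows.pollFirst();      // flows always go first
        }
        if (!pendingMessages.isEmpty() && actualInFlight < maxInflight) {
            actualInFlight++;                     // a publish occupies a window slot
            return pendingMessages.pollFirst();
        }
        return null;
    }
}
```

Flows are never held back by the window, which is why a PUBREL or a ping can still go out while publishes are stalled waiting for acks.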
The receiving side