Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Synchronize put/remove send details hashmaps for thread safety #427

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ buildscript {
google()
}
dependencies {
classpath 'com.android.tools.build:gradle:3.6.3'
classpath 'com.android.tools.build:gradle:4.0.1'
}
}

Expand All @@ -21,7 +21,7 @@ ext {
serviceArchivesBaseName = 'org.eclipse.paho.android.service'
serviceVersion = '1.2.0'

clientVersion = '1.2.0'
clientVersion = '1.2.5'

mavenUrl = "https://repo.eclipse.org/content/repositories/paho-snapshots/"
}
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#Wed Oct 04 11:50:11 BST 2017
#Tue Aug 18 18:10:50 PDT 2020
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.4-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-all.zip
Original file line number Diff line number Diff line change
Expand Up @@ -1028,6 +1028,33 @@ public IMqttToken unsubscribe(String[] topic, Object userContext, IMqttActionLis
return token;
}

/**
* Removes a published message corresponding to the token.
* <p>If a publish is requested with QoS1 or Qos2 and the publish callback is
* not called yet, this function returns true, the publish called will never
* be called, and a messageId corresponding to the token will become reusable.
* </p>
* <p>If the publish callback is already be called, this function returns false.
* </p>
* <p>This function might not stop sending the published message.
* </p>
* *
*
* @param token the token of removing published message
* @return if the message is removed then true, otherwise false
* @throws MqttException if there was an error removing the message.
*/
@Override
public boolean removeMessage(IMqttDeliveryToken token) throws MqttException {
if (token.getMessage().getQos() > 0 && !token.isComplete())
return false;
int i = tokenMap.indexOfValue(token);
if (i > -1) tokenMap.removeAt(i);
if (callback != null)
callback.deliveryComplete(token);
return true;
}

/**
* Returns the delivery tokens for any outstanding publish operations.
* <p>
Expand Down Expand Up @@ -1176,6 +1203,16 @@ public void setManualAcks(boolean manualAcks) {
throw new UnsupportedOperationException();
}

/**
* Will attempt to reconnect to the server after the client has lost connection.
*
* @throws MqttException if an error occurs attempting to reconnect
*/
@Override
public void reconnect() throws MqttException {
mqttService.reconnect();
}

/**
* Process the results of a connection
*
Expand Down Expand Up @@ -1405,6 +1442,19 @@ public void deleteBufferedMessage(int bufferIndex) {
mqttService.deleteBufferedMessage(clientHandle, bufferIndex);
}

/**
* Returns the current number of outgoing in-flight messages being sent by the
* client. Note that this number cannot be guaranteed to be 100% accurate as
* some messages may have been sent or queued in the time taken for this method
* to return.
*
* @return the current number of in-flight messages.
*/
@Override
public int getInFlightMessageCount() {
return mqttService.getInFlightMessageCount(clientHandle);
}

/**
* Get the SSLSocketFactory using SSL key store and password
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,21 +710,10 @@ public void deliveryComplete(IMqttDeliveryToken messageToken) {

service.traceDebug(TAG, "deliveryComplete(" + messageToken + ")");

MqttMessage message = savedSentMessages.remove(messageToken);
if (message != null) { // If I don't know about the message, it's
// irrelevant
String topic = savedTopics.remove(messageToken);
String activityToken = savedActivityTokens.remove(messageToken);
String invocationContext = savedInvocationContexts.remove(messageToken);

Bundle resultBundle = messageToBundle(null, topic, message);
if (activityToken != null) {
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.SEND_ACTION);
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken);
resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext);

Bundle resultBundle = popSendDetails(messageToken);
if (resultBundle != null) {
if (MqttServiceConstants.SEND_ACTION.equals(resultBundle.getString(MqttServiceConstants.CALLBACK_ACTION)))
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
}
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.MESSAGE_DELIVERED_ACTION);
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
}
Expand All @@ -751,6 +740,29 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
service.callbackToActivity(clientHandle, Status.OK, resultBundle);
}

/**
* Removed store details of sent messages in "deliveryComplete"
* callbacks from the mqttClient
*/
private synchronized Bundle popSendDetails(final IMqttDeliveryToken messageToken) {
MqttMessage message = savedSentMessages.remove(messageToken);
if (message != null) { // If I don't know about the message, it's
// irrelevant
String topic = savedTopics.remove(messageToken);
String activityToken = savedActivityTokens.remove(messageToken);
String invocationContext = savedInvocationContexts.remove(messageToken);

Bundle resultBundle = messageToBundle(null, topic, message);
if (activityToken != null) {
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION, MqttServiceConstants.SEND_ACTION);
resultBundle.putString(MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN, activityToken);
resultBundle.putString(MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, invocationContext);
}
return resultBundle;
}
return null;
}

/**
* Store details of sent messages so we can handle "deliveryComplete"
* callbacks from the mqttClient
Expand All @@ -761,7 +773,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
* @param invocationContext
* @param activityToken
*/
private void storeSendDetails(final String topic, final MqttMessage msg, final IMqttDeliveryToken messageToken,
private synchronized void storeSendDetails(final String topic, final MqttMessage msg, final IMqttDeliveryToken messageToken,
final String invocationContext, final String activityToken) {
savedTopics.put(messageToken, topic);
savedSentMessages.put(messageToken, msg);
Expand Down Expand Up @@ -920,6 +932,10 @@ public void deleteBufferedMessage(int bufferIndex) {
myClient.deleteBufferedMessage(bufferIndex);
}

public int getInFlightMessageCount() {
return myClient.getInFlightMessageCount();
}

/**
* General-purpose IMqttActionListener for the Client context
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,11 @@ public void deleteBufferedMessage(String clientHandle, int bufferIndex) {
client.deleteBufferedMessage(bufferIndex);
}

public int getInFlightMessageCount(String clientHandle) {
MqttConnection client = getConnection(clientHandle);
return client.getInFlightMessageCount();
}

/*
* Called in response to a change in network connection - after losing a
* connection to the server, this allows us to wait until we have a usable
Expand Down