Skip to content

Commit

Permalink
Fix NullPointer in HTTPClient.addBackTCPClient
Browse files Browse the repository at this point in the history
A race condition could result in the ArrayDeque being removed right after addBackTCPClient added it.  Resulting in a NPE when we try to synchronize on it after doing the .get().
The fix is to change how we add back.  We already had good locks in place when removing (though checkIdleSockets forgot to check when all sockets were closed).
When adding we may loop mutliple times if the state changes from what we expect, because it can be hard to have the lock before we need to add it.
If the ArrayDeque already exists, the solution is easy, we just need to be sure it still exists once we actually are able to synchronize (otherwise it may have been removed while waiting to synchronize on it)
If it does not exist we will optimistically construct the ArrayDeque, and add the element to it (so that if another thread sees the new queue, it wont be removed due to being empty).  Then finally try to do an atomic `putIfAbsent`, if this fails, we will loop and retry.
  • Loading branch information
jentfoo committed May 3, 2018
1 parent 4823d9a commit 7802faf
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -410,30 +410,43 @@ private void addBackTCPClient(final HTTPAddress ha, final TCPClient client) {
client.close();
return;
}
if(!client.isClosed()) {
ArrayDeque<Pair<Long,TCPClient>> ll = sockets.get(ha);
Pair<Long,TCPClient> p = new Pair<>(Clock.lastKnownForwardProgressingMillis(), client);
while (!client.isClosed()) {
ArrayDeque<Pair<Long,TCPClient>> ll = sockets.get(ha);
if(ll == null) {
sockets.put(ha, new ArrayDeque<>(8));
ll = sockets.get(ha);
}
synchronized(ll) {
ll.add(new Pair<>(Clock.lastKnownForwardProgressingMillis(), client));
ll = new ArrayDeque<>(8);
ll.add(p);
if (sockets.putIfAbsent(ha, ll) == null) {
break;
}
} else {
synchronized(ll) {
if (sockets.get(ha) == ll) {
ll.add(p);
break;
}
}
}
}
}

private void checkIdleSockets() {
if(maxIdleTime > 0) {
for(ArrayDeque<Pair<Long,TCPClient>> adq: sockets.values()) {
Iterator<ArrayDeque<Pair<Long,TCPClient>>> hostSocketsIt = sockets.values().iterator();
while (hostSocketsIt.hasNext()) {
ArrayDeque<Pair<Long,TCPClient>> adq = hostSocketsIt.next();
synchronized(adq) {
Iterator<Pair<Long,TCPClient>> iter = adq.iterator();
while(iter.hasNext()) {
Pair<Long,TCPClient> c = iter.next();
Iterator<Pair<Long,TCPClient>> clientConIt = adq.iterator();
while(clientConIt.hasNext()) {
Pair<Long,TCPClient> c = clientConIt.next();
if(Clock.lastKnownForwardProgressingMillis() - c.getLeft() > maxIdleTime) {
iter.remove();
clientConIt.remove();
c.getRight().close();
}
}
if (adq.isEmpty()) {
hostSocketsIt.remove();
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group = org.threadly
version = 0.15
version = 0.16
threadlyVersion = 5.16
litesocketsVersion = 4.3
org.gradle.parallel=true

0 comments on commit 7802faf

Please sign in to comment.