diff --git a/pom.xml b/pom.xml
index 1373c76..23448ee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
com.github.nkzawa
engine.io-client
- 0.3.1
+ 0.4.0-SNAPSHOT
org.json
diff --git a/src/main/java/com/github/nkzawa/backo/Backoff.java b/src/main/java/com/github/nkzawa/backo/Backoff.java
new file mode 100644
index 0000000..bf0ca01
--- /dev/null
+++ b/src/main/java/com/github/nkzawa/backo/Backoff.java
@@ -0,0 +1,50 @@
+package com.github.nkzawa.backo;
+
+public class Backoff {
+
+ private long ms = 100;
+ private long max = 10000;
+ private int factor = 2;
+ private double jitter = 0.0;
+ private int attempts = 0;
+
+ public Backoff() {}
+
+ public long duration() {
+ long ms = this.ms * (long) Math.pow(this.factor, this.attempts++);
+ if (jitter != 0.0) {
+ double rand = Math.random();
+ int deviation = (int) Math.floor(rand * this.jitter * ms);
+ ms = (((int) Math.floor(rand * 10)) & 1) == 0 ? ms - deviation : ms + deviation;
+ }
+ return Math.min(ms, this.max);
+ }
+
+ public void reset() {
+ this.attempts = 0;
+ }
+
+ public Backoff setMin(long min) {
+ this.ms = min;
+ return this;
+ }
+
+ public Backoff setMax(long max) {
+ this.max = max;
+ return this;
+ }
+
+ public Backoff setFactor(int factor) {
+ this.factor = factor;
+ return this;
+ }
+
+ public Backoff setJitter(double jitter) {
+ this.jitter = jitter;
+ return this;
+ }
+
+ public int getAttempts() {
+ return this.attempts;
+ }
+}
diff --git a/src/main/java/com/github/nkzawa/socketio/client/Manager.java b/src/main/java/com/github/nkzawa/socketio/client/Manager.java
index c111905..9478869 100644
--- a/src/main/java/com/github/nkzawa/socketio/client/Manager.java
+++ b/src/main/java/com/github/nkzawa/socketio/client/Manager.java
@@ -1,5 +1,6 @@
package com.github.nkzawa.socketio.client;
+import com.github.nkzawa.backo.Backoff;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser;
@@ -76,13 +77,13 @@ public class Manager extends Emitter {
private boolean skipReconnect;
private boolean reconnecting;
private boolean encoding;
- private boolean openReconnect;
private int _reconnectionAttempts;
private long _reconnectionDelay;
private long _reconnectionDelayMax;
+ private double _randomizationFactor;
+ private Backoff backoff;
private long _timeout;
private Set connected;
- private int attempts;
private URI uri;
private List packetBuffer;
private Queue subs;
@@ -129,11 +130,15 @@ public class Manager extends Emitter {
this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE);
this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000);
this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000);
+ this.randomizationFactor(opts.randomizationFactor != 0.0 ? opts.randomizationFactor : 0.5);
+ this.backoff = new Backoff()
+ .setMin(this.reconnectionDelay())
+ .setMax(this.reconnectionDelayMax())
+ .setJitter(this.randomizationFactor());
this.timeout(opts.timeout < 0 ? 20000 : opts.timeout);
this.readyState = ReadyState.CLOSED;
this.uri = uri;
this.connected = new HashSet();
- this.attempts = 0;
this.encoding = false;
this.packetBuffer = new ArrayList();
this.encoder = new Parser.Encoder();
@@ -147,6 +152,15 @@ public class Manager extends Emitter {
}
}
+ /**
+ * Update `socket.id` of all sockets
+ */
+ private void updateSocketIds() {
+ for (Socket socket : this.nsps.values()) {
+ socket.id = this.engine.id();
+ }
+ }
+
public boolean reconnection() {
return this._reconnection;
}
@@ -171,6 +185,21 @@ public class Manager extends Emitter {
public Manager reconnectionDelay(long v) {
this._reconnectionDelay = v;
+ if (this.backoff != null) {
+ this.backoff.setMin(v);
+ }
+ return this;
+ }
+
+ public double randomizationFactor() {
+ return this._randomizationFactor;
+ }
+
+ public Manager randomizationFactor(double v) {
+ this._randomizationFactor = v;
+ if (this.backoff != null) {
+ this.backoff.setJitter(v);
+ }
return this;
}
@@ -180,6 +209,9 @@ public class Manager extends Emitter {
public Manager reconnectionDelayMax(long v) {
this._reconnectionDelayMax = v;
+ if (this.backoff != null) {
+ this.backoff.setMax(v);
+ }
return this;
}
@@ -194,8 +226,7 @@ public class Manager extends Emitter {
private void maybeReconnectOnOpen() {
// Only try to reconnect if it's the first time we're connecting
- if (!this.openReconnect && !this.reconnecting && this._reconnection && this.attempts == 0) {
- this.openReconnect = true;
+ if (!this.reconnecting && this._reconnection && this.backoff.getAttempts() == 0) {
this.reconnect();
}
}
@@ -252,9 +283,10 @@ public class Manager extends Emitter {
Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception) data : null);
fn.call(err);
+ } else {
+ // Only do this if there is no fn to handle the error
+ self.maybeReconnectOnOpen();
}
-
- self.maybeReconnectOnOpen();
}
});
@@ -371,6 +403,7 @@ public class Manager extends Emitter {
socket.on(Socket.EVENT_CONNECT, new Listener() {
@Override
public void call(Object... objects) {
+ s.id = self.engine.id();
self.connected.add(s);
}
});
@@ -425,6 +458,7 @@ public class Manager extends Emitter {
/*package*/ void close() {
this.skipReconnect = true;
+ this.backoff.reset();
this.readyState = ReadyState.CLOSED;
if (this.engine != null) {
this.engine.close();
@@ -434,6 +468,7 @@ public class Manager extends Emitter {
private void onclose(String reason) {
logger.fine("close");
this.cleanup();
+ this.backoff.reset();
this.readyState = ReadyState.CLOSED;
this.emit(EVENT_CLOSE, reason);
@@ -453,15 +488,14 @@ public class Manager extends Emitter {
if (this.reconnecting || this.skipReconnect) return;
final Manager self = this;
- this.attempts++;
- if (attempts > this._reconnectionAttempts) {
+ if (this.backoff.getAttempts() >= this._reconnectionAttempts) {
logger.fine("reconnect failed");
+ this.backoff.reset();
this.emitAll(EVENT_RECONNECT_FAILED);
this.reconnecting = false;
} else {
- long delay = this.attempts * this.reconnectionDelay();
- delay = Math.min(delay, this.reconnectionDelayMax());
+ long delay = this.backoff.duration();
logger.fine(String.format("will wait %dms before reconnect attempt", delay));
this.reconnecting = true;
@@ -474,8 +508,9 @@ public class Manager extends Emitter {
if (self.skipReconnect) return;
logger.fine("attempting reconnect");
- self.emitAll(EVENT_RECONNECT_ATTEMPT, self.attempts);
- self.emitAll(EVENT_RECONNECTING, self.attempts);
+ int attempts = self.backoff.getAttempts();
+ self.emitAll(EVENT_RECONNECT_ATTEMPT, attempts);
+ self.emitAll(EVENT_RECONNECTING, attempts);
// check again for the case socket closed in above events
if (self.skipReconnect) return;
@@ -509,9 +544,10 @@ public class Manager extends Emitter {
}
private void onreconnect() {
- int attempts = this.attempts;
- this.attempts = 0;
+ int attempts = this.backoff.getAttempts();
this.reconnecting = false;
+ this.backoff.reset();
+ this.updateSocketIds();
this.emitAll(EVENT_RECONNECT, attempts);
}
@@ -549,6 +585,7 @@ public class Manager extends Emitter {
public int reconnectionAttempts;
public long reconnectionDelay;
public long reconnectionDelayMax;
+ public double randomizationFactor;
public long timeout = -1;
}
}
diff --git a/src/main/java/com/github/nkzawa/socketio/client/Socket.java b/src/main/java/com/github/nkzawa/socketio/client/Socket.java
index c16c25d..137082a 100644
--- a/src/main/java/com/github/nkzawa/socketio/client/Socket.java
+++ b/src/main/java/com/github/nkzawa/socketio/client/Socket.java
@@ -68,6 +68,8 @@ public class Socket extends Emitter {
put(EVENT_RECONNECTING, 1);
}};
+ /*package*/ String id;
+
private volatile boolean connected;
private int ids;
private String nsp;
@@ -262,6 +264,7 @@ public class Socket extends Emitter {
private void onclose(String reason) {
logger.fine(String.format("close (%s)", reason));
this.connected = false;
+ this.id = null;
this.emit(EVENT_DISCONNECT, reason);
}
@@ -418,11 +421,22 @@ public class Socket extends Emitter {
}
public Manager io() {
- return io;
+ return this.io;
}
public boolean connected() {
- return connected;
+ return this.connected;
+ }
+
+ /**
+ * A property on the socket instance that is equal to the underlying engine.io socket id.
+ *
+ * The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects.
+ *
+ * @return a socket id
+ */
+ public String id() {
+ return this.id;
}
private static Object[] toArray(JSONArray array) {
diff --git a/src/test/java/com/github/nkzawa/backo/BackoffTest.java b/src/test/java/com/github/nkzawa/backo/BackoffTest.java
new file mode 100644
index 0000000..66f8734
--- /dev/null
+++ b/src/test/java/com/github/nkzawa/backo/BackoffTest.java
@@ -0,0 +1,22 @@
+package com.github.nkzawa.backo;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class BackoffTest {
+
+ @Test
+ public void durationShouldIncreaseTheBackoff() {
+ Backoff b = new Backoff();
+
+ assertTrue(100 == b.duration());
+ assertTrue(200 == b.duration());
+ assertTrue(400 == b.duration());
+ assertTrue(800 == b.duration());
+
+ b.reset();
+ assertTrue(100 == b.duration());
+ assertTrue(200 == b.duration());
+ }
+}
diff --git a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java
index 54f9a0a..5ae28ff 100644
--- a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java
+++ b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java
@@ -298,6 +298,93 @@ public class ConnectionTest extends Connection {
values.take();
}
+ @Test(timeout = TIMEOUT)
+ public void attemptReconnectsAfterAFailedReconnect() throws URISyntaxException, InterruptedException {
+ final BlockingQueue