From 8a4ffe95c5ae0d81beef37a3a2626554a24d8cd1 Mon Sep 17 00:00:00 2001 From: Naoyuki Kanezawa Date: Mon, 26 Jan 2015 07:00:46 +0900 Subject: [PATCH] compatible with socket.io 1.3.2 --- pom.xml | 2 +- .../java/com/github/nkzawa/backo/Backoff.java | 50 +++++++++ .../nkzawa/socketio/client/Manager.java | 67 +++++++++--- .../github/nkzawa/socketio/client/Socket.java | 18 ++- .../com/github/nkzawa/backo/BackoffTest.java | 22 ++++ .../socketio/client/ConnectionTest.java | 91 +++++++++++++++- .../nkzawa/socketio/client/SocketTest.java | 103 ++++++++++++++++++ .../java/com/github/nkzawa/util/Optional.java | 44 ++++++++ src/test/resources/package.json | 2 +- 9 files changed, 378 insertions(+), 21 deletions(-) create mode 100644 src/main/java/com/github/nkzawa/backo/Backoff.java create mode 100644 src/test/java/com/github/nkzawa/backo/BackoffTest.java create mode 100644 src/test/java/com/github/nkzawa/socketio/client/SocketTest.java create mode 100644 src/test/java/com/github/nkzawa/util/Optional.java diff --git a/pom.xml b/pom.xml index 1373c76..23448ee 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ com.github.nkzawa engine.io-client - 0.3.1 + 0.4.0-SNAPSHOT org.json diff --git a/src/main/java/com/github/nkzawa/backo/Backoff.java b/src/main/java/com/github/nkzawa/backo/Backoff.java new file mode 100644 index 0000000..bf0ca01 --- /dev/null +++ b/src/main/java/com/github/nkzawa/backo/Backoff.java @@ -0,0 +1,50 @@ +package com.github.nkzawa.backo; + +public class Backoff { + + private long ms = 100; + private long max = 10000; + private int factor = 2; + private double jitter = 0.0; + private int attempts = 0; + + public Backoff() {} + + public long duration() { + long ms = this.ms * (long) Math.pow(this.factor, this.attempts++); + if (jitter != 0.0) { + double rand = Math.random(); + int deviation = (int) Math.floor(rand * this.jitter * ms); + ms = (((int) Math.floor(rand * 10)) & 1) == 0 ? ms - deviation : ms + deviation; + } + return Math.min(ms, this.max); + } + + public void reset() { + this.attempts = 0; + } + + public Backoff setMin(long min) { + this.ms = min; + return this; + } + + public Backoff setMax(long max) { + this.max = max; + return this; + } + + public Backoff setFactor(int factor) { + this.factor = factor; + return this; + } + + public Backoff setJitter(double jitter) { + this.jitter = jitter; + return this; + } + + public int getAttempts() { + return this.attempts; + } +} diff --git a/src/main/java/com/github/nkzawa/socketio/client/Manager.java b/src/main/java/com/github/nkzawa/socketio/client/Manager.java index c111905..9478869 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Manager.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Manager.java @@ -1,5 +1,6 @@ package com.github.nkzawa.socketio.client; +import com.github.nkzawa.backo.Backoff; import com.github.nkzawa.emitter.Emitter; import com.github.nkzawa.socketio.parser.Packet; import com.github.nkzawa.socketio.parser.Parser; @@ -76,13 +77,13 @@ public class Manager extends Emitter { private boolean skipReconnect; private boolean reconnecting; private boolean encoding; - private boolean openReconnect; private int _reconnectionAttempts; private long _reconnectionDelay; private long _reconnectionDelayMax; + private double _randomizationFactor; + private Backoff backoff; private long _timeout; private Set connected; - private int attempts; private URI uri; private List packetBuffer; private Queue subs; @@ -129,11 +130,15 @@ public class Manager extends Emitter { this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE); this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000); this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000); + this.randomizationFactor(opts.randomizationFactor != 0.0 ? opts.randomizationFactor : 0.5); + this.backoff = new Backoff() + .setMin(this.reconnectionDelay()) + .setMax(this.reconnectionDelayMax()) + .setJitter(this.randomizationFactor()); this.timeout(opts.timeout < 0 ? 20000 : opts.timeout); this.readyState = ReadyState.CLOSED; this.uri = uri; this.connected = new HashSet(); - this.attempts = 0; this.encoding = false; this.packetBuffer = new ArrayList(); this.encoder = new Parser.Encoder(); @@ -147,6 +152,15 @@ public class Manager extends Emitter { } } + /** + * Update `socket.id` of all sockets + */ + private void updateSocketIds() { + for (Socket socket : this.nsps.values()) { + socket.id = this.engine.id(); + } + } + public boolean reconnection() { return this._reconnection; } @@ -171,6 +185,21 @@ public class Manager extends Emitter { public Manager reconnectionDelay(long v) { this._reconnectionDelay = v; + if (this.backoff != null) { + this.backoff.setMin(v); + } + return this; + } + + public double randomizationFactor() { + return this._randomizationFactor; + } + + public Manager randomizationFactor(double v) { + this._randomizationFactor = v; + if (this.backoff != null) { + this.backoff.setJitter(v); + } return this; } @@ -180,6 +209,9 @@ public class Manager extends Emitter { public Manager reconnectionDelayMax(long v) { this._reconnectionDelayMax = v; + if (this.backoff != null) { + this.backoff.setMax(v); + } return this; } @@ -194,8 +226,7 @@ public class Manager extends Emitter { private void maybeReconnectOnOpen() { // Only try to reconnect if it's the first time we're connecting - if (!this.openReconnect && !this.reconnecting && this._reconnection && this.attempts == 0) { - this.openReconnect = true; + if (!this.reconnecting && this._reconnection && this.backoff.getAttempts() == 0) { this.reconnect(); } } @@ -252,9 +283,10 @@ public class Manager extends Emitter { Exception err = new SocketIOException("Connection error", data instanceof Exception ? (Exception) data : null); fn.call(err); + } else { + // Only do this if there is no fn to handle the error + self.maybeReconnectOnOpen(); } - - self.maybeReconnectOnOpen(); } }); @@ -371,6 +403,7 @@ public class Manager extends Emitter { socket.on(Socket.EVENT_CONNECT, new Listener() { @Override public void call(Object... objects) { + s.id = self.engine.id(); self.connected.add(s); } }); @@ -425,6 +458,7 @@ public class Manager extends Emitter { /*package*/ void close() { this.skipReconnect = true; + this.backoff.reset(); this.readyState = ReadyState.CLOSED; if (this.engine != null) { this.engine.close(); @@ -434,6 +468,7 @@ public class Manager extends Emitter { private void onclose(String reason) { logger.fine("close"); this.cleanup(); + this.backoff.reset(); this.readyState = ReadyState.CLOSED; this.emit(EVENT_CLOSE, reason); @@ -453,15 +488,14 @@ public class Manager extends Emitter { if (this.reconnecting || this.skipReconnect) return; final Manager self = this; - this.attempts++; - if (attempts > this._reconnectionAttempts) { + if (this.backoff.getAttempts() >= this._reconnectionAttempts) { logger.fine("reconnect failed"); + this.backoff.reset(); this.emitAll(EVENT_RECONNECT_FAILED); this.reconnecting = false; } else { - long delay = this.attempts * this.reconnectionDelay(); - delay = Math.min(delay, this.reconnectionDelayMax()); + long delay = this.backoff.duration(); logger.fine(String.format("will wait %dms before reconnect attempt", delay)); this.reconnecting = true; @@ -474,8 +508,9 @@ public class Manager extends Emitter { if (self.skipReconnect) return; logger.fine("attempting reconnect"); - self.emitAll(EVENT_RECONNECT_ATTEMPT, self.attempts); - self.emitAll(EVENT_RECONNECTING, self.attempts); + int attempts = self.backoff.getAttempts(); + self.emitAll(EVENT_RECONNECT_ATTEMPT, attempts); + self.emitAll(EVENT_RECONNECTING, attempts); // check again for the case socket closed in above events if (self.skipReconnect) return; @@ -509,9 +544,10 @@ public class Manager extends Emitter { } private void onreconnect() { - int attempts = this.attempts; - this.attempts = 0; + int attempts = this.backoff.getAttempts(); this.reconnecting = false; + this.backoff.reset(); + this.updateSocketIds(); this.emitAll(EVENT_RECONNECT, attempts); } @@ -549,6 +585,7 @@ public class Manager extends Emitter { public int reconnectionAttempts; public long reconnectionDelay; public long reconnectionDelayMax; + public double randomizationFactor; public long timeout = -1; } } diff --git a/src/main/java/com/github/nkzawa/socketio/client/Socket.java b/src/main/java/com/github/nkzawa/socketio/client/Socket.java index c16c25d..137082a 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Socket.java @@ -68,6 +68,8 @@ public class Socket extends Emitter { put(EVENT_RECONNECTING, 1); }}; + /*package*/ String id; + private volatile boolean connected; private int ids; private String nsp; @@ -262,6 +264,7 @@ public class Socket extends Emitter { private void onclose(String reason) { logger.fine(String.format("close (%s)", reason)); this.connected = false; + this.id = null; this.emit(EVENT_DISCONNECT, reason); } @@ -418,11 +421,22 @@ public class Socket extends Emitter { } public Manager io() { - return io; + return this.io; } public boolean connected() { - return connected; + return this.connected; + } + + /** + * A property on the socket instance that is equal to the underlying engine.io socket id. + * + * The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects. + * + * @return a socket id + */ + public String id() { + return this.id; } private static Object[] toArray(JSONArray array) { diff --git a/src/test/java/com/github/nkzawa/backo/BackoffTest.java b/src/test/java/com/github/nkzawa/backo/BackoffTest.java new file mode 100644 index 0000000..66f8734 --- /dev/null +++ b/src/test/java/com/github/nkzawa/backo/BackoffTest.java @@ -0,0 +1,22 @@ +package com.github.nkzawa.backo; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + +public class BackoffTest { + + @Test + public void durationShouldIncreaseTheBackoff() { + Backoff b = new Backoff(); + + assertTrue(100 == b.duration()); + assertTrue(200 == b.duration()); + assertTrue(400 == b.duration()); + assertTrue(800 == b.duration()); + + b.reset(); + assertTrue(100 == b.duration()); + assertTrue(200 == b.duration()); + } +} diff --git a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java index 54f9a0a..5ae28ff 100644 --- a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java +++ b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java @@ -298,6 +298,93 @@ public class ConnectionTest extends Connection { values.take(); } + @Test(timeout = TIMEOUT) + public void attemptReconnectsAfterAFailedReconnect() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + IO.Options opts = createOptions(); + opts.reconnection = true; + opts.timeout = 0; + opts.reconnectionAttempts = 2; + opts.reconnectionDelay = 10; + final Manager manager = new Manager(new URI(uri()), opts); + socket = manager.socket("/timeout"); + socket.once(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() { + @Override + public void call(Object... args) { + final int[] reconnects = new int[] {0}; + Emitter.Listener reconnectCb = new Emitter.Listener() { + @Override + public void call(Object... args) { + reconnects[0]++; + } + }; + + manager.on(Manager.EVENT_RECONNECT_ATTEMPT, reconnectCb); + manager.on(Manager.EVENT_RECONNECT_FAILED, new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(reconnects[0]); + } + }); + socket.connect(); + } + }); + socket.connect(); + assertThat((Integer)values.take(), is(2)); + socket.close(); + manager.close(); + } + + @Test(timeout = TIMEOUT) + public void reconnectDelayShouldIncreaseEveryTime() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + IO.Options opts = createOptions(); + opts.reconnection = true; + opts.timeout = 0; + opts.reconnectionAttempts = 5; + opts.reconnectionDelay = 10; + opts.randomizationFactor = 0.2; + final Manager manager = new Manager(new URI(uri()), opts); + socket = manager.socket("/timeout"); + + final int[] reconnects = new int[] {0}; + final boolean[] increasingDelay = new boolean[] {true}; + final long[] startTime = new long[] {0}; + final long[] prevDelay = new long[] {0}; + + socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() { + @Override + public void call(Object... args) { + startTime[0] = new Date().getTime(); + } + }); + socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() { + @Override + public void call(Object... args) { + reconnects[0]++; + long currentTime = new Date().getTime(); + long delay = currentTime - startTime[0]; + if (delay <= prevDelay[0]) { + increasingDelay[0] = false; + } + prevDelay[0] = delay; + } + }); + socket.on(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(true); + } + }); + + socket.connect(); + values.take(); + assertThat(reconnects[0], is(5)); + assertThat(increasingDelay[0], is(true)); + socket.close(); + manager.close(); + } + @Test(timeout = TIMEOUT) public void reconnectEventFireInSocket() throws URISyntaxException, InterruptedException { final BlockingQueue values = new LinkedBlockingQueue(); @@ -446,14 +533,14 @@ public class ConnectionTest extends Connection { manager.on(Manager.EVENT_RECONNECT_FAILED, new Emitter.Listener() { @Override public void call(Object... objects) { - socket.close(); - manager.close(); values.offer(reconnects[0]); } }); socket.open(); assertThat((Integer)values.take(), is(2)); + socket.close(); + manager.close(); } @Test(timeout = TIMEOUT) diff --git a/src/test/java/com/github/nkzawa/socketio/client/SocketTest.java b/src/test/java/com/github/nkzawa/socketio/client/SocketTest.java new file mode 100644 index 0000000..a8f1ab7 --- /dev/null +++ b/src/test/java/com/github/nkzawa/socketio/client/SocketTest.java @@ -0,0 +1,103 @@ +package com.github.nkzawa.socketio.client; + +import com.github.nkzawa.emitter.Emitter; +import com.github.nkzawa.util.Optional; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.net.URISyntaxException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.assertThat; + +@RunWith(JUnit4.class) +public class SocketTest extends Connection { + + private Socket socket; + + @Test(timeout = TIMEOUT) + public void shouldHaveAnAccessibleSocketIdEqualToTheEngineIOSocketId() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + socket = client(); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + values.offer(Optional.ofNullable(socket.id())); + } + }); + socket.connect(); + + @SuppressWarnings("unchecked") + Optional id = values.take(); + assertThat(id.isPresent(), is(true)); + assertThat(id.get(), is(socket.io().engine.id())); + socket.disconnect(); + } + + @Test(timeout = TIMEOUT) + public void clearsSocketIdUponDisconnection() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + socket = client(); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(Optional.ofNullable(socket.id())); + } + }); + + socket.disconnect(); + } + }); + socket.connect(); + @SuppressWarnings("unchecked") + Optional id = values.take(); + assertThat(id.isPresent(), is(false)); + } + + @Test(timeout = TIMEOUT) + public void shouldChangeSocketIdUponReconnection() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + socket = client(); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + values.offer(Optional.ofNullable(socket.id())); + + socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + values.offer(Optional.ofNullable(socket.id())); + } + }); + + socket.on(Socket.EVENT_RECONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + values.offer(Optional.ofNullable(socket.id())); + } + }); + + socket.io().engine.close(); + } + }); + socket.connect(); + @SuppressWarnings("unchecked") + Optional id1 = values.take(); + + @SuppressWarnings("unchecked") + Optional id2 = values.take(); + assertThat(id2.isPresent(), is(false)); + + @SuppressWarnings("unchecked") + Optional id3 = values.take(); + assertThat(id3.get(), is(not(id1.get()))); + + socket.disconnect(); + } +} diff --git a/src/test/java/com/github/nkzawa/util/Optional.java b/src/test/java/com/github/nkzawa/util/Optional.java new file mode 100644 index 0000000..1f8c423 --- /dev/null +++ b/src/test/java/com/github/nkzawa/util/Optional.java @@ -0,0 +1,44 @@ +package com.github.nkzawa.util; + +import java.util.NoSuchElementException; + +public class Optional { + + static final Optional EMPTY = Optional.ofNullable(null); + + private T value; + + public static Optional of(T value) { + if (value == null) { + throw new NullPointerException(); + } + return new Optional(value); + } + + public static Optional ofNullable(T value) { + return new Optional(value); + } + + public static Optional empty() { + return EMPTY; + } + + private Optional(T value) { + this.value = value; + } + + public boolean isPresent() { + return this.value != null; + } + + public T get() { + if (this.value == null) { + throw new NoSuchElementException(); + } + return this.value; + } + + public T orElse(T other) { + return this.value != null ? this.value : other; + } +} diff --git a/src/test/resources/package.json b/src/test/resources/package.json index f96a306..5c88f47 100644 --- a/src/test/resources/package.json +++ b/src/test/resources/package.json @@ -3,6 +3,6 @@ "version": "0.0.0", "private": true, "dependencies": { - "socket.io": "1.2.0" + "socket.io": "1.3.2" } }