compatible with socket.io 1.3.2

This commit is contained in:
Naoyuki Kanezawa
2015-01-26 07:00:46 +09:00
parent 8eaebfaf97
commit 8a4ffe95c5
9 changed files with 378 additions and 21 deletions

View File

@@ -58,7 +58,7 @@
<dependency> <dependency>
<groupId>com.github.nkzawa</groupId> <groupId>com.github.nkzawa</groupId>
<artifactId>engine.io-client</artifactId> <artifactId>engine.io-client</artifactId>
<version>0.3.1</version> <version>0.4.0-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.json</groupId> <groupId>org.json</groupId>

View File

@@ -0,0 +1,50 @@
package com.github.nkzawa.backo;
public class Backoff {
private long ms = 100;
private long max = 10000;
private int factor = 2;
private double jitter = 0.0;
private int attempts = 0;
public Backoff() {}
public long duration() {
long ms = this.ms * (long) Math.pow(this.factor, this.attempts++);
if (jitter != 0.0) {
double rand = Math.random();
int deviation = (int) Math.floor(rand * this.jitter * ms);
ms = (((int) Math.floor(rand * 10)) & 1) == 0 ? ms - deviation : ms + deviation;
}
return Math.min(ms, this.max);
}
public void reset() {
this.attempts = 0;
}
public Backoff setMin(long min) {
this.ms = min;
return this;
}
public Backoff setMax(long max) {
this.max = max;
return this;
}
public Backoff setFactor(int factor) {
this.factor = factor;
return this;
}
public Backoff setJitter(double jitter) {
this.jitter = jitter;
return this;
}
public int getAttempts() {
return this.attempts;
}
}

View File

@@ -1,5 +1,6 @@
package com.github.nkzawa.socketio.client; package com.github.nkzawa.socketio.client;
import com.github.nkzawa.backo.Backoff;
import com.github.nkzawa.emitter.Emitter; import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.socketio.parser.Packet; import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser; import com.github.nkzawa.socketio.parser.Parser;
@@ -76,13 +77,13 @@ public class Manager extends Emitter {
private boolean skipReconnect; private boolean skipReconnect;
private boolean reconnecting; private boolean reconnecting;
private boolean encoding; private boolean encoding;
private boolean openReconnect;
private int _reconnectionAttempts; private int _reconnectionAttempts;
private long _reconnectionDelay; private long _reconnectionDelay;
private long _reconnectionDelayMax; private long _reconnectionDelayMax;
private double _randomizationFactor;
private Backoff backoff;
private long _timeout; private long _timeout;
private Set<Socket> connected; private Set<Socket> connected;
private int attempts;
private URI uri; private URI uri;
private List<Packet> packetBuffer; private List<Packet> packetBuffer;
private Queue<On.Handle> subs; private Queue<On.Handle> subs;
@@ -129,11 +130,15 @@ public class Manager extends Emitter {
this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE); this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE);
this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000); this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000);
this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000); this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000);
this.randomizationFactor(opts.randomizationFactor != 0.0 ? opts.randomizationFactor : 0.5);
this.backoff = new Backoff()
.setMin(this.reconnectionDelay())
.setMax(this.reconnectionDelayMax())
.setJitter(this.randomizationFactor());
this.timeout(opts.timeout < 0 ? 20000 : opts.timeout); this.timeout(opts.timeout < 0 ? 20000 : opts.timeout);
this.readyState = ReadyState.CLOSED; this.readyState = ReadyState.CLOSED;
this.uri = uri; this.uri = uri;
this.connected = new HashSet<Socket>(); this.connected = new HashSet<Socket>();
this.attempts = 0;
this.encoding = false; this.encoding = false;
this.packetBuffer = new ArrayList<Packet>(); this.packetBuffer = new ArrayList<Packet>();
this.encoder = new Parser.Encoder(); this.encoder = new Parser.Encoder();
@@ -147,6 +152,15 @@ public class Manager extends Emitter {
} }
} }
/**
* Update `socket.id` of all sockets
*/
private void updateSocketIds() {
for (Socket socket : this.nsps.values()) {
socket.id = this.engine.id();
}
}
public boolean reconnection() { public boolean reconnection() {
return this._reconnection; return this._reconnection;
} }
@@ -171,6 +185,21 @@ public class Manager extends Emitter {
public Manager reconnectionDelay(long v) { public Manager reconnectionDelay(long v) {
this._reconnectionDelay = v; this._reconnectionDelay = v;
if (this.backoff != null) {
this.backoff.setMin(v);
}
return this;
}
public double randomizationFactor() {
return this._randomizationFactor;
}
public Manager randomizationFactor(double v) {
this._randomizationFactor = v;
if (this.backoff != null) {
this.backoff.setJitter(v);
}
return this; return this;
} }
@@ -180,6 +209,9 @@ public class Manager extends Emitter {
public Manager reconnectionDelayMax(long v) { public Manager reconnectionDelayMax(long v) {
this._reconnectionDelayMax = v; this._reconnectionDelayMax = v;
if (this.backoff != null) {
this.backoff.setMax(v);
}
return this; return this;
} }
@@ -194,8 +226,7 @@ public class Manager extends Emitter {
private void maybeReconnectOnOpen() { private void maybeReconnectOnOpen() {
// Only try to reconnect if it's the first time we're connecting // Only try to reconnect if it's the first time we're connecting
if (!this.openReconnect && !this.reconnecting && this._reconnection && this.attempts == 0) { if (!this.reconnecting && this._reconnection && this.backoff.getAttempts() == 0) {
this.openReconnect = true;
this.reconnect(); this.reconnect();
} }
} }
@@ -252,9 +283,10 @@ public class Manager extends Emitter {
Exception err = new SocketIOException("Connection error", Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception) data : null); data instanceof Exception ? (Exception) data : null);
fn.call(err); fn.call(err);
} else {
// Only do this if there is no fn to handle the error
self.maybeReconnectOnOpen();
} }
self.maybeReconnectOnOpen();
} }
}); });
@@ -371,6 +403,7 @@ public class Manager extends Emitter {
socket.on(Socket.EVENT_CONNECT, new Listener() { socket.on(Socket.EVENT_CONNECT, new Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
s.id = self.engine.id();
self.connected.add(s); self.connected.add(s);
} }
}); });
@@ -425,6 +458,7 @@ public class Manager extends Emitter {
/*package*/ void close() { /*package*/ void close() {
this.skipReconnect = true; this.skipReconnect = true;
this.backoff.reset();
this.readyState = ReadyState.CLOSED; this.readyState = ReadyState.CLOSED;
if (this.engine != null) { if (this.engine != null) {
this.engine.close(); this.engine.close();
@@ -434,6 +468,7 @@ public class Manager extends Emitter {
private void onclose(String reason) { private void onclose(String reason) {
logger.fine("close"); logger.fine("close");
this.cleanup(); this.cleanup();
this.backoff.reset();
this.readyState = ReadyState.CLOSED; this.readyState = ReadyState.CLOSED;
this.emit(EVENT_CLOSE, reason); this.emit(EVENT_CLOSE, reason);
@@ -453,15 +488,14 @@ public class Manager extends Emitter {
if (this.reconnecting || this.skipReconnect) return; if (this.reconnecting || this.skipReconnect) return;
final Manager self = this; final Manager self = this;
this.attempts++;
if (attempts > this._reconnectionAttempts) { if (this.backoff.getAttempts() >= this._reconnectionAttempts) {
logger.fine("reconnect failed"); logger.fine("reconnect failed");
this.backoff.reset();
this.emitAll(EVENT_RECONNECT_FAILED); this.emitAll(EVENT_RECONNECT_FAILED);
this.reconnecting = false; this.reconnecting = false;
} else { } else {
long delay = this.attempts * this.reconnectionDelay(); long delay = this.backoff.duration();
delay = Math.min(delay, this.reconnectionDelayMax());
logger.fine(String.format("will wait %dms before reconnect attempt", delay)); logger.fine(String.format("will wait %dms before reconnect attempt", delay));
this.reconnecting = true; this.reconnecting = true;
@@ -474,8 +508,9 @@ public class Manager extends Emitter {
if (self.skipReconnect) return; if (self.skipReconnect) return;
logger.fine("attempting reconnect"); logger.fine("attempting reconnect");
self.emitAll(EVENT_RECONNECT_ATTEMPT, self.attempts); int attempts = self.backoff.getAttempts();
self.emitAll(EVENT_RECONNECTING, self.attempts); self.emitAll(EVENT_RECONNECT_ATTEMPT, attempts);
self.emitAll(EVENT_RECONNECTING, attempts);
// check again for the case socket closed in above events // check again for the case socket closed in above events
if (self.skipReconnect) return; if (self.skipReconnect) return;
@@ -509,9 +544,10 @@ public class Manager extends Emitter {
} }
private void onreconnect() { private void onreconnect() {
int attempts = this.attempts; int attempts = this.backoff.getAttempts();
this.attempts = 0;
this.reconnecting = false; this.reconnecting = false;
this.backoff.reset();
this.updateSocketIds();
this.emitAll(EVENT_RECONNECT, attempts); this.emitAll(EVENT_RECONNECT, attempts);
} }
@@ -549,6 +585,7 @@ public class Manager extends Emitter {
public int reconnectionAttempts; public int reconnectionAttempts;
public long reconnectionDelay; public long reconnectionDelay;
public long reconnectionDelayMax; public long reconnectionDelayMax;
public double randomizationFactor;
public long timeout = -1; public long timeout = -1;
} }
} }

View File

@@ -68,6 +68,8 @@ public class Socket extends Emitter {
put(EVENT_RECONNECTING, 1); put(EVENT_RECONNECTING, 1);
}}; }};
/*package*/ String id;
private volatile boolean connected; private volatile boolean connected;
private int ids; private int ids;
private String nsp; private String nsp;
@@ -262,6 +264,7 @@ public class Socket extends Emitter {
private void onclose(String reason) { private void onclose(String reason) {
logger.fine(String.format("close (%s)", reason)); logger.fine(String.format("close (%s)", reason));
this.connected = false; this.connected = false;
this.id = null;
this.emit(EVENT_DISCONNECT, reason); this.emit(EVENT_DISCONNECT, reason);
} }
@@ -418,11 +421,22 @@ public class Socket extends Emitter {
} }
public Manager io() { public Manager io() {
return io; return this.io;
} }
public boolean connected() { public boolean connected() {
return connected; return this.connected;
}
/**
* A property on the socket instance that is equal to the underlying engine.io socket id.
*
* The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects.
*
* @return a socket id
*/
public String id() {
return this.id;
} }
private static Object[] toArray(JSONArray array) { private static Object[] toArray(JSONArray array) {

View File

@@ -0,0 +1,22 @@
package com.github.nkzawa.backo;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
public class BackoffTest {
@Test
public void durationShouldIncreaseTheBackoff() {
Backoff b = new Backoff();
assertTrue(100 == b.duration());
assertTrue(200 == b.duration());
assertTrue(400 == b.duration());
assertTrue(800 == b.duration());
b.reset();
assertTrue(100 == b.duration());
assertTrue(200 == b.duration());
}
}

View File

@@ -298,6 +298,93 @@ public class ConnectionTest extends Connection {
values.take(); values.take();
} }
@Test(timeout = TIMEOUT)
public void attemptReconnectsAfterAFailedReconnect() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
IO.Options opts = createOptions();
opts.reconnection = true;
opts.timeout = 0;
opts.reconnectionAttempts = 2;
opts.reconnectionDelay = 10;
final Manager manager = new Manager(new URI(uri()), opts);
socket = manager.socket("/timeout");
socket.once(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() {
@Override
public void call(Object... args) {
final int[] reconnects = new int[] {0};
Emitter.Listener reconnectCb = new Emitter.Listener() {
@Override
public void call(Object... args) {
reconnects[0]++;
}
};
manager.on(Manager.EVENT_RECONNECT_ATTEMPT, reconnectCb);
manager.on(Manager.EVENT_RECONNECT_FAILED, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(reconnects[0]);
}
});
socket.connect();
}
});
socket.connect();
assertThat((Integer)values.take(), is(2));
socket.close();
manager.close();
}
@Test(timeout = TIMEOUT)
public void reconnectDelayShouldIncreaseEveryTime() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
IO.Options opts = createOptions();
opts.reconnection = true;
opts.timeout = 0;
opts.reconnectionAttempts = 5;
opts.reconnectionDelay = 10;
opts.randomizationFactor = 0.2;
final Manager manager = new Manager(new URI(uri()), opts);
socket = manager.socket("/timeout");
final int[] reconnects = new int[] {0};
final boolean[] increasingDelay = new boolean[] {true};
final long[] startTime = new long[] {0};
final long[] prevDelay = new long[] {0};
socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
@Override
public void call(Object... args) {
startTime[0] = new Date().getTime();
}
});
socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() {
@Override
public void call(Object... args) {
reconnects[0]++;
long currentTime = new Date().getTime();
long delay = currentTime - startTime[0];
if (delay <= prevDelay[0]) {
increasingDelay[0] = false;
}
prevDelay[0] = delay;
}
});
socket.on(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(true);
}
});
socket.connect();
values.take();
assertThat(reconnects[0], is(5));
assertThat(increasingDelay[0], is(true));
socket.close();
manager.close();
}
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)
public void reconnectEventFireInSocket() throws URISyntaxException, InterruptedException { public void reconnectEventFireInSocket() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>(); final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
@@ -446,14 +533,14 @@ public class ConnectionTest extends Connection {
manager.on(Manager.EVENT_RECONNECT_FAILED, new Emitter.Listener() { manager.on(Manager.EVENT_RECONNECT_FAILED, new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
socket.close();
manager.close();
values.offer(reconnects[0]); values.offer(reconnects[0]);
} }
}); });
socket.open(); socket.open();
assertThat((Integer)values.take(), is(2)); assertThat((Integer)values.take(), is(2));
socket.close();
manager.close();
} }
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)

View File

@@ -0,0 +1,103 @@
package com.github.nkzawa.socketio.client;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.util.Optional;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.net.URISyntaxException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class)
public class SocketTest extends Connection {
private Socket socket;
@Test(timeout = TIMEOUT)
public void shouldHaveAnAccessibleSocketIdEqualToTheEngineIOSocketId() throws URISyntaxException, InterruptedException {
final BlockingQueue<Optional> values = new LinkedBlockingQueue<Optional>();
socket = client();
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
values.offer(Optional.ofNullable(socket.id()));
}
});
socket.connect();
@SuppressWarnings("unchecked")
Optional<String> id = values.take();
assertThat(id.isPresent(), is(true));
assertThat(id.get(), is(socket.io().engine.id()));
socket.disconnect();
}
@Test(timeout = TIMEOUT)
public void clearsSocketIdUponDisconnection() throws URISyntaxException, InterruptedException {
final BlockingQueue<Optional> values = new LinkedBlockingQueue<Optional>();
socket = client();
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(Optional.ofNullable(socket.id()));
}
});
socket.disconnect();
}
});
socket.connect();
@SuppressWarnings("unchecked")
Optional<String> id = values.take();
assertThat(id.isPresent(), is(false));
}
@Test(timeout = TIMEOUT)
public void shouldChangeSocketIdUponReconnection() throws URISyntaxException, InterruptedException {
final BlockingQueue<Optional> values = new LinkedBlockingQueue<Optional>();
socket = client();
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
values.offer(Optional.ofNullable(socket.id()));
socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
values.offer(Optional.ofNullable(socket.id()));
}
});
socket.on(Socket.EVENT_RECONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
values.offer(Optional.ofNullable(socket.id()));
}
});
socket.io().engine.close();
}
});
socket.connect();
@SuppressWarnings("unchecked")
Optional<String> id1 = values.take();
@SuppressWarnings("unchecked")
Optional<String> id2 = values.take();
assertThat(id2.isPresent(), is(false));
@SuppressWarnings("unchecked")
Optional<String> id3 = values.take();
assertThat(id3.get(), is(not(id1.get())));
socket.disconnect();
}
}

View File

@@ -0,0 +1,44 @@
package com.github.nkzawa.util;
import java.util.NoSuchElementException;
public class Optional<T> {
static final Optional EMPTY = Optional.ofNullable(null);
private T value;
public static <T> Optional<T> of(T value) {
if (value == null) {
throw new NullPointerException();
}
return new Optional<T>(value);
}
public static <T> Optional<T> ofNullable(T value) {
return new Optional<T>(value);
}
public static <T> Optional<T> empty() {
return EMPTY;
}
private Optional(T value) {
this.value = value;
}
public boolean isPresent() {
return this.value != null;
}
public T get() {
if (this.value == null) {
throw new NoSuchElementException();
}
return this.value;
}
public T orElse(T other) {
return this.value != null ? this.value : other;
}
}

View File

@@ -3,6 +3,6 @@
"version": "0.0.0", "version": "0.0.0",
"private": true, "private": true,
"dependencies": { "dependencies": {
"socket.io": "1.2.0" "socket.io": "1.3.2"
} }
} }