compatible with socket.io-client 1.2.0

This commit is contained in:
Naoyuki Kanezawa
2014-11-04 01:15:44 +09:00
parent cdafd9e04d
commit 1aa6e4a907
4 changed files with 202 additions and 25 deletions

View File

@@ -58,7 +58,7 @@
<dependency>
<groupId>com.github.nkzawa</groupId>
<artifactId>engine.io-client</artifactId>
<version>0.3.0</version>
<version>0.3.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.json</groupId>

View File

@@ -7,10 +7,7 @@ import com.github.nkzawa.thread.EventThread;
import javax.net.ssl.SSLContext;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -84,7 +81,7 @@ public class Manager extends Emitter {
private long _reconnectionDelay;
private long _reconnectionDelayMax;
private long _timeout;
private int connected;
private Set<Socket> connected;
private int attempts;
private URI uri;
private List<Packet> packetBuffer;
@@ -135,7 +132,7 @@ public class Manager extends Emitter {
this.timeout(opts.timeout < 0 ? 20000 : opts.timeout);
this.readyState = ReadyState.CLOSED;
this.uri = uri;
this.connected = 0;
this.connected = new HashSet<Socket>();
this.attempts = 0;
this.encoding = false;
this.packetBuffer = new ArrayList<Packet>();
@@ -224,8 +221,8 @@ public class Manager extends Emitter {
Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
final com.github.nkzawa.engineio.client.Socket socket = Manager.this.engine;
final Manager self = Manager.this;
Manager.this.readyState = ReadyState.OPENING;
Manager.this.skipReconnect = false;
// propagate transport event.
socket.on(Engine.EVENT_TRANSPORT, new Listener() {
@@ -370,10 +367,11 @@ public class Manager extends Emitter {
socket = _socket;
} else {
final Manager self = this;
final Socket s = socket;
socket.on(Socket.EVENT_CONNECT, new Listener() {
@Override
public void call(Object... objects) {
self.connected++;
self.connected.add(s);
}
});
}
@@ -382,12 +380,11 @@ public class Manager extends Emitter {
}
/*package*/ void destroy(Socket socket) {
--this.connected;
if (this.connected <= 0) {
this.connected = 0;
this.connected.remove(socket);
if (this.connected.size() > 0) return;
this.close();
}
}
/*package*/ void packet(Packet packet) {
logger.fine(String.format("writing packet %s", packet));
@@ -428,8 +425,11 @@ public class Manager extends Emitter {
/*package*/ void close() {
this.skipReconnect = true;
this.readyState = ReadyState.CLOSED;
if (this.engine != null) {
this.engine.close();
}
}
private void onclose(String reason) {
logger.fine("close");
@@ -450,7 +450,7 @@ public class Manager extends Emitter {
}
private void reconnect() {
if (this.reconnecting) return;
if (this.reconnecting || this.skipReconnect) return;
final Manager self = this;
this.attempts++;
@@ -471,9 +471,15 @@ public class Manager extends Emitter {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (self.skipReconnect) return;
logger.fine("attempting reconnect");
self.emitAll(EVENT_RECONNECT_ATTEMPT, self.attempts);
self.emitAll(EVENT_RECONNECTING, self.attempts);
// check again for the case socket closed in above events
if (self.skipReconnect) return;
self.open(new OpenCallback() {
@Override
public void call(Exception err) {

View File

@@ -81,10 +81,11 @@ public class Socket extends Emitter {
public Socket(Manager io, String nsp) {
this.io = io;
this.nsp = nsp;
this.subEvents();
}
private void subEvents() {
if (this.subs != null) return;
final Manager io = Socket.this.io;
Socket.this.subs = new LinkedList<On.Handle>() {{
add(On.on(io, Manager.EVENT_OPEN, new Listener() {
@@ -117,7 +118,8 @@ public class Socket extends Emitter {
public void run() {
if (Socket.this.connected) return;
Socket.this.io.open();
Socket.this.subEvents();
Socket.this.io.open(); // ensure open
if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
}
});
@@ -369,9 +371,13 @@ public class Socket extends Emitter {
}
private void destroy() {
if (this.subs != null) {
// clean subscriptions to avoid reconnection
for (On.Handle sub : this.subs) {
sub.destroy();
}
this.subs = null;
}
this.io.destroy(this);
}
@@ -385,15 +391,17 @@ public class Socket extends Emitter {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (!Socket.this.connected) return;
if (Socket.this.connected) {
logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp));
Socket.this.packet(new Packet(Parser.DISCONNECT));
}
Socket.this.destroy();
if (Socket.this.connected) {
Socket.this.onclose("io client disconnect");
}
}
});
return this;
}

View File

@@ -240,6 +240,64 @@ public class ConnectionTest extends Connection {
values.take();
}
@Test(timeout = TIMEOUT)
public void reconnectManually() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
socket = client();
socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.disconnect();
}
}).once(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.disconnect();
values.offer("done");
}
});
socket.connect();
}
});
socket.connect();
values.take();
}
@Test(timeout = TIMEOUT)
public void reconnectAutomaticallyAfterReconnectingManually() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
socket = client();
socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.disconnect();
}
}).once(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_RECONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.disconnect();
values.offer("done");
}
});
socket.connect();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
socket.io().engine.close();
}
}, 500);
}
});
socket.connect();
values.take();
}
@Test(timeout = TIMEOUT)
public void reconnectEventFireInSocket() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
@@ -261,6 +319,111 @@ public class ConnectionTest extends Connection {
socket.close();
}
@Test(timeout = TIMEOUT)
public void notReconnectWhenForceClosed() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
IO.Options opts = createOptions();
opts.timeout = 0;
opts.reconnectionDelay = 10;
socket = IO.socket(uri() + "/invalid", opts);
socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(false);
}
});
socket.disconnect();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
values.offer(true);
}
}, 500);
}
});
socket.connect();
assertThat((Boolean)values.take(), is(true));
}
@Test(timeout = TIMEOUT)
public void stopReconnectingWhenForceClosed() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
IO.Options opts = createOptions();
opts.timeout = 0;
opts.reconnectionDelay = 10;
socket = IO.socket(uri() + "/invalid", opts);
socket.once(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(false);
}
});
socket.disconnect();
// set a timer to let reconnection possibly fire
new Timer().schedule(new TimerTask() {
@Override
public void run() {
values.offer(true);
}
}, 500);
}
});
socket.connect();
assertThat((Boolean) values.take(), is(true));
}
@Test(timeout = TIMEOUT)
public void stopReconnectingOnASocketAndKeepToReconnectOnAnother() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
final Manager manager = new Manager(new URI(uri()));
final Socket socket1 = manager.socket("/");
final Socket socket2 = manager.socket("/asd");
manager.on(Manager.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket1.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(false);
}
});
socket2.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
socket2.disconnect();
manager.close();
values.offer(true);
}
}, 500);
}
});
socket1.disconnect();
}
});
socket1.connect();
socket2.connect();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
manager.engine.close();
}
}, 1000);
assertThat((Boolean) values.take(), is(true));
}
@Test(timeout = TIMEOUT)
public void tryToReconnectTwiceAndFailWithIncorrectAddress() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();