diff --git a/pom.xml b/pom.xml index 823dbb9..efb9fcd 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ com.github.nkzawa engine.io-client - 0.3.0 + 0.3.1-SNAPSHOT org.json diff --git a/src/main/java/com/github/nkzawa/socketio/client/Manager.java b/src/main/java/com/github/nkzawa/socketio/client/Manager.java index c76dae9..c111905 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Manager.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Manager.java @@ -7,10 +7,7 @@ import com.github.nkzawa.thread.EventThread; import javax.net.ssl.SSLContext; import java.net.URI; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; +import java.util.*; import java.util.concurrent.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -84,7 +81,7 @@ public class Manager extends Emitter { private long _reconnectionDelay; private long _reconnectionDelayMax; private long _timeout; - private int connected; + private Set connected; private int attempts; private URI uri; private List packetBuffer; @@ -135,7 +132,7 @@ public class Manager extends Emitter { this.timeout(opts.timeout < 0 ? 20000 : opts.timeout); this.readyState = ReadyState.CLOSED; this.uri = uri; - this.connected = 0; + this.connected = new HashSet(); this.attempts = 0; this.encoding = false; this.packetBuffer = new ArrayList(); @@ -224,8 +221,8 @@ public class Manager extends Emitter { Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts); final com.github.nkzawa.engineio.client.Socket socket = Manager.this.engine; final Manager self = Manager.this; - Manager.this.readyState = ReadyState.OPENING; + Manager.this.skipReconnect = false; // propagate transport event. socket.on(Engine.EVENT_TRANSPORT, new Listener() { @@ -370,10 +367,11 @@ public class Manager extends Emitter { socket = _socket; } else { final Manager self = this; + final Socket s = socket; socket.on(Socket.EVENT_CONNECT, new Listener() { @Override public void call(Object... objects) { - self.connected++; + self.connected.add(s); } }); } @@ -382,11 +380,10 @@ public class Manager extends Emitter { } /*package*/ void destroy(Socket socket) { - --this.connected; - if (this.connected <= 0) { - this.connected = 0; - this.close(); - } + this.connected.remove(socket); + if (this.connected.size() > 0) return; + + this.close(); } /*package*/ void packet(Packet packet) { @@ -428,7 +425,10 @@ public class Manager extends Emitter { /*package*/ void close() { this.skipReconnect = true; - this.engine.close(); + this.readyState = ReadyState.CLOSED; + if (this.engine != null) { + this.engine.close(); + } } private void onclose(String reason) { @@ -450,7 +450,7 @@ public class Manager extends Emitter { } private void reconnect() { - if (this.reconnecting) return; + if (this.reconnecting || this.skipReconnect) return; final Manager self = this; this.attempts++; @@ -471,9 +471,15 @@ public class Manager extends Emitter { EventThread.exec(new Runnable() { @Override public void run() { + if (self.skipReconnect) return; + logger.fine("attempting reconnect"); self.emitAll(EVENT_RECONNECT_ATTEMPT, self.attempts); self.emitAll(EVENT_RECONNECTING, self.attempts); + + // check again for the case socket closed in above events + if (self.skipReconnect) return; + self.open(new OpenCallback() { @Override public void call(Exception err) { diff --git a/src/main/java/com/github/nkzawa/socketio/client/Socket.java b/src/main/java/com/github/nkzawa/socketio/client/Socket.java index e22dd7a..2204db2 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Socket.java @@ -81,10 +81,11 @@ public class Socket extends Emitter { public Socket(Manager io, String nsp) { this.io = io; this.nsp = nsp; - this.subEvents(); } private void subEvents() { + if (this.subs != null) return; + final Manager io = Socket.this.io; Socket.this.subs = new LinkedList() {{ add(On.on(io, Manager.EVENT_OPEN, new Listener() { @@ -117,7 +118,8 @@ public class Socket extends Emitter { public void run() { if (Socket.this.connected) return; - Socket.this.io.open(); + Socket.this.subEvents(); + Socket.this.io.open(); // ensure open if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen(); } }); @@ -369,8 +371,12 @@ public class Socket extends Emitter { } private void destroy() { - for (On.Handle sub : this.subs) { - sub.destroy(); + if (this.subs != null) { + // clean subscriptions to avoid reconnection + for (On.Handle sub : this.subs) { + sub.destroy(); + } + this.subs = null; } this.io.destroy(this); @@ -385,14 +391,16 @@ public class Socket extends Emitter { EventThread.exec(new Runnable() { @Override public void run() { - if (!Socket.this.connected) return; - - logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp)); - Socket.this.packet(new Packet(Parser.DISCONNECT)); + if (Socket.this.connected) { + logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp)); + Socket.this.packet(new Packet(Parser.DISCONNECT)); + } Socket.this.destroy(); - Socket.this.onclose("io client disconnect"); + if (Socket.this.connected) { + Socket.this.onclose("io client disconnect"); + } } }); return this; diff --git a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java index 49fd4ad..54f9a0a 100644 --- a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java +++ b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java @@ -240,6 +240,64 @@ public class ConnectionTest extends Connection { values.take(); } + @Test(timeout = TIMEOUT) + public void reconnectManually() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + socket = client(); + socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.disconnect(); + } + }).once(Socket.EVENT_DISCONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.disconnect(); + values.offer("done"); + } + }); + socket.connect(); + } + }); + socket.connect(); + values.take(); + } + + @Test(timeout = TIMEOUT) + public void reconnectAutomaticallyAfterReconnectingManually() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + socket = client(); + socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.disconnect(); + } + }).once(Socket.EVENT_DISCONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.on(Socket.EVENT_RECONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.disconnect(); + values.offer("done"); + } + }); + socket.connect(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + socket.io().engine.close(); + } + }, 500); + } + }); + socket.connect(); + values.take(); + } + @Test(timeout = TIMEOUT) public void reconnectEventFireInSocket() throws URISyntaxException, InterruptedException { final BlockingQueue values = new LinkedBlockingQueue(); @@ -261,6 +319,111 @@ public class ConnectionTest extends Connection { socket.close(); } + @Test(timeout = TIMEOUT) + public void notReconnectWhenForceClosed() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + IO.Options opts = createOptions(); + opts.timeout = 0; + opts.reconnectionDelay = 10; + socket = IO.socket(uri() + "/invalid", opts); + socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(false); + } + }); + socket.disconnect(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + values.offer(true); + } + }, 500); + } + }); + socket.connect(); + assertThat((Boolean)values.take(), is(true)); + } + + @Test(timeout = TIMEOUT) + public void stopReconnectingWhenForceClosed() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + IO.Options opts = createOptions(); + opts.timeout = 0; + opts.reconnectionDelay = 10; + socket = IO.socket(uri() + "/invalid", opts); + socket.once(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.on(Socket.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(false); + } + }); + socket.disconnect(); + // set a timer to let reconnection possibly fire + new Timer().schedule(new TimerTask() { + @Override + public void run() { + values.offer(true); + } + }, 500); + } + }); + socket.connect(); + assertThat((Boolean) values.take(), is(true)); + } + + @Test(timeout = TIMEOUT) + public void stopReconnectingOnASocketAndKeepToReconnectOnAnother() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + final Manager manager = new Manager(new URI(uri())); + final Socket socket1 = manager.socket("/"); + final Socket socket2 = manager.socket("/asd"); + + manager.on(Manager.EVENT_RECONNECT_ATTEMPT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket1.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + values.offer(false); + } + }); + socket2.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + new Timer().schedule(new TimerTask() { + @Override + public void run() { + socket2.disconnect(); + manager.close(); + values.offer(true); + } + }, 500); + } + }); + socket1.disconnect(); + } + }); + + socket1.connect(); + socket2.connect(); + + new Timer().schedule(new TimerTask() { + @Override + public void run() { + manager.engine.close(); + } + }, 1000); + + assertThat((Boolean) values.take(), is(true)); + } + @Test(timeout = TIMEOUT) public void tryToReconnectTwiceAndFailWithIncorrectAddress() throws URISyntaxException, InterruptedException { final BlockingQueue values = new LinkedBlockingQueue();