From 9d6751d6b4f2eca241e7bebb82053787d8af877a Mon Sep 17 00:00:00 2001 From: Naoyuki Kanezawa Date: Tue, 25 Mar 2014 23:21:26 +0900 Subject: [PATCH] support the rememberUpgrade option --- .../github/nkzawa/engineio/client/Socket.java | 38 ++++-- .../nkzawa/engineio/client/Transport.java | 3 + .../client/transports/PollingXHR.java | 8 +- .../engineio/client/transports/WebSocket.java | 12 +- .../engineio/client/ServerConnectionTest.java | 123 ++++++++++++++++++ .../nkzawa/engineio/client/SocketTest.java | 9 +- .../nkzawa/engineio/client/TransportTest.java | 2 + 7 files changed, 175 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index 377ca7e..3c3278f 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -91,10 +91,13 @@ public abstract class Socket extends Emitter { */ public static final int protocol = Parser.protocol; + public static boolean priorWebsocketSuccess = false; + private boolean secure; private boolean upgrade; private boolean timestampRequests; private boolean upgrading; + private boolean rememberUpgrade; private int port; private int policyPort; private int prevBufferLen; @@ -109,7 +112,7 @@ public abstract class Socket extends Emitter { private Map query; private LinkedList writeBuffer = new LinkedList(); private LinkedList callbackBuffer = new LinkedList(); - private Transport transport; + /*package*/ Transport transport; private Future pingTimeoutTimer; private Future pingIntervalTimer; @@ -117,6 +120,10 @@ public abstract class Socket extends Emitter { private ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); + public Socket() { + this(new Options()); + } + /** * Creates a socket. * @@ -167,6 +174,7 @@ public abstract class Socket extends Emitter { this.transports = new ArrayList(Arrays.asList(opts.transports != null ? opts.transports : new String[]{Polling.NAME, WebSocket.NAME})); this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843; + this.rememberUpgrade = opts.rememberUpgrade; } /** @@ -176,7 +184,12 @@ public abstract class Socket extends Emitter { EventThread.exec(new Runnable() { @Override public void run() { - String transportName = Socket.this.transports.get(0); + String transportName; + if (Socket.this.rememberUpgrade && Socket.priorWebsocketSuccess && Socket.this.transports.contains(WebSocket.NAME)) { + transportName = WebSocket.NAME; + } else { + transportName = Socket.this.transports.get(0); + } Socket.this.readyState = ReadyState.OPENING; Transport transport = Socket.this.createTransport(transportName); Socket.this.setTransport(transport); @@ -204,6 +217,7 @@ public abstract class Socket extends Emitter { opts.timestampRequests = this.timestampRequests; opts.timestampParam = this.timestampParam; opts.policyPort = this.policyPort; + opts.socket = this; if (WebSocket.NAME.equals(name)) { return new WebSocket(opts); @@ -256,6 +270,8 @@ public abstract class Socket extends Emitter { final boolean[] failed = new boolean[] {false}; final Socket self = this; + Socket.priorWebsocketSuccess = false; + final Listener onerror = new Listener() { @Override public void call(Object... args) { @@ -292,6 +308,7 @@ public abstract class Socket extends Emitter { logger.fine(String.format("probe transport '%s' pong", name)); self.upgrading = true; self.emit(EVENT_UPGRADING, transport[0]); + Socket.priorWebsocketSuccess = WebSocket.NAME.equals(transport[0].name); logger.fine(String.format("pausing current transport '%s'", self.transport.name)); ((Polling)self.transport).pause(new Runnable() { @@ -304,10 +321,10 @@ public abstract class Socket extends Emitter { logger.fine("changing transport and sending upgrade packet"); transport[0].off(Transport.EVENT_ERROR, onerror); - self.emit(EVENT_UPGRADE, transport[0]); self.setTransport(transport[0]); Packet packet = new Packet(Packet.UPGRADE); transport[0].send(new Packet[]{packet}); + self.emit(EVENT_UPGRADE, transport[0]); transport[0] = null; self.upgrading = false; self.flush(); @@ -356,6 +373,7 @@ public abstract class Socket extends Emitter { private void onOpen() { logger.fine("socket open"); this.readyState = ReadyState.OPEN; + Socket.priorWebsocketSuccess = WebSocket.NAME.equals(this.transport.name); this.emit(EVENT_OPEN); this.onopen(); this.flush(); @@ -574,6 +592,7 @@ public abstract class Socket extends Emitter { private void onError(Exception err) { logger.fine(String.format("socket error %s", err)); + Socket.priorWebsocketSuccess = false; this.emit(EVENT_ERROR, err); this.onerror(err); this.onClose("transport error", err); @@ -605,21 +624,21 @@ public abstract class Socket extends Emitter { } }); + // ensure transport won't stay open + this.transport.close(); + // ignore further transport communication this.transport.off(); // set ready state - ReadyState prev = this.readyState; this.readyState = ReadyState.CLOSED; // clear session id this.id = null; - // emit events - if (prev == ReadyState.OPEN) { - this.emit(EVENT_CLOSE, reason, desc); - this.onclose(); - } + // emit close events + this.emit(EVENT_CLOSE, reason, desc); + this.onclose(); } } @@ -653,6 +672,7 @@ public abstract class Socket extends Emitter { */ public boolean upgrade = true; + public boolean rememberUpgrade; public String host; public String query; diff --git a/src/main/java/com/github/nkzawa/engineio/client/Transport.java b/src/main/java/com/github/nkzawa/engineio/client/Transport.java index 2dbb7c2..67795ff 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Transport.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Transport.java @@ -36,6 +36,7 @@ public abstract class Transport extends Emitter { protected String path; protected String hostname; protected String timestampParam; + protected Socket socket; protected ReadyState readyState; @@ -47,6 +48,7 @@ public abstract class Transport extends Emitter { this.query = opts.query; this.timestampParam = opts.timestampParam; this.timestampRequests = opts.timestampRequests; + this.socket = opts.socket; } protected Transport onError(String msg, Exception desc) { @@ -131,5 +133,6 @@ public abstract class Transport extends Emitter { public int port; public int policyPort; public Map query; + protected Socket socket; } } diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java index 0656eda..ad79a8c 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java @@ -244,10 +244,12 @@ public class PollingXHR extends Polling { } private void cleanup() { - if (xhr != null) { - xhr.disconnect(); - xhr = null; + if (xhr == null) { + return; } + + xhr.disconnect(); + xhr = null; } public void abort() { diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java b/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java index 3ebdb21..63c5f8d 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java @@ -22,7 +22,7 @@ public class WebSocket extends Transport { public static final String NAME = "websocket"; - private WebSocketClient socket; + private WebSocketClient ws; public WebSocket(Options opts) { @@ -40,7 +40,7 @@ public class WebSocket extends Transport { final WebSocket self = this; try { - this.socket = new WebSocketClient(new URI(this.uri()), new Draft_17(), headers, 0) { + this.ws = new WebSocketClient(new URI(this.uri()), new Draft_17(), headers, 0) { @Override public void onOpen(final ServerHandshake serverHandshake) { EventThread.exec(new Runnable() { @@ -87,7 +87,7 @@ public class WebSocket extends Transport { }); } }; - this.socket.connect(); + this.ws.connect(); } catch (URISyntaxException e) { throw new RuntimeException(e); } @@ -97,7 +97,7 @@ public class WebSocket extends Transport { final WebSocket self = this; this.writable = false; for (Packet packet : packets) { - this.socket.send(Parser.encodePacket(packet)); + this.ws.send(Parser.encodePacket(packet)); } final Runnable ondrain = new Runnable() { @@ -119,8 +119,8 @@ public class WebSocket extends Transport { } protected void doClose() { - if (this.socket != null) { - this.socket.close(); + if (this.ws != null) { + this.ws.close(); } } diff --git a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java index edb4533..34fdad0 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java @@ -311,4 +311,127 @@ public class ServerConnectionTest { assertThat(messages.take(), is("foo")); socket.close(); } + + @Test(timeout = TIMEOUT) + public void rememberWebsocket() throws URISyntaxException, InterruptedException { + final Semaphore semaphore = new Semaphore(0); + + EventThread.exec(new Runnable() { + @Override + public void run() { + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + + final Socket socket = new Socket(opts) { + @Override + public void onopen() { + } + + @Override + public void onmessage(String data) { + } + + @Override + public void onclose() { + } + + @Override + public void onerror(Exception err) { + } + }; + + socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { + @Override + public void call(Object... args) { + Transport transport = (Transport) args[0]; + socket.close(); + if (WebSocket.NAME.equals(transport.name)) { + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + opts.rememberUpgrade = true; + + final Socket socket2 = new Socket(opts) { + @Override + public void onopen() { + } + + @Override + public void onmessage(String data) { + } + + @Override + public void onclose() { + } + + @Override + public void onerror(Exception err) { + } + }; + socket2.open(); + assertThat(socket2.transport.name, is(WebSocket.NAME)); + } + semaphore.release(); + } + }); + socket.open(); + assertThat(socket.transport.name, is(Polling.NAME)); + } + }); + semaphore.acquire(); + } + + @Test(timeout = TIMEOUT) + public void notRememberWebsocket() throws URISyntaxException, InterruptedException { + final Semaphore semaphore = new Semaphore(0); + + EventThread.exec(new Runnable() { + @Override + public void run() { + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + + final Socket socket = new Socket(opts) { + @Override + public void onopen() {} + @Override + public void onmessage(String data) {} + @Override + public void onclose() {} + @Override + public void onerror(Exception err) {} + }; + + socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { + @Override + public void call(Object... args) { + Transport transport = (Transport)args[0]; + socket.close(); + if (WebSocket.NAME.equals(transport.name)) { + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + opts.rememberUpgrade = false; + + final Socket socket2 = new Socket(opts) { + @Override + public void onopen() {} + @Override + public void onmessage(String data) {} + @Override + public void onclose() {} + @Override + public void onerror(Exception err) {} + }; + socket2.open(); + assertThat(socket2.transport.name, is(not(WebSocket.NAME))); + } + semaphore.release(); + } + }); + socket.open(); + assertThat(socket.transport.name, is(Polling.NAME)); + } + }); + semaphore.acquire(); + } + } diff --git a/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java b/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java index 8722081..dc3f9a5 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java @@ -12,6 +12,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.Semaphore; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -42,13 +43,14 @@ public class SocketTest { } /** - * should not emit close on incorrect connection. + * should emit close on incorrect connection. * * @throws URISyntaxException * @throws InterruptedException */ @Test public void socketClosing() throws URISyntaxException, InterruptedException { + final Semaphore semaphore = new Semaphore(0); Socket socket = new Socket("ws://0.0.0.0:8080") { @Override public void onopen() {} @@ -68,7 +70,8 @@ public class SocketTest { timer.schedule(new TimerTask() { @Override public void run() { - assertThat(closed[0], is(false)); + assertThat(closed[0], is(true)); + semaphore.release(); } }, 20); } @@ -81,5 +84,7 @@ public class SocketTest { } }); socket.open(); + semaphore.acquire(); } + } diff --git a/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java b/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java index 3f7c926..d835956 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java @@ -14,6 +14,8 @@ import static org.junit.Assert.assertThat; @RunWith(JUnit4.class) public class TransportTest { + // NOTE: tests for the rememberUpgrade option are on ServerConnectionTest. + @Test public void uri() { Transport.Options opt = new Transport.Options();