diff --git a/src/main/java/com/github/nkzawa/engineio/client/EngineIOException.java b/src/main/java/com/github/nkzawa/engineio/client/EngineIOException.java index 9e0b987..8fb77d3 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/EngineIOException.java +++ b/src/main/java/com/github/nkzawa/engineio/client/EngineIOException.java @@ -2,6 +2,9 @@ package com.github.nkzawa.engineio.client; public class EngineIOException extends Exception { + public String transport; + public Object code; + public EngineIOException() { super(); } diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index 6ca562e..775a878 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -13,10 +13,7 @@ import org.json.JSONObject; import java.net.URI; import java.net.URISyntaxException; import java.util.*; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.logging.Logger; @@ -25,7 +22,7 @@ import java.util.logging.Logger; * * @see https://github.com/LearnBoost/engine.io-client */ -public abstract class Socket extends Emitter { +public class Socket extends Emitter { private static final Logger logger = Logger.getLogger(Socket.class.getName()); @@ -274,26 +271,9 @@ public abstract class Socket extends Emitter { Socket.priorWebsocketSuccess = false; - final Listener onerror = new Listener() { - @Override - public void call(Object... args) { - if (failed[0]) return; + final Runnable[] cleanup = new Runnable[1]; - failed[0] = true; - - // TODO: handle error - Exception err = args.length > 0 ? (Exception)args[0] : null; - EngineIOException error = new EngineIOException("probe error", err); - //error.transport = transport[0].name; - - transport[0].close(); - transport[0] = null; - logger.fine(String.format("probing transport '%s' failed because of error: %s", name, err)); - self.emit(EVENT_UPGRADE_ERROR, error); - } - }; - - transport[0].once(Transport.EVENT_OPEN, new Listener() { + final Listener onTransportOpen = new Listener() { @Override public void call(Object... args) { if (failed[0]) return; @@ -305,6 +285,7 @@ public abstract class Socket extends Emitter { @Override public void call(Object... args) { if (failed[0]) return; + Packet msg = (Packet)args[0]; if (Packet.PONG.equals(msg.type) && "probe".equals(msg.data)) { logger.fine(String.format("probe transport '%s' pong", name)); @@ -317,12 +298,14 @@ public abstract class Socket extends Emitter { @Override public void run() { if (failed[0]) return; - if (self.readyState == ReadyState.CLOSED || self.readyState == ReadyState.CLOSING) { + if (ReadyState.CLOSED == self.readyState || ReadyState.CLOSING == self.readyState) { return; } logger.fine("changing transport and sending upgrade packet"); - transport[0].off(Transport.EVENT_ERROR, onerror); + + cleanup[0].run(); + self.setTransport(transport[0]); Packet packet = new Packet(Packet.UPGRADE); transport[0].send(new Packet[]{packet}); @@ -341,33 +324,89 @@ public abstract class Socket extends Emitter { } }); } - }); + }; - transport[0].once(Transport.EVENT_ERROR, onerror); - - this.once(EVENT_CLOSE, new Listener() { + final Listener freezeTransport = new Listener() { @Override public void call(Object... args) { - if (transport[0] != null) { - logger.fine("socket closed prematurely - aborting probe"); - failed[0] = true; - transport[0].close(); - transport[0] = null; - } - } - }); + if (failed[0]) return; - this.once(EVENT_UPGRADING, new Listener() { + failed[0] = true; + + cleanup[0].run(); + + transport[0].close(); + transport[0] = null; + } + }; + + // Handle any error that happens while probing + final Listener onerror = new Listener() { + @Override + public void call(Object... args) { + Object err = args[0]; + EngineIOException error; + if (err instanceof Exception) { + error = new EngineIOException("probe error", (Exception)err); + } else if (err instanceof String) { + error = new EngineIOException("probe error: " + (String)err); + } else { + error = new EngineIOException("probe error"); + } + error.transport = transport[0].name; + + freezeTransport.call(); + + logger.fine(String.format("probe transport \"%s\" failed because of error: %s", name, err)); + + self.emit(EVENT_UPGRADE_ERROR, error); + } + }; + + final Listener onTransportClose = new Listener() { + @Override + public void call(Object... args) { + onerror.call("transport closed"); + } + }; + + // When the socket is closed while we're probing + final Listener onclose = new Listener() { + @Override + public void call(Object... args) { + onerror.call("socket closed"); + } + }; + + // When the socket is upgraded while we're probing + final Listener onupgrade = new Listener() { @Override public void call(Object... args) { Transport to = (Transport)args[0]; if (transport[0] != null && !to.name.equals(transport[0].name)) { logger.fine(String.format("'%s' works - aborting '%s'", to.name, transport[0].name)); - transport[0].close(); - transport[0] = null; + freezeTransport.call(); } } - }); + }; + + cleanup[0] = new Runnable() { + @Override + public void run() { + transport[0].off(Transport.EVENT_OPEN, onTransportOpen); + transport[0].off(Transport.EVENT_ERROR, onerror); + transport[0].off(Transport.EVENT_CLOSE, onTransportClose); + self.off(EVENT_CLOSE, onclose); + self.off(EVENT_UPGRADING, onupgrade); + } + }; + + transport[0].once(Transport.EVENT_OPEN, onTransportOpen); + transport[0].once(Transport.EVENT_ERROR, onerror); + transport[0].once(Transport.EVENT_CLOSE, onTransportClose); + + this.once(EVENT_CLOSE, onclose); + this.once(EVENT_UPGRADING, onupgrade); transport[0].open(); } @@ -377,7 +416,6 @@ public abstract class Socket extends Emitter { this.readyState = ReadyState.OPEN; Socket.priorWebsocketSuccess = WebSocket.NAME.equals(this.transport.name); this.emit(EVENT_OPEN); - this.onopen(); this.flush(); if (this.readyState == ReadyState.OPEN && this.upgrade && this.transport instanceof Polling) { @@ -400,18 +438,12 @@ public abstract class Socket extends Emitter { } else if (Packet.PONG.equals(packet.type)) { this.setPing(); } else if (Packet.ERROR.equals(packet.type)) { - // TODO: handle error EngineIOException err = new EngineIOException("server error"); - //err.code = packet.data; + err.code = packet.data; this.emit(EVENT_ERROR, err); } else if (Packet.MESSAGE.equals(packet.type)) { this.emit(EVENT_DATA, packet.data); this.emit(EVENT_MESSAGE, packet.data); - if (packet.data instanceof String) { - this.onmessage((String)packet.data); - } else if (packet.data instanceof byte[]) { - this.onmessage((byte[])packet.data); - } } } else { logger.fine(String.format("packet received with socket readyState '%s'", this.readyState)); @@ -630,7 +662,6 @@ public abstract class Socket extends Emitter { logger.fine(String.format("socket error %s", err)); Socket.priorWebsocketSuccess = false; this.emit(EVENT_ERROR, err); - this.onerror(err); this.onClose("transport error", err); } @@ -660,6 +691,9 @@ public abstract class Socket extends Emitter { } }); + // stop event from firing again for transport + this.transport.off(EVENT_CLOSE); + // ensure transport won't stay open this.transport.close(); @@ -674,7 +708,6 @@ public abstract class Socket extends Emitter { // emit close events this.emit(EVENT_CLOSE, reason, desc); - this.onclose(); } } @@ -688,16 +721,6 @@ public abstract class Socket extends Emitter { return filteredUpgrades; } - public void onmessage(byte[] data) {} - - public abstract void onopen(); - - public abstract void onmessage(String data); - - public abstract void onclose(); - - public abstract void onerror(Exception err); - public static class Options extends Transport.Options { /** diff --git a/src/main/java/com/github/nkzawa/engineio/client/Transport.java b/src/main/java/com/github/nkzawa/engineio/client/Transport.java index ea0c388..6ffbf3f 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Transport.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Transport.java @@ -27,6 +27,8 @@ public abstract class Transport extends Emitter { public static final String EVENT_REQUEST_HEADERS = "requestHeaders"; public static final String EVENT_RESPONSE_HEADERS = "responseHeaders"; + protected static int timestamps = 0; + public boolean writable; public String name; public Map query; diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java index 3b19c1a..5fe520b 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java @@ -194,7 +194,7 @@ abstract public class Polling extends Transport { String port = ""; if (this.timestampRequests) { - query.put(this.timestampParam, String.valueOf(new Date().getTime())); + query.put(this.timestampParam, String.valueOf(new Date().getTime()) + "-" + Transport.timestamps++); } String _query = ParseQS.encode(query); diff --git a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java index e5a2733..981fdb2 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java @@ -93,23 +93,18 @@ public class ServerConnectionTest { public void openAndClose() throws URISyntaxException, InterruptedException { final BlockingQueue events = new LinkedBlockingQueue(); - socket = new Socket("ws://localhost:" + PORT) { + socket = new Socket("ws://localhost:" + PORT); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { @Override - public void onopen() { + public void call(Object... args) { events.offer("onopen"); } - + }).on(Socket.EVENT_CLOSE, new Emitter.Listener() { @Override - public void onmessage(String data) {} - - @Override - public void onclose() { + public void call(Object... args) { events.offer("onclose"); } - - @Override - public void onerror(Exception err) {} - }; + }); socket.open(); assertThat(events.take(), is("onopen")); @@ -121,22 +116,18 @@ public class ServerConnectionTest { public void messages() throws URISyntaxException, InterruptedException { final BlockingQueue events = new LinkedBlockingQueue(); - socket = new Socket("ws://localhost:" + PORT) { + socket = new Socket("ws://localhost:" + PORT); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { @Override - public void onopen() { + public void call(Object... args) { socket.send("hi"); } - + }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { @Override - public void onmessage(String data) { - events.offer(data); + public void call(Object... args) { + events.offer((String)args[0]); } - - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + }); socket.open(); assertThat(events.take(), is("hello client")); @@ -148,16 +139,7 @@ public class ServerConnectionTest { public void handshake() throws URISyntaxException, InterruptedException { final Semaphore semaphore = new Semaphore(0); - socket = new Socket("ws://localhost:" + PORT) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + socket = new Socket("ws://localhost:" + PORT); socket.on(Socket.EVENT_HANDSHAKE, new Emitter.Listener() { @Override public void call(Object... args) { @@ -183,16 +165,7 @@ public class ServerConnectionTest { public void upgrade() throws URISyntaxException, InterruptedException { final BlockingQueue events = new LinkedBlockingQueue(); - socket = new Socket("ws://localhost:" + PORT) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + socket = new Socket("ws://localhost:" + PORT); socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() { @Override public void call(Object... args) { @@ -229,16 +202,7 @@ public class ServerConnectionTest { Socket.Options opts = new Socket.Options(); opts.transports = new String[] {Polling.NAME}; - socket = new Socket("ws://localhost:" + PORT, opts) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + socket = new Socket("ws://localhost:" + PORT, opts); socket.on(Socket.EVENT_TRANSPORT, new Emitter.Listener() { @Override public void call(Object... args) { @@ -274,16 +238,7 @@ public class ServerConnectionTest { Socket.Options opts = new Socket.Options(); opts.transports = new String[] {WebSocket.NAME}; - socket = new Socket("ws://localhost:" + PORT, opts) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + socket = new Socket("ws://localhost:" + PORT, opts); socket.on(Socket.EVENT_TRANSPORT, new Emitter.Listener() { @Override public void call(Object... args) { @@ -321,16 +276,7 @@ public class ServerConnectionTest { Socket.Options opts = new Socket.Options(); opts.port = PORT; - final Socket socket = new Socket(opts) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + final Socket socket = new Socket(opts); socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { @Override @@ -342,16 +288,7 @@ public class ServerConnectionTest { opts.port = PORT; opts.rememberUpgrade = true; - final Socket socket2 = new Socket(opts) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + final Socket socket2 = new Socket(opts); socket2.open(); assertThat(socket2.transport.name, is(WebSocket.NAME)); } @@ -375,16 +312,7 @@ public class ServerConnectionTest { Socket.Options opts = new Socket.Options(); opts.port = PORT; - final Socket socket = new Socket(opts) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + final Socket socket = new Socket(opts); socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { @Override @@ -396,16 +324,7 @@ public class ServerConnectionTest { opts.port = PORT; opts.rememberUpgrade = false; - final Socket socket2 = new Socket(opts) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + final Socket socket2 = new Socket(opts); socket2.open(); assertThat(socket2.transport.name, is(not(WebSocket.NAME))); } @@ -431,24 +350,22 @@ public class ServerConnectionTest { opts.port = PORT; opts.transports = new String[] {Polling.NAME}; - socket = new Socket(opts) { + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { @Override - public void onopen() { + public void call(Object... args) { socket.send(binaryData); } + }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { @Override - public void onmessage(byte[] data) { - assertThat(data, is(binaryData)); - socket.close(); - semaphore.release(); + public void call(Object... args) { + if (args[0] instanceof byte[]) { + assertThat((byte[])args[0], is(binaryData)); + socket.close(); + semaphore.release(); + } } - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + }); socket.open(); semaphore.acquire(); } @@ -464,29 +381,27 @@ public class ServerConnectionTest { Socket.Options opts = new Socket.Options(); opts.port = PORT; - socket = new Socket(opts) { + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { @Override - public void onopen() { - socket.on(Socket.EVENT_UPGRADE, new Listener() { + public void call(Object... args) { + socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { @Override public void call(Object... args) { socket.send(binaryData); } }); } + }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { @Override - public void onmessage(byte[] data) { - assertThat(data, is(binaryData)); - socket.close(); - semaphore.release(); + public void call(Object... args) { + if (args[0] instanceof byte[]) { + assertThat((byte[])args[0], is(binaryData)); + socket.close(); + semaphore.release(); + } } - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + }); socket.open(); semaphore.acquire(); } diff --git a/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java b/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java index dc3f9a5..99c8760 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java @@ -24,16 +24,7 @@ public class SocketTest { public void filterUpgrades() { Socket.Options opts = new Socket.Options(); opts.transports = new String[] {Polling.NAME}; - Socket socket = new Socket(opts) { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + Socket socket = new Socket(opts); List upgrades = new ArrayList() {{ add(Polling.NAME); add(WebSocket.NAME); @@ -51,16 +42,7 @@ public class SocketTest { @Test public void socketClosing() throws URISyntaxException, InterruptedException { final Semaphore semaphore = new Semaphore(0); - Socket socket = new Socket("ws://0.0.0.0:8080") { - @Override - public void onopen() {} - @Override - public void onmessage(String data) {} - @Override - public void onclose() {} - @Override - public void onerror(Exception err) {} - }; + Socket socket = new Socket("ws://0.0.0.0:8080"); final boolean[] closed = {false}; socket.once(Socket.EVENT_ERROR, new Emitter.Listener() { diff --git a/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java b/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java index d835956..5d03ff2 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java @@ -8,6 +8,7 @@ import org.junit.runners.JUnit4; import java.util.HashMap; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -27,7 +28,7 @@ public class TransportTest { }}; opt.timestampRequests = false; Polling polling = new Polling(opt); - assertThat(polling.uri(), is("http://localhost/engine.io?sid=test")); + assertThat(polling.uri(), containsString("http://localhost/engine.io?sid=test")); } @Test @@ -42,7 +43,7 @@ public class TransportTest { opt.port = 80; opt.timestampRequests = false; Polling polling = new Polling(opt); - assertThat(polling.uri(), is("http://localhost/engine.io?sid=test")); + assertThat(polling.uri(), containsString("http://localhost/engine.io?sid=test")); } @Test @@ -57,7 +58,7 @@ public class TransportTest { opt.port = 3000; opt.timestampRequests = false; Polling polling = new Polling(opt); - assertThat(polling.uri(), is("http://localhost:3000/engine.io?sid=test")); + assertThat(polling.uri(), containsString("http://localhost:3000/engine.io?sid=test")); } @Test @@ -72,7 +73,7 @@ public class TransportTest { opt.port = 443; opt.timestampRequests = false; Polling polling = new Polling(opt); - assertThat(polling.uri(), is("https://localhost/engine.io?sid=test")); + assertThat(polling.uri(), containsString("https://localhost/engine.io?sid=test")); } @Test @@ -83,7 +84,7 @@ public class TransportTest { opt.timestampParam = "t"; opt.timestampRequests = true; Polling polling = new Polling(opt); - assertThat(polling.uri().matches("http://localhost/engine.io\\?t=[0-9]+"), is(true)); + assertThat(polling.uri().matches("http://localhost/engine.io\\?(j=[0-9]+&)?t=[0-9]+-[0-9]+"), is(true)); } @Test diff --git a/src/test/resources/package.json b/src/test/resources/package.json index fe162bd..7e06042 100644 --- a/src/test/resources/package.json +++ b/src/test/resources/package.json @@ -3,7 +3,6 @@ "version": "0.0.0", "private": true, "dependencies": { - "engine.io": "1.0.5", - "engine.io-parser": "nkzawa/engine.io-parser#patch-0" + "engine.io": "1.1.0" } }