diff --git a/src/main/java/com/github/nkzawa/socketio/client/Manager.java b/src/main/java/com/github/nkzawa/socketio/client/Manager.java index 9cc62c4..c3300c2 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Manager.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Manager.java @@ -12,6 +12,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.*; +import java.util.logging.Level; import java.util.logging.Logger; @@ -63,6 +64,8 @@ public class Manager extends Emitter { public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt"; + public static final String EVENT_RECONNECTING = "reconnecting"; + /*package*/ static SSLContext defaultSSLContext; /*package*/ ReadyState readyState = null; @@ -135,6 +138,13 @@ public class Manager extends Emitter { this.decoder = new Parser.Decoder(); } + private void emitAll(String event, Object... args) { + this.emit(event, args); + for (Socket socket : this.nsps.values()) { + socket.emit(event, args); + } + } + public boolean reconnection() { return this._reconnection; } @@ -226,7 +236,7 @@ public class Manager extends Emitter { logger.fine("connect_error"); self.cleanup(); self.readyState = ReadyState.CLOSED; - self.emit(EVENT_CONNECT_ERROR, data); + self.emitAll(EVENT_CONNECT_ERROR, data); if (fn != null) { Exception err = new SocketIOException("Connection error", data instanceof Exception ? (Exception) data : null); @@ -251,7 +261,7 @@ public class Manager extends Emitter { openSub.destroy(); socket.close(); socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout")); - self.emit(EVENT_CONNECT_TIMEOUT, timeout); + self.emitAll(EVENT_CONNECT_TIMEOUT, timeout); } }); } @@ -327,7 +337,8 @@ public class Manager extends Emitter { } private void onerror(Exception err) { - this.emit(EVENT_ERROR, err); + logger.log(Level.FINE, "error", err); + this.emitAll(EVENT_ERROR, err); } /** @@ -423,7 +434,7 @@ public class Manager extends Emitter { if (attempts > this._reconnectionAttempts) { logger.fine("reconnect failed"); - this.emit(EVENT_RECONNECT_FAILED); + this.emitAll(EVENT_RECONNECT_FAILED); this.reconnecting = false; } else { long delay = this.attempts * this.reconnectionDelay(); @@ -438,7 +449,8 @@ public class Manager extends Emitter { @Override public void run() { logger.fine("attempting reconnect"); - self.emit(EVENT_RECONNECT_ATTEMPT); + self.emitAll(EVENT_RECONNECT_ATTEMPT, self.attempts); + self.emitAll(EVENT_RECONNECTING, self.attempts); self.open(new OpenCallback() { @Override public void call(Exception err) { @@ -446,7 +458,7 @@ public class Manager extends Emitter { logger.fine("reconnect attempt error"); self.reconnecting = false; self.reconnect(); - self.emit(EVENT_RECONNECT_ERROR, err); + self.emitAll(EVENT_RECONNECT_ERROR, err); } else { logger.fine("reconnect success"); self.onreconnect(); @@ -471,7 +483,7 @@ public class Manager extends Emitter { int attempts = this.attempts; this.attempts = 0; this.reconnecting = false; - this.emit(EVENT_RECONNECT, attempts); + this.emitAll(EVENT_RECONNECT, attempts); } diff --git a/src/main/java/com/github/nkzawa/socketio/client/Socket.java b/src/main/java/com/github/nkzawa/socketio/client/Socket.java index 2848221..e0c0a85 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Socket.java @@ -41,10 +41,31 @@ public class Socket extends Emitter { public static final String EVENT_MESSAGE = "message"; + public static final String EVENT_CONNECT_ERROR = Manager.EVENT_CONNECT_ERROR; + + public static final String EVENT_CONNECT_TIMEOUT = Manager.EVENT_CONNECT_TIMEOUT; + + public static final String EVENT_RECONNECT = Manager.EVENT_RECONNECT; + + public static final String EVENT_RECONNECT_ERROR = Manager.EVENT_RECONNECT_ERROR; + + public static final String EVENT_RECONNECT_FAILED = Manager.EVENT_RECONNECT_FAILED; + + public static final String EVENT_RECONNECT_ATTEMPT = Manager.EVENT_RECONNECT_ATTEMPT; + + public static final String EVENT_RECONNECTING = Manager.EVENT_RECONNECTING; + private static Map events = new HashMap() {{ put(EVENT_CONNECT, 1); + put(EVENT_CONNECT_ERROR, 1); + put(EVENT_CONNECT_TIMEOUT, 1); put(EVENT_DISCONNECT, 1); put(EVENT_ERROR, 1); + put(EVENT_RECONNECT, 1); + put(EVENT_RECONNECT_ATTEMPT, 1); + put(EVENT_RECONNECT_FAILED, 1); + put(EVENT_RECONNECT_ERROR, 1); + put(EVENT_RECONNECTING, 1); }}; private boolean connected; @@ -54,11 +75,37 @@ public class Socket extends Emitter { /*package*/ Manager io; private Map acks = new HashMap(); private Queue subs; - private final Queue> buffer = new LinkedList>(); + private final Queue> receiveBuffer = new LinkedList>(); + private final Queue> sendBuffer = new LinkedList>(); public Socket(Manager io, String nsp) { this.io = io; this.nsp = nsp; + this.subEvents(); + } + + private void subEvents() { + final Manager io = Socket.this.io; + Socket.this.subs = new LinkedList() {{ + add(On.on(io, Manager.EVENT_OPEN, new Listener() { + @Override + public void call(Object... args) { + Socket.this.onopen(); + } + })); + add(On.on(io, Manager.EVENT_PACKET, new Listener() { + @Override + public void call(Object... args) { + Socket.this.onpacket((Packet) args[0]); + } + })); + add(On.on(io, Manager.EVENT_CLOSE, new Listener() { + @Override + public void call(Object... args) { + Socket.this.onclose(args.length > 0 ? (String) args[0] : null); + } + })); + }}; } /** @@ -69,34 +116,8 @@ public class Socket extends Emitter { @Override public void run() { if (Socket.this.connected) return; - final Manager io = Socket.this.io; - io.open(); - Socket.this.subs = new LinkedList() {{ - add(On.on(io, Manager.EVENT_OPEN, new Listener() { - @Override - public void call(Object... args) { - Socket.this.onopen(); - } - })); - add(On.on(io, Manager.EVENT_ERROR, new Listener() { - @Override - public void call(Object... args) { - Socket.this.onerror(args.length > 0 ? (Exception) args[0] : null); - } - })); - add(On.on(io, Manager.EVENT_PACKET, new Listener() { - @Override - public void call(Object... args) { - Socket.this.onpacket((Packet) args[0]); - } - })); - add(On.on(io, Manager.EVENT_CLOSE, new Listener() { - @Override - public void call(Object... args) { - Socket.this.onclose(args.length > 0 ? (String) args[0] : null); - } - })); - }}; + + Socket.this.io.open(); if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen(); } }); @@ -163,7 +184,11 @@ public class Socket extends Emitter { packet.id = Socket.this.ids++; } - Socket.this.packet(packet); + if (Socket.this.connected) { + Socket.this.packet(packet); + } else { + Socket.this.sendBuffer.add(packet); + } } }); return this; @@ -220,10 +245,6 @@ public class Socket extends Emitter { this.io.packet(packet); } - private void onerror(Exception err) { - this.emit(EVENT_ERROR, err); - } - private void onopen() { logger.fine("transport is open - connecting"); @@ -286,7 +307,7 @@ public class Socket extends Emitter { String event = (String)args.remove(0); super.emit(event, args.toArray()); } else { - this.buffer.add(args); + this.receiveBuffer.add(args); } } @@ -328,10 +349,17 @@ public class Socket extends Emitter { private void emitBuffered() { List data; - while ((data = this.buffer.poll()) != null) { + while ((data = this.receiveBuffer.poll()) != null) { String event = (String)data.get(0); super.emit(event, data.toArray()); } + this.receiveBuffer.clear(); + + Packet packet; + while ((packet = this.sendBuffer.poll()) != null) { + this.packet(packet); + } + this.sendBuffer.clear(); } private void ondisconnect() { diff --git a/src/main/java/com/github/nkzawa/socketio/client/Url.java b/src/main/java/com/github/nkzawa/socketio/client/Url.java index 4e0de0a..32ed79f 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Url.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Url.java @@ -2,40 +2,68 @@ package com.github.nkzawa.socketio.client; import java.net.MalformedURLException; import java.net.URI; +import java.net.URISyntaxException; import java.net.URL; +import java.util.regex.Pattern; public class Url { + private static Pattern PATTERN_HTTP = Pattern.compile("^http|ws$"); + private static Pattern PATTERN_HTTPS = Pattern.compile("^(http|ws)s$"); + private Url() {} + public static URL parse(String uri) throws URISyntaxException, MalformedURLException { + return parse(new URI(uri)); + } + public static URL parse(URI uri) throws MalformedURLException { String protocol = uri.getScheme(); if (protocol == null || !protocol.matches("^https?|wss?$")) { - uri = uri.resolve("https://" + uri.getAuthority()); + protocol = "https"; } int port = uri.getPort(); - if (protocol != null && ((protocol.matches("^http|ws$") && port == 80) || - (protocol.matches("^(http|ws)s$") && port == 443))) { - uri = uri.resolve("//" + uri.getHost()); + if (port == -1) { + if (PATTERN_HTTP.matcher(protocol).matches()) { + port = 80; + } else if (PATTERN_HTTPS.matcher(protocol).matches()) { + port = 443; + } } - String path = uri.getPath(); + String path = uri.getRawPath(); if (path == null || path.length() == 0) { - uri = uri.resolve("/"); + path = "/"; } - return uri.toURL(); + String userInfo = uri.getRawUserInfo(); + String query = uri.getRawQuery(); + String fragment = uri.getRawFragment(); + return new URL(protocol + "://" + + (userInfo != null ? userInfo + "@" : "") + + uri.getHost() + + (port != -1 ? ":" + port : "") + + path + + (query != null ? "?" + query : "") + + (fragment != null ? "#" + fragment : "")); + } + + public static String extractId(String url) throws MalformedURLException { + return extractId(new URL(url)); } public static String extractId(URL url) { String protocol = url.getProtocol(); int port = url.getPort(); - if ((protocol.matches("^http|ws$") && port == 80) || - (protocol.matches("^(http|ws)s$") && port == 443)) { - port = -1; + if (port == -1) { + if (PATTERN_HTTP.matcher(protocol).matches()) { + port = 80; + } else if (PATTERN_HTTPS.matcher(protocol).matches()) { + port = 443; + } } - return protocol + "://" + url.getHost() + (port != -1 ? ":" + port : ""); + return protocol + "://" + url.getHost() + ":" + port; } } diff --git a/src/test/java/com/github/nkzawa/socketio/client/Connection.java b/src/test/java/com/github/nkzawa/socketio/client/Connection.java index 07b10b7..22dcf56 100644 --- a/src/test/java/com/github/nkzawa/socketio/client/Connection.java +++ b/src/test/java/com/github/nkzawa/socketio/client/Connection.java @@ -91,7 +91,6 @@ public abstract class Connection { IO.Options createOptions() { IO.Options opts = new IO.Options(); opts.forceNew = true; - opts.reconnection = false; return opts; } diff --git a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java index 119250c..732f3c0 100644 --- a/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java +++ b/src/test/java/com/github/nkzawa/socketio/client/ConnectionTest.java @@ -196,21 +196,42 @@ public class ConnectionTest extends Connection { @Test(timeout = TIMEOUT) public void reconnectByDefault() throws URISyntaxException, InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - socket = IO.socket(uri()); - socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + socket = client(); + socket.io.on(Manager.EVENT_RECONNECT, new Emitter.Listener() { @Override public void call(Object... objects) { - socket.io.engine.close(); - socket.io.on(Manager.EVENT_RECONNECT, new Emitter.Listener() { - @Override - public void call(Object... objects) { - socket.close(); - latch.countDown(); - } - }); + socket.close(); + latch.countDown(); } }); socket.open(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + socket.io.engine.close(); + } + }, 500); + latch.await(); + } + + @Test(timeout = TIMEOUT) + public void reconnectEventFireInSocket() throws URISyntaxException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + socket = client(); + socket.on(Socket.EVENT_RECONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + socket.close(); + latch.countDown(); + } + }); + socket.open(); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + socket.io.engine.close(); + } + }, 500); latch.await(); } @@ -312,6 +333,74 @@ public class ConnectionTest extends Connection { latch.await(); } + @Test(timeout = TIMEOUT) + public void fireReconnectEventsOnSocket() throws URISyntaxException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + Manager.Options opts = new Manager.Options(); + opts.reconnection = true; + opts.timeout = 0; + opts.reconnectionAttempts = 2; + opts.reconnectionDelay = 10; + Manager manager = new Manager(new URI(uri()), opts); + socket = manager.socket("/timeout_socket"); + + final int[] reconnects = new int[] {0}; + Emitter.Listener reconnectCb = new Emitter.Listener() { + @Override + public void call(Object... args) { + reconnects[0]++; + assertThat((Integer)args[0], is(reconnects[0])); + } + }; + + socket.on(Socket.EVENT_RECONNECT_ATTEMPT, reconnectCb); + socket.on(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() { + @Override + public void call(Object... objects) { + assertThat(reconnects[0], is(2)); + socket.close(); + latch.countDown(); + } + }); + socket.open(); + latch.await(); + } + + @Test(timeout = TIMEOUT) + public void fireReconnectingWithAttemptsNumberWhenReconnectingTwice() throws URISyntaxException, InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + + Manager.Options opts = new Manager.Options(); + opts.reconnection = true; + opts.timeout = 0; + opts.reconnectionAttempts = 2; + opts.reconnectionDelay = 10; + Manager manager = new Manager(new URI(uri()), opts); + socket = manager.socket("/timeout_socket"); + + final int[] reconnects = new int[] {0}; + Emitter.Listener reconnectCb = new Emitter.Listener() { + @Override + public void call(Object... args) { + reconnects[0]++; + assertThat((Integer)args[0], is(reconnects[0])); + } + }; + + socket.on(Socket.EVENT_RECONNECTING, reconnectCb); + socket.on(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() { + @Override + public void call(Object... objects) { + assertThat(reconnects[0], is(2)); + socket.close(); + latch.countDown(); + } + }); + socket.open(); + latch.await(); + } + @Test(timeout = TIMEOUT) public void emitDateAsString() throws URISyntaxException, InterruptedException { final CountDownLatch latch = new CountDownLatch(1); diff --git a/src/test/java/com/github/nkzawa/socketio/client/UrlTest.java b/src/test/java/com/github/nkzawa/socketio/client/UrlTest.java index 32465bd..cc5a9f0 100644 --- a/src/test/java/com/github/nkzawa/socketio/client/UrlTest.java +++ b/src/test/java/com/github/nkzawa/socketio/client/UrlTest.java @@ -5,7 +5,6 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.net.MalformedURLException; -import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -18,7 +17,13 @@ public class UrlTest { @Test public void parse() throws MalformedURLException, URISyntaxException { - URL url = Url.parse(new URI("https://woot.com/test")); + assertThat(Url.parse("http://username:password@host:8080/directory/file?query#ref").toString(), + is("http://username:password@host:8080/directory/file?query#ref")); + } + + @Test + public void parseRelativePath() throws MalformedURLException, URISyntaxException { + URL url = Url.parse("https://woot.com/test"); assertThat(url.getProtocol(), is("https")); assertThat(url.getHost(), is("woot.com")); assertThat(url.getPath(), is("/test")); @@ -26,7 +31,7 @@ public class UrlTest { @Test public void parseNoProtocol() throws MalformedURLException, URISyntaxException { - URL url = Url.parse(new URI("//localhost:3000")); + URL url = Url.parse("//localhost:3000"); assertThat(url.getProtocol(), is("https")); assertThat(url.getHost(), is("localhost")); assertThat(url.getPort(), is(3000)); @@ -34,23 +39,24 @@ public class UrlTest { @Test public void parseNamespace() throws MalformedURLException, URISyntaxException { - assertThat(Url.parse(new URI("http://woot.com/woot")).getPath(), is("/woot")); - assertThat(Url.parse(new URI("http://google.com")).getPath(), is("/")); - assertThat(Url.parse(new URI("http://google.com/")).getPath(), is("/")); + assertThat(Url.parse("http://woot.com/woot").getPath(), is("/woot")); + assertThat(Url.parse("http://google.com").getPath(), is("/")); + assertThat(Url.parse("http://google.com/").getPath(), is("/")); } @Test public void parseDefaultPort() throws MalformedURLException, URISyntaxException { - assertThat(Url.parse(new URI("http://google.com:80/")).toString(), is("http://google.com/")); - assertThat(Url.parse(new URI("https://google.com:443/")).toString(), is("https://google.com/")); + assertThat(Url.parse("http://google.com/").toString(), is("http://google.com:80/")); + assertThat(Url.parse("https://google.com/").toString(), is("https://google.com:443/")); } @Test - public void extractId() throws MalformedURLException, URISyntaxException { - String id1 = Url.extractId(new URL("http://google.com:80/")); - String id2 = Url.extractId(new URL("http://google.com/")); - String id3 = Url.extractId(new URL("https://google.com/")); + public void extractId() throws MalformedURLException { + String id1 = Url.extractId("http://google.com:80/"); + String id2 = Url.extractId("http://google.com/"); + String id3 = Url.extractId("https://google.com/"); assertThat(id1, is(id2)); assertThat(id1, is(not(id3))); + assertThat(id2, is(not(id3))); } }