compatible with socket.io-client 1.0.6

This commit is contained in:
Naoyuki Kanezawa
2014-07-13 08:21:40 +09:00
parent d070baa1b6
commit 289b282a76
6 changed files with 239 additions and 77 deletions

View File

@@ -12,6 +12,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -63,6 +64,8 @@ public class Manager extends Emitter {
public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt";
public static final String EVENT_RECONNECTING = "reconnecting";
/*package*/ static SSLContext defaultSSLContext;
/*package*/ ReadyState readyState = null;
@@ -135,6 +138,13 @@ public class Manager extends Emitter {
this.decoder = new Parser.Decoder();
}
private void emitAll(String event, Object... args) {
this.emit(event, args);
for (Socket socket : this.nsps.values()) {
socket.emit(event, args);
}
}
public boolean reconnection() {
return this._reconnection;
}
@@ -226,7 +236,7 @@ public class Manager extends Emitter {
logger.fine("connect_error");
self.cleanup();
self.readyState = ReadyState.CLOSED;
self.emit(EVENT_CONNECT_ERROR, data);
self.emitAll(EVENT_CONNECT_ERROR, data);
if (fn != null) {
Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception) data : null);
@@ -251,7 +261,7 @@ public class Manager extends Emitter {
openSub.destroy();
socket.close();
socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
self.emit(EVENT_CONNECT_TIMEOUT, timeout);
self.emitAll(EVENT_CONNECT_TIMEOUT, timeout);
}
});
}
@@ -327,7 +337,8 @@ public class Manager extends Emitter {
}
private void onerror(Exception err) {
this.emit(EVENT_ERROR, err);
logger.log(Level.FINE, "error", err);
this.emitAll(EVENT_ERROR, err);
}
/**
@@ -423,7 +434,7 @@ public class Manager extends Emitter {
if (attempts > this._reconnectionAttempts) {
logger.fine("reconnect failed");
this.emit(EVENT_RECONNECT_FAILED);
this.emitAll(EVENT_RECONNECT_FAILED);
this.reconnecting = false;
} else {
long delay = this.attempts * this.reconnectionDelay();
@@ -438,7 +449,8 @@ public class Manager extends Emitter {
@Override
public void run() {
logger.fine("attempting reconnect");
self.emit(EVENT_RECONNECT_ATTEMPT);
self.emitAll(EVENT_RECONNECT_ATTEMPT, self.attempts);
self.emitAll(EVENT_RECONNECTING, self.attempts);
self.open(new OpenCallback() {
@Override
public void call(Exception err) {
@@ -446,7 +458,7 @@ public class Manager extends Emitter {
logger.fine("reconnect attempt error");
self.reconnecting = false;
self.reconnect();
self.emit(EVENT_RECONNECT_ERROR, err);
self.emitAll(EVENT_RECONNECT_ERROR, err);
} else {
logger.fine("reconnect success");
self.onreconnect();
@@ -471,7 +483,7 @@ public class Manager extends Emitter {
int attempts = this.attempts;
this.attempts = 0;
this.reconnecting = false;
this.emit(EVENT_RECONNECT, attempts);
this.emitAll(EVENT_RECONNECT, attempts);
}

View File

@@ -41,10 +41,31 @@ public class Socket extends Emitter {
public static final String EVENT_MESSAGE = "message";
public static final String EVENT_CONNECT_ERROR = Manager.EVENT_CONNECT_ERROR;
public static final String EVENT_CONNECT_TIMEOUT = Manager.EVENT_CONNECT_TIMEOUT;
public static final String EVENT_RECONNECT = Manager.EVENT_RECONNECT;
public static final String EVENT_RECONNECT_ERROR = Manager.EVENT_RECONNECT_ERROR;
public static final String EVENT_RECONNECT_FAILED = Manager.EVENT_RECONNECT_FAILED;
public static final String EVENT_RECONNECT_ATTEMPT = Manager.EVENT_RECONNECT_ATTEMPT;
public static final String EVENT_RECONNECTING = Manager.EVENT_RECONNECTING;
private static Map<String, Integer> events = new HashMap<String, Integer>() {{
put(EVENT_CONNECT, 1);
put(EVENT_CONNECT_ERROR, 1);
put(EVENT_CONNECT_TIMEOUT, 1);
put(EVENT_DISCONNECT, 1);
put(EVENT_ERROR, 1);
put(EVENT_RECONNECT, 1);
put(EVENT_RECONNECT_ATTEMPT, 1);
put(EVENT_RECONNECT_FAILED, 1);
put(EVENT_RECONNECT_ERROR, 1);
put(EVENT_RECONNECTING, 1);
}};
private boolean connected;
@@ -54,11 +75,37 @@ public class Socket extends Emitter {
/*package*/ Manager io;
private Map<Integer, Ack> acks = new HashMap<Integer, Ack>();
private Queue<On.Handle> subs;
private final Queue<List<Object>> buffer = new LinkedList<List<Object>>();
private final Queue<List<Object>> receiveBuffer = new LinkedList<List<Object>>();
private final Queue<Packet<JSONArray>> sendBuffer = new LinkedList<Packet<JSONArray>>();
public Socket(Manager io, String nsp) {
this.io = io;
this.nsp = nsp;
this.subEvents();
}
private void subEvents() {
final Manager io = Socket.this.io;
Socket.this.subs = new LinkedList<On.Handle>() {{
add(On.on(io, Manager.EVENT_OPEN, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onopen();
}
}));
add(On.on(io, Manager.EVENT_PACKET, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onpacket((Packet) args[0]);
}
}));
add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
}
}));
}};
}
/**
@@ -69,34 +116,8 @@ public class Socket extends Emitter {
@Override
public void run() {
if (Socket.this.connected) return;
final Manager io = Socket.this.io;
io.open();
Socket.this.subs = new LinkedList<On.Handle>() {{
add(On.on(io, Manager.EVENT_OPEN, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onopen();
}
}));
add(On.on(io, Manager.EVENT_ERROR, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onerror(args.length > 0 ? (Exception) args[0] : null);
}
}));
add(On.on(io, Manager.EVENT_PACKET, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onpacket((Packet) args[0]);
}
}));
add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
}
}));
}};
Socket.this.io.open();
if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
}
});
@@ -163,7 +184,11 @@ public class Socket extends Emitter {
packet.id = Socket.this.ids++;
}
Socket.this.packet(packet);
if (Socket.this.connected) {
Socket.this.packet(packet);
} else {
Socket.this.sendBuffer.add(packet);
}
}
});
return this;
@@ -220,10 +245,6 @@ public class Socket extends Emitter {
this.io.packet(packet);
}
private void onerror(Exception err) {
this.emit(EVENT_ERROR, err);
}
private void onopen() {
logger.fine("transport is open - connecting");
@@ -286,7 +307,7 @@ public class Socket extends Emitter {
String event = (String)args.remove(0);
super.emit(event, args.toArray());
} else {
this.buffer.add(args);
this.receiveBuffer.add(args);
}
}
@@ -328,10 +349,17 @@ public class Socket extends Emitter {
private void emitBuffered() {
List<Object> data;
while ((data = this.buffer.poll()) != null) {
while ((data = this.receiveBuffer.poll()) != null) {
String event = (String)data.get(0);
super.emit(event, data.toArray());
}
this.receiveBuffer.clear();
Packet<JSONArray> packet;
while ((packet = this.sendBuffer.poll()) != null) {
this.packet(packet);
}
this.sendBuffer.clear();
}
private void ondisconnect() {

View File

@@ -2,40 +2,68 @@ package com.github.nkzawa.socketio.client;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.regex.Pattern;
public class Url {
private static Pattern PATTERN_HTTP = Pattern.compile("^http|ws$");
private static Pattern PATTERN_HTTPS = Pattern.compile("^(http|ws)s$");
private Url() {}
public static URL parse(String uri) throws URISyntaxException, MalformedURLException {
return parse(new URI(uri));
}
public static URL parse(URI uri) throws MalformedURLException {
String protocol = uri.getScheme();
if (protocol == null || !protocol.matches("^https?|wss?$")) {
uri = uri.resolve("https://" + uri.getAuthority());
protocol = "https";
}
int port = uri.getPort();
if (protocol != null && ((protocol.matches("^http|ws$") && port == 80) ||
(protocol.matches("^(http|ws)s$") && port == 443))) {
uri = uri.resolve("//" + uri.getHost());
if (port == -1) {
if (PATTERN_HTTP.matcher(protocol).matches()) {
port = 80;
} else if (PATTERN_HTTPS.matcher(protocol).matches()) {
port = 443;
}
}
String path = uri.getPath();
String path = uri.getRawPath();
if (path == null || path.length() == 0) {
uri = uri.resolve("/");
path = "/";
}
return uri.toURL();
String userInfo = uri.getRawUserInfo();
String query = uri.getRawQuery();
String fragment = uri.getRawFragment();
return new URL(protocol + "://"
+ (userInfo != null ? userInfo + "@" : "")
+ uri.getHost()
+ (port != -1 ? ":" + port : "")
+ path
+ (query != null ? "?" + query : "")
+ (fragment != null ? "#" + fragment : ""));
}
public static String extractId(String url) throws MalformedURLException {
return extractId(new URL(url));
}
public static String extractId(URL url) {
String protocol = url.getProtocol();
int port = url.getPort();
if ((protocol.matches("^http|ws$") && port == 80) ||
(protocol.matches("^(http|ws)s$") && port == 443)) {
port = -1;
if (port == -1) {
if (PATTERN_HTTP.matcher(protocol).matches()) {
port = 80;
} else if (PATTERN_HTTPS.matcher(protocol).matches()) {
port = 443;
}
}
return protocol + "://" + url.getHost() + (port != -1 ? ":" + port : "");
return protocol + "://" + url.getHost() + ":" + port;
}
}

View File

@@ -91,7 +91,6 @@ public abstract class Connection {
IO.Options createOptions() {
IO.Options opts = new IO.Options();
opts.forceNew = true;
opts.reconnection = false;
return opts;
}

View File

@@ -196,21 +196,42 @@ public class ConnectionTest extends Connection {
@Test(timeout = TIMEOUT)
public void reconnectByDefault() throws URISyntaxException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
socket = IO.socket(uri());
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
socket = client();
socket.io.on(Manager.EVENT_RECONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
socket.io.engine.close();
socket.io.on(Manager.EVENT_RECONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
socket.close();
latch.countDown();
}
});
socket.close();
latch.countDown();
}
});
socket.open();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
socket.io.engine.close();
}
}, 500);
latch.await();
}
@Test(timeout = TIMEOUT)
public void reconnectEventFireInSocket() throws URISyntaxException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
socket = client();
socket.on(Socket.EVENT_RECONNECT, new Emitter.Listener() {
@Override
public void call(Object... objects) {
socket.close();
latch.countDown();
}
});
socket.open();
new Timer().schedule(new TimerTask() {
@Override
public void run() {
socket.io.engine.close();
}
}, 500);
latch.await();
}
@@ -312,6 +333,74 @@ public class ConnectionTest extends Connection {
latch.await();
}
@Test(timeout = TIMEOUT)
public void fireReconnectEventsOnSocket() throws URISyntaxException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
Manager.Options opts = new Manager.Options();
opts.reconnection = true;
opts.timeout = 0;
opts.reconnectionAttempts = 2;
opts.reconnectionDelay = 10;
Manager manager = new Manager(new URI(uri()), opts);
socket = manager.socket("/timeout_socket");
final int[] reconnects = new int[] {0};
Emitter.Listener reconnectCb = new Emitter.Listener() {
@Override
public void call(Object... args) {
reconnects[0]++;
assertThat((Integer)args[0], is(reconnects[0]));
}
};
socket.on(Socket.EVENT_RECONNECT_ATTEMPT, reconnectCb);
socket.on(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() {
@Override
public void call(Object... objects) {
assertThat(reconnects[0], is(2));
socket.close();
latch.countDown();
}
});
socket.open();
latch.await();
}
@Test(timeout = TIMEOUT)
public void fireReconnectingWithAttemptsNumberWhenReconnectingTwice() throws URISyntaxException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
Manager.Options opts = new Manager.Options();
opts.reconnection = true;
opts.timeout = 0;
opts.reconnectionAttempts = 2;
opts.reconnectionDelay = 10;
Manager manager = new Manager(new URI(uri()), opts);
socket = manager.socket("/timeout_socket");
final int[] reconnects = new int[] {0};
Emitter.Listener reconnectCb = new Emitter.Listener() {
@Override
public void call(Object... args) {
reconnects[0]++;
assertThat((Integer)args[0], is(reconnects[0]));
}
};
socket.on(Socket.EVENT_RECONNECTING, reconnectCb);
socket.on(Socket.EVENT_RECONNECT_FAILED, new Emitter.Listener() {
@Override
public void call(Object... objects) {
assertThat(reconnects[0], is(2));
socket.close();
latch.countDown();
}
});
socket.open();
latch.await();
}
@Test(timeout = TIMEOUT)
public void emitDateAsString() throws URISyntaxException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

View File

@@ -5,7 +5,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@@ -18,7 +17,13 @@ public class UrlTest {
@Test
public void parse() throws MalformedURLException, URISyntaxException {
URL url = Url.parse(new URI("https://woot.com/test"));
assertThat(Url.parse("http://username:password@host:8080/directory/file?query#ref").toString(),
is("http://username:password@host:8080/directory/file?query#ref"));
}
@Test
public void parseRelativePath() throws MalformedURLException, URISyntaxException {
URL url = Url.parse("https://woot.com/test");
assertThat(url.getProtocol(), is("https"));
assertThat(url.getHost(), is("woot.com"));
assertThat(url.getPath(), is("/test"));
@@ -26,7 +31,7 @@ public class UrlTest {
@Test
public void parseNoProtocol() throws MalformedURLException, URISyntaxException {
URL url = Url.parse(new URI("//localhost:3000"));
URL url = Url.parse("//localhost:3000");
assertThat(url.getProtocol(), is("https"));
assertThat(url.getHost(), is("localhost"));
assertThat(url.getPort(), is(3000));
@@ -34,23 +39,24 @@ public class UrlTest {
@Test
public void parseNamespace() throws MalformedURLException, URISyntaxException {
assertThat(Url.parse(new URI("http://woot.com/woot")).getPath(), is("/woot"));
assertThat(Url.parse(new URI("http://google.com")).getPath(), is("/"));
assertThat(Url.parse(new URI("http://google.com/")).getPath(), is("/"));
assertThat(Url.parse("http://woot.com/woot").getPath(), is("/woot"));
assertThat(Url.parse("http://google.com").getPath(), is("/"));
assertThat(Url.parse("http://google.com/").getPath(), is("/"));
}
@Test
public void parseDefaultPort() throws MalformedURLException, URISyntaxException {
assertThat(Url.parse(new URI("http://google.com:80/")).toString(), is("http://google.com/"));
assertThat(Url.parse(new URI("https://google.com:443/")).toString(), is("https://google.com/"));
assertThat(Url.parse("http://google.com/").toString(), is("http://google.com:80/"));
assertThat(Url.parse("https://google.com/").toString(), is("https://google.com:443/"));
}
@Test
public void extractId() throws MalformedURLException, URISyntaxException {
String id1 = Url.extractId(new URL("http://google.com:80/"));
String id2 = Url.extractId(new URL("http://google.com/"));
String id3 = Url.extractId(new URL("https://google.com/"));
public void extractId() throws MalformedURLException {
String id1 = Url.extractId("http://google.com:80/");
String id2 = Url.extractId("http://google.com/");
String id3 = Url.extractId("https://google.com/");
assertThat(id1, is(id2));
assertThat(id1, is(not(id3)));
assertThat(id2, is(not(id3)));
}
}