diff --git a/pom.xml b/pom.xml
index 3272f31..8205605 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,7 +57,7 @@
com.github.nkzawa
engine.io-client
- 0.1.3
+ 0.2.0-SNAPSHOT
com.google.code.gson
diff --git a/src/main/java/com/github/nkzawa/socketio/client/IO.java b/src/main/java/com/github/nkzawa/socketio/client/IO.java
index 6357ef1..6502dd7 100644
--- a/src/main/java/com/github/nkzawa/socketio/client/IO.java
+++ b/src/main/java/com/github/nkzawa/socketio/client/IO.java
@@ -66,8 +66,7 @@ public class IO {
io = managers.get(id);
}
- String path = uri.getPath();
- return io.socket(path != null && !path.isEmpty() ? path : "/");
+ return io.socket(parsed.getPath());
}
diff --git a/src/main/java/com/github/nkzawa/socketio/client/Manager.java b/src/main/java/com/github/nkzawa/socketio/client/Manager.java
index ff32f9e..7922e75 100644
--- a/src/main/java/com/github/nkzawa/socketio/client/Manager.java
+++ b/src/main/java/com/github/nkzawa/socketio/client/Manager.java
@@ -1,12 +1,14 @@
package com.github.nkzawa.socketio.client;
import com.github.nkzawa.emitter.Emitter;
-import com.github.nkzawa.engineio.client.EventThread;
import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser;
+import com.github.nkzawa.thread.EventThread;
import java.net.URI;
+import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.logging.Logger;
@@ -58,52 +60,65 @@ public class Manager extends Emitter {
public static final String EVENT_RECONNECT_FAILED = "reconnect_failed";
- /*package*/ ReadyState readyState = ReadyState.CLOSED;
+ public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt";
+
+ /*package*/ ReadyState readyState = null;
private boolean _reconnection;
private boolean skipReconnect;
private boolean reconnecting;
+ private boolean encoding;
+ private boolean openReconnect;
private int _reconnectionAttempts;
private long _reconnectionDelay;
private long _reconnectionDelayMax;
private long _timeout;
private int connected;
private int attempts;
- private Queue subs = new LinkedList();
+ private URI uri;
+ private List packetBuffer;
+ private Queue subs;
+ private IO.Options opts;
private com.github.nkzawa.engineio.client.Socket engine;
+ private Parser.Encoder encoder;
+ private Parser.Decoder decoder;
/**
* This HashMap can be accessed from outside of EventThread.
*/
- private ConcurrentHashMap nsps = new ConcurrentHashMap();
+ private ConcurrentHashMap nsps;
private ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
+ public Manager(IO.Options opts) {
+ this(null, opts);
+ }
+
public Manager(URI uri, IO.Options opts) {
- opts = initOptions(opts);
- this.engine = new Engine(uri, opts);
- }
-
- public Manager(com.github.nkzawa.engineio.client.Socket socket, IO.Options opts) {
- opts = initOptions(opts);
- this.engine = socket;
- }
-
- private IO.Options initOptions(IO.Options opts) {
if (opts == null) {
opts = new IO.Options();
}
if (opts.path == null) {
opts.path = "/socket.io";
}
+ this.opts = opts;
+ this.nsps = new ConcurrentHashMap();
+ this.subs = new LinkedList();
this.reconnection(opts.reconnection);
this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE);
this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000);
this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000);
- this.timeout(opts.timeout < 0 ? 10000 : opts.timeout);
- return opts;
+ this.timeout(opts.timeout < 0 ? 20000 : opts.timeout);
+ this.readyState = ReadyState.CLOSED;
+ this.uri = uri;
+ this.connected = 0;
+ this.attempts = 0;
+ this.encoding = false;
+ this.packetBuffer = new ArrayList();
+ this.encoder = new Parser.Encoder();
+ this.decoder = new Parser.Decoder();
}
public boolean reconnection() {
@@ -151,6 +166,13 @@ public class Manager extends Emitter {
return this;
}
+ private void maybeReconnectOnOpen() {
+ if (!this.openReconnect && !this.reconnecting && this._reconnection) {
+ this.openReconnect = true;
+ this.reconnect();
+ }
+ }
+
public Manager open(){
return open(null);
}
@@ -165,8 +187,11 @@ public class Manager extends Emitter {
EventThread.exec(new Runnable() {
@Override
public void run() {
+ logger.fine(String.format("readyState %s", Manager.this.readyState));
if (Manager.this.readyState == ReadyState.OPEN) return;
+ logger.fine(String.format("opening %s", Manager.this.uri));
+ Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
final com.github.nkzawa.engineio.client.Socket socket = Manager.this.engine;
final Manager self = Manager.this;
@@ -184,13 +209,17 @@ public class Manager extends Emitter {
@Override
public void call(Object... objects) {
Object data = objects.length > 0 ? objects[0] : null;
+ logger.fine("connect_error");
self.cleanup();
+ self.readyState = ReadyState.CLOSED;
self.emit(EVENT_CONNECT_ERROR, data);
if (fn != null) {
Exception err = new SocketIOException("Connection error",
- data instanceof Exception ? (Exception)data : null);
+ data instanceof Exception ? (Exception) data : null);
fn.call(err);
}
+
+ self.maybeReconnectOnOpen();
}
});
@@ -214,14 +243,12 @@ public class Manager extends Emitter {
}
}, timeout, TimeUnit.MILLISECONDS);
- On.Handle timeSub = new On.Handle() {
+ Manager.this.subs.add(new On.Handle() {
@Override
public void destroy() {
timer.cancel(false);
}
- };
-
- Manager.this.subs.add(timeSub);
+ });
}
Manager.this.subs.add(openSub);
@@ -234,6 +261,8 @@ public class Manager extends Emitter {
}
private void onopen() {
+ logger.fine("open");
+
this.cleanup();
this.readyState = ReadyState.OPEN;
@@ -246,6 +275,12 @@ public class Manager extends Emitter {
Manager.this.ondata((String)objects[0]);
}
}));
+ this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() {
+ @Override
+ public void call(Object... objects) {
+ Manager.this.ondecoded((Packet) objects[0]);
+ }
+ }));
this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
@@ -255,13 +290,17 @@ public class Manager extends Emitter {
this.subs.add(On.on(socket, Engine.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... objects) {
- Manager.this.onclose();
+ Manager.this.onclose((String)objects[0]);
}
}));
}
private void ondata(String data) {
- this.emit(EVENT_PACKET, Parser.decode(data));
+ this.decoder.add(data);
+ }
+
+ private void ondecoded(Packet packet) {
+ this.emit(EVENT_PACKET, packet);
}
private void onerror(Exception err) {
@@ -295,15 +334,38 @@ public class Manager extends Emitter {
}
/*package*/ void destroy(Socket socket) {
- this.connected--;
- if (connected == 0) {
+ --this.connected;
+ if (this.connected == 0) {
this.close();
}
}
/*package*/ void packet(Packet packet) {
logger.fine(String.format("writing packet %s", packet));
- this.engine.write(Parser.encode(packet));
+ final Manager self = this;
+
+ if (!self.encoding) {
+ self.encoding = true;
+ this.encoder.encode(packet, new Parser.Encoder.Callback() {
+ @Override
+ public void call(String[] encodedPackets) {
+ for (int i = 0; i < encodedPackets.length; i++) {
+ self.engine.write(encodedPackets[i]);
+ }
+ self.encoding = false;
+ self.processPacketQueue();
+ }
+ });
+ } else {
+ self.packetBuffer.add(packet);
+ }
+ }
+
+ private void processPacketQueue() {
+ if (this.packetBuffer.size() > 0 && !this.encoding) {
+ Packet pack = this.packetBuffer.remove(0);
+ this.packet(pack);
+ }
}
private void cleanup() {
@@ -313,24 +375,27 @@ public class Manager extends Emitter {
private void close() {
this.skipReconnect = true;
- this.cleanup();
- this.readyState = ReadyState.CLOSED;
this.engine.close();
}
- private void onclose() {
+ private void onclose(String reason) {
+ logger.fine("close");
this.cleanup();
this.readyState = ReadyState.CLOSED;
+ this.emit(EVENT_CLOSE, reason);
if (this._reconnection && !this.skipReconnect) {
this.reconnect();
}
}
private void reconnect() {
+ if (this.reconnecting) return;
+
final Manager self = this;
this.attempts++;
if (attempts > this._reconnectionAttempts) {
+ logger.fine("reconnect failed");
this.emit(EVENT_RECONNECT_FAILED);
this.reconnecting = false;
} else {
@@ -346,11 +411,13 @@ public class Manager extends Emitter {
@Override
public void run() {
logger.fine("attempting reconnect");
+ self.emit(EVENT_RECONNECT_ATTEMPT);
self.open(new OpenCallback() {
@Override
public void call(Exception err) {
if (err != null) {
logger.fine("reconnect attempt error");
+ self.reconnecting = false;
self.reconnect();
self.emit(EVENT_RECONNECT_ERROR, err);
} else {
diff --git a/src/main/java/com/github/nkzawa/socketio/client/Socket.java b/src/main/java/com/github/nkzawa/socketio/client/Socket.java
index 6b404d3..6d15a48 100644
--- a/src/main/java/com/github/nkzawa/socketio/client/Socket.java
+++ b/src/main/java/com/github/nkzawa/socketio/client/Socket.java
@@ -1,9 +1,9 @@
package com.github.nkzawa.socketio.client;
import com.github.nkzawa.emitter.Emitter;
-import com.github.nkzawa.engineio.client.EventThread;
import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser;
+import com.github.nkzawa.thread.EventThread;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@@ -42,10 +42,10 @@ public class Socket extends Emitter {
public static final String EVENT_MESSAGE = "message";
- private static List events = new ArrayList() {{
- add(EVENT_CONNECT);
- add(EVENT_DISCONNECT);
- add(EVENT_ERROR);
+ private static Map events = new HashMap() {{
+ put(EVENT_CONNECT, 1);
+ put(EVENT_DISCONNECT, 1);
+ put(EVENT_ERROR, 1);
}};
private boolean connected;
@@ -66,36 +66,50 @@ public class Socket extends Emitter {
/**
* Connects the socket.
*/
- public void open() {
+ public Socket open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
+ if (Socket.this.connected) return;
final Manager io = Socket.this.io;
+ io.open();
Socket.this.subs = new LinkedList() {{
add(On.on(io, Manager.EVENT_OPEN, new Listener() {
@Override
- public void call(Object... objects) {
+ public void call(Object... args) {
Socket.this.onopen();
}
}));
add(On.on(io, Manager.EVENT_ERROR, new Listener() {
@Override
- public void call(Object... objects) {
- Socket.this.onerror(objects.length > 0 ? (Exception) objects[0] : null);
+ public void call(Object... args) {
+ Socket.this.onerror(args.length > 0 ? (Exception) args[0] : null);
+ }
+ }));
+ add(On.on(io, Manager.EVENT_PACKET, new Listener() {
+ @Override
+ public void call(Object... args) {
+ Socket.this.onpacket((Packet) args[0]);
+ }
+ }));
+ add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
+ @Override
+ public void call(Object... args) {
+ Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
}
}));
}};
- if (Socket.this.io.readyState == Manager.ReadyState.OPEN) Socket.this.onopen();
- io.open();
+ if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
}
});
+ return this;
}
/**
* Connects the socket.
*/
- public void connect() {
- this.open();
+ public Socket connect() {
+ return this.open();
}
/**
@@ -126,20 +140,25 @@ public class Socket extends Emitter {
EventThread.exec(new Runnable() {
@Override
public void run() {
- if (events.contains(event)) {
+ if (events.containsKey(event)) {
Socket.super.emit(event, args);
- } else {
- LinkedList