support protocol v3 (except binary)

This commit is contained in:
Naoyuki Kanezawa
2014-04-05 15:45:51 +09:00
parent d40601020a
commit f44cb0a956
10 changed files with 400 additions and 194 deletions

View File

@@ -57,7 +57,7 @@
<dependency>
<groupId>com.github.nkzawa</groupId>
<artifactId>engine.io-client</artifactId>
<version>0.1.3</version>
<version>0.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>

View File

@@ -66,8 +66,7 @@ public class IO {
io = managers.get(id);
}
String path = uri.getPath();
return io.socket(path != null && !path.isEmpty() ? path : "/");
return io.socket(parsed.getPath());
}

View File

@@ -1,12 +1,14 @@
package com.github.nkzawa.socketio.client;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.EventThread;
import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser;
import com.github.nkzawa.thread.EventThread;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.logging.Logger;
@@ -58,52 +60,65 @@ public class Manager extends Emitter {
public static final String EVENT_RECONNECT_FAILED = "reconnect_failed";
/*package*/ ReadyState readyState = ReadyState.CLOSED;
public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt";
/*package*/ ReadyState readyState = null;
private boolean _reconnection;
private boolean skipReconnect;
private boolean reconnecting;
private boolean encoding;
private boolean openReconnect;
private int _reconnectionAttempts;
private long _reconnectionDelay;
private long _reconnectionDelayMax;
private long _timeout;
private int connected;
private int attempts;
private Queue<On.Handle> subs = new LinkedList<On.Handle>();
private URI uri;
private List<Packet> packetBuffer;
private Queue<On.Handle> subs;
private IO.Options opts;
private com.github.nkzawa.engineio.client.Socket engine;
private Parser.Encoder encoder;
private Parser.Decoder decoder;
/**
* This HashMap can be accessed from outside of EventThread.
*/
private ConcurrentHashMap<String, Socket> nsps = new ConcurrentHashMap<String, Socket>();
private ConcurrentHashMap<String, Socket> nsps;
private ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
public Manager(IO.Options opts) {
this(null, opts);
}
public Manager(URI uri, IO.Options opts) {
opts = initOptions(opts);
this.engine = new Engine(uri, opts);
}
public Manager(com.github.nkzawa.engineio.client.Socket socket, IO.Options opts) {
opts = initOptions(opts);
this.engine = socket;
}
private IO.Options initOptions(IO.Options opts) {
if (opts == null) {
opts = new IO.Options();
}
if (opts.path == null) {
opts.path = "/socket.io";
}
this.opts = opts;
this.nsps = new ConcurrentHashMap<String, Socket>();
this.subs = new LinkedList<On.Handle>();
this.reconnection(opts.reconnection);
this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE);
this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000);
this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000);
this.timeout(opts.timeout < 0 ? 10000 : opts.timeout);
return opts;
this.timeout(opts.timeout < 0 ? 20000 : opts.timeout);
this.readyState = ReadyState.CLOSED;
this.uri = uri;
this.connected = 0;
this.attempts = 0;
this.encoding = false;
this.packetBuffer = new ArrayList<Packet>();
this.encoder = new Parser.Encoder();
this.decoder = new Parser.Decoder();
}
public boolean reconnection() {
@@ -151,6 +166,13 @@ public class Manager extends Emitter {
return this;
}
private void maybeReconnectOnOpen() {
if (!this.openReconnect && !this.reconnecting && this._reconnection) {
this.openReconnect = true;
this.reconnect();
}
}
public Manager open(){
return open(null);
}
@@ -165,8 +187,11 @@ public class Manager extends Emitter {
EventThread.exec(new Runnable() {
@Override
public void run() {
logger.fine(String.format("readyState %s", Manager.this.readyState));
if (Manager.this.readyState == ReadyState.OPEN) return;
logger.fine(String.format("opening %s", Manager.this.uri));
Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
final com.github.nkzawa.engineio.client.Socket socket = Manager.this.engine;
final Manager self = Manager.this;
@@ -184,13 +209,17 @@ public class Manager extends Emitter {
@Override
public void call(Object... objects) {
Object data = objects.length > 0 ? objects[0] : null;
logger.fine("connect_error");
self.cleanup();
self.readyState = ReadyState.CLOSED;
self.emit(EVENT_CONNECT_ERROR, data);
if (fn != null) {
Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception)data : null);
data instanceof Exception ? (Exception) data : null);
fn.call(err);
}
self.maybeReconnectOnOpen();
}
});
@@ -214,14 +243,12 @@ public class Manager extends Emitter {
}
}, timeout, TimeUnit.MILLISECONDS);
On.Handle timeSub = new On.Handle() {
Manager.this.subs.add(new On.Handle() {
@Override
public void destroy() {
timer.cancel(false);
}
};
Manager.this.subs.add(timeSub);
});
}
Manager.this.subs.add(openSub);
@@ -234,6 +261,8 @@ public class Manager extends Emitter {
}
private void onopen() {
logger.fine("open");
this.cleanup();
this.readyState = ReadyState.OPEN;
@@ -246,6 +275,12 @@ public class Manager extends Emitter {
Manager.this.ondata((String)objects[0]);
}
}));
this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() {
@Override
public void call(Object... objects) {
Manager.this.ondecoded((Packet) objects[0]);
}
}));
this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
@@ -255,13 +290,17 @@ public class Manager extends Emitter {
this.subs.add(On.on(socket, Engine.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... objects) {
Manager.this.onclose();
Manager.this.onclose((String)objects[0]);
}
}));
}
private void ondata(String data) {
this.emit(EVENT_PACKET, Parser.decode(data));
this.decoder.add(data);
}
private void ondecoded(Packet packet) {
this.emit(EVENT_PACKET, packet);
}
private void onerror(Exception err) {
@@ -295,15 +334,38 @@ public class Manager extends Emitter {
}
/*package*/ void destroy(Socket socket) {
this.connected--;
if (connected == 0) {
--this.connected;
if (this.connected == 0) {
this.close();
}
}
/*package*/ void packet(Packet packet) {
logger.fine(String.format("writing packet %s", packet));
this.engine.write(Parser.encode(packet));
final Manager self = this;
if (!self.encoding) {
self.encoding = true;
this.encoder.encode(packet, new Parser.Encoder.Callback() {
@Override
public void call(String[] encodedPackets) {
for (int i = 0; i < encodedPackets.length; i++) {
self.engine.write(encodedPackets[i]);
}
self.encoding = false;
self.processPacketQueue();
}
});
} else {
self.packetBuffer.add(packet);
}
}
private void processPacketQueue() {
if (this.packetBuffer.size() > 0 && !this.encoding) {
Packet pack = this.packetBuffer.remove(0);
this.packet(pack);
}
}
private void cleanup() {
@@ -313,24 +375,27 @@ public class Manager extends Emitter {
private void close() {
this.skipReconnect = true;
this.cleanup();
this.readyState = ReadyState.CLOSED;
this.engine.close();
}
private void onclose() {
private void onclose(String reason) {
logger.fine("close");
this.cleanup();
this.readyState = ReadyState.CLOSED;
this.emit(EVENT_CLOSE, reason);
if (this._reconnection && !this.skipReconnect) {
this.reconnect();
}
}
private void reconnect() {
if (this.reconnecting) return;
final Manager self = this;
this.attempts++;
if (attempts > this._reconnectionAttempts) {
logger.fine("reconnect failed");
this.emit(EVENT_RECONNECT_FAILED);
this.reconnecting = false;
} else {
@@ -346,11 +411,13 @@ public class Manager extends Emitter {
@Override
public void run() {
logger.fine("attempting reconnect");
self.emit(EVENT_RECONNECT_ATTEMPT);
self.open(new OpenCallback() {
@Override
public void call(Exception err) {
if (err != null) {
logger.fine("reconnect attempt error");
self.reconnecting = false;
self.reconnect();
self.emit(EVENT_RECONNECT_ERROR, err);
} else {

View File

@@ -1,9 +1,9 @@
package com.github.nkzawa.socketio.client;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.EventThread;
import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser;
import com.github.nkzawa.thread.EventThread;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@@ -42,10 +42,10 @@ public class Socket extends Emitter {
public static final String EVENT_MESSAGE = "message";
private static List<String> events = new ArrayList<String>() {{
add(EVENT_CONNECT);
add(EVENT_DISCONNECT);
add(EVENT_ERROR);
private static Map<String, Integer> events = new HashMap<String, Integer>() {{
put(EVENT_CONNECT, 1);
put(EVENT_DISCONNECT, 1);
put(EVENT_ERROR, 1);
}};
private boolean connected;
@@ -66,36 +66,50 @@ public class Socket extends Emitter {
/**
* Connects the socket.
*/
public void open() {
public Socket open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (Socket.this.connected) return;
final Manager io = Socket.this.io;
io.open();
Socket.this.subs = new LinkedList<On.Handle>() {{
add(On.on(io, Manager.EVENT_OPEN, new Listener() {
@Override
public void call(Object... objects) {
public void call(Object... args) {
Socket.this.onopen();
}
}));
add(On.on(io, Manager.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
Socket.this.onerror(objects.length > 0 ? (Exception) objects[0] : null);
public void call(Object... args) {
Socket.this.onerror(args.length > 0 ? (Exception) args[0] : null);
}
}));
add(On.on(io, Manager.EVENT_PACKET, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onpacket((Packet) args[0]);
}
}));
add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
}
}));
}};
if (Socket.this.io.readyState == Manager.ReadyState.OPEN) Socket.this.onopen();
io.open();
if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
}
});
return this;
}
/**
* Connects the socket.
*/
public void connect() {
this.open();
public Socket connect() {
return this.open();
}
/**
@@ -126,20 +140,25 @@ public class Socket extends Emitter {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (events.contains(event)) {
if (events.containsKey(event)) {
Socket.super.emit(event, args);
} else {
LinkedList<Object> _args = new LinkedList<Object>(Arrays.asList(args));
if (_args.peekLast() instanceof Ack) {
Ack ack = (Ack)_args.pollLast();
Socket.this.emit(event, _args.toArray(), ack);
return;
}
_args.offerFirst(event);
Packet packet = new Packet(Parser.EVENT, toJsonArray(_args));
Socket.this.packet(packet);
List<Object> _args = new ArrayList<Object>(args.length + 1);
_args.add(event);
_args.addAll(Arrays.asList(args));
int parserType = Parser.EVENT;
// TODO: hasBin(_args)
Packet packet = new Packet(parserType, toJsonArray(_args));
if (_args.get(_args.size() - 1) instanceof Ack) {
logger.fine(String.format("emitting packet with ack id %d", Socket.this.ids));
Socket.this.acks.put(Socket.this.ids, (Ack)_args.remove(_args.size() - 1));
packet.id = Socket.this.ids++;
}
Socket.this.packet(packet);
}
});
return this;
@@ -190,21 +209,6 @@ public class Socket extends Emitter {
if (!"/".equals(this.nsp)) {
this.packet(new Packet(Parser.CONNECT));
}
Manager io = this.io;
this.subs.add(On.on(io, Manager.EVENT_PACKET, new Listener() {
@Override
public void call(Object... objects) {
Socket.this.onpacket((Packet)objects[0]);
}
}));
this.subs.add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... objects) {
String reason = objects.length > 0 ? (String) objects[0] : null;
Socket.this.onclose(reason);
}
}));
}
private void onclose(String reason) {
@@ -226,6 +230,10 @@ public class Socket extends Emitter {
this.onevent(packet);
break;
case Parser.BINARY_EVENT:
this.onevent(packet);
break;
case Parser.ACK:
this.onack(packet);
break;
@@ -303,8 +311,6 @@ public class Socket extends Emitter {
}
private void destroy() {
logger.fine(String.format("destroying socket (%s)", this.nsp));
for (On.Handle sub : this.subs) {
sub.destroy();
}

View File

@@ -6,18 +6,25 @@ import java.net.URL;
public class Url {
private Url() {
}
private Url() {}
public static URL parse(URI uri) throws MalformedURLException {
String protocol = uri.getScheme();
if (protocol == null || !protocol.matches("^https?|wss?$")) {
uri = uri.resolve("https://" + uri.getAuthority());
}
int port = uri.getPort();
if (protocol != null && ((protocol.matches("^http|ws$") && port == 80) ||
(protocol.matches("^(http|ws)s$") && port == 443))) {
uri = uri.resolve("//" + uri.getHost());
}
String path = uri.getPath();
if (path == null || path.isEmpty()) {
uri = uri.resolve("/");
}
return uri.toURL();
}

View File

@@ -8,6 +8,7 @@ public class Packet {
public int id = -1;
public String nsp;
public JsonElement data;
public int attachments;
public Packet() {}

View File

@@ -1,9 +1,7 @@
package com.github.nkzawa.socketio.parser;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.github.nkzawa.emitter.Emitter;
import com.google.gson.*;
import java.util.ArrayList;
import java.util.List;
@@ -14,7 +12,7 @@ public class Parser {
private static final Logger logger = Logger.getLogger(Parser.class.getName());
private static final Gson gson = new Gson();
private static final JsonParser parser = new JsonParser();
private static final JsonParser json = new JsonParser();
/**
* Packet type `connect`.
@@ -41,26 +39,59 @@ public class Parser {
*/
public static final int ERROR = 4;
public static int protocol = 1;
/**
* Packet type `binary event`.
*/
public static final int BINARY_EVENT = 5;
public static int protocol = 3;
/**
* Packet types.
*/
public static List<String > types = new ArrayList<String>() {{
add("CONNECT");
add("DISCONNECT");
add("EVENT");
add("ACK");
add("ERROR");
}};
public static String[] types = new String[] {
"CONNECT",
"DISCONNECT",
"EVENT",
"BINARY_EVENT",
"ACK",
"ERROR",
};
private Parser() {}
public static String encode(Packet obj) {
private static Packet error() {
return new Packet(ERROR, new JsonPrimitive("parser error"));
}
public static class Encoder {
public Encoder() {}
public void encode(Packet obj, Callback callback) {
logger.fine(String.format("encoding packet %s", obj));
if (BINARY_EVENT == obj.type || ACK == obj.type) {
encodeAsBinary(obj, callback);
} else {
String encoding = encodeAsString(obj);
callback.call(new String[] {encoding});
}
}
private String encodeAsString(Packet obj) {
StringBuilder str = new StringBuilder();
boolean nsp = false;
str.append(obj.type);
if (BINARY_EVENT == obj.type || ACK == obj.type) {
str.append(obj.attachments);
str.append("-");
}
if (obj.nsp != null && !obj.nsp.isEmpty() && !"/".equals(obj.nsp)) {
nsp = true;
str.append(obj.nsp);
@@ -83,29 +114,72 @@ public class Parser {
return str.toString();
}
public static Packet decode(String str) {
private void encodeAsBinary(Packet obj, Callback callback) {
// TODO
}
public interface Callback {
public void call(String[] data);
}
}
public static class Decoder extends Emitter {
public static String EVENT_DECODED = "decoded";
private BinaryReconstructor reconstructor;
public Decoder() {
this.reconstructor = null;
}
public void add(String obj) {
Packet packet = decodeString(obj);
if (BINARY_EVENT == packet.type || ACK == packet.type) {
this.reconstructor = new BinaryReconstructor(packet);
if (this.reconstructor.reconPack.attachments == 0) {
this.emit(EVENT_DECODED, packet);
}
} else {
this.emit(EVENT_DECODED, packet);
}
}
public void add(byte[] obj) {
if (this.reconstructor == null) {
throw new RuntimeException("got binary data when not reconstructing a packet");
} else {
Packet packet = this.reconstructor.takeBinaryData(obj);
if (packet != null) {
this.reconstructor = null;
this.emit(EVENT_DECODED, packet);
}
}
}
public static Packet decodeString(String str) {
Packet p = new Packet();
int i = 0;
try {
p.type = Character.getNumericValue(str.charAt(0));
types.get(p.type);
} catch (IndexOutOfBoundsException e) {
return error();
if (p.type < 0 || p.type > types.length - 1) return error();
if (BINARY_EVENT == p.type || ACK == p.type) {
StringBuilder attachments = new StringBuilder();
while (str.charAt(++i) != '-') {
attachments.append(str.charAt(i));
}
p.attachments = Integer.parseInt(attachments.toString());
}
char next = Character.UNASSIGNED;
try {
next = str.charAt(i + 1);
} catch (IndexOutOfBoundsException e) {
// do nothing
}
if (next == '/') {
if (str.length() > i + 1 && '/' == str.charAt(i + 1)) {
StringBuilder nsp = new StringBuilder();
while (true) {
++i;
char c = str.charAt(i);
if (c == ',') break;
if (',' == c) break;
nsp.append(c);
if (i + 1 == str.length()) break;
}
@@ -114,18 +188,18 @@ public class Parser {
p.nsp = "/";
}
next = Character.UNASSIGNED;
Character next;
try {
next = str.charAt(i + 1);
} catch (IndexOutOfBoundsException e) {
// do nothing
next = Character.UNASSIGNED;
}
if (Character.getNumericValue(next) != -1) {
if (Character.UNASSIGNED != next && Character.getNumericValue(next) > -1) {
StringBuilder id = new StringBuilder();
while (true) {
++i;
Character c = str.charAt(i);
if (c == null || Character.getNumericValue(c) == -1) {
char c = str.charAt(i);
if (Character.getNumericValue(c) < 0) {
--i;
break;
}
@@ -137,7 +211,7 @@ public class Parser {
try {
str.charAt(++i);
p.data = parser.parse(str.substring(i));
p.data = json.parse(str.substring(i));
} catch (IndexOutOfBoundsException e) {
// do nothing
} catch (JsonParseException e) {
@@ -148,7 +222,38 @@ public class Parser {
return p;
}
private static Packet error() {
return new Packet(ERROR, new JsonPrimitive("parser error"));
public void destroy() {
if (this.reconstructor != null) {
this.reconstructor.finishReconstruction();
}
}
}
private static class BinaryReconstructor {
public Packet reconPack;
private List<byte[]> buffers;
BinaryReconstructor(Packet packet) {
this.reconPack = packet;
this.buffers = new ArrayList<byte[]>();
}
public Packet takeBinaryData(byte[] binData) {
this.buffers.add(binData);
if (this.buffers.size() == this.reconPack.attachments) {
// TODO:
}
return null;
}
public void finishReconstruction () {
this.reconPack = null;
this.buffers = new ArrayList<byte[]>();
}
}
}

View File

@@ -18,28 +18,33 @@ public class UrlTest {
@Test
public void parse() throws MalformedURLException, URISyntaxException {
URL url = Url.parse(new URI("http://woot.com/test"));
assertThat(url.getProtocol(), is("http"));
URL url = Url.parse(new URI("https://woot.com/test"));
assertThat(url.getProtocol(), is("https"));
assertThat(url.getHost(), is("woot.com"));
assertThat(url.getPath(), is("/test"));
}
@Test
public void parse_NoProtocol() throws MalformedURLException, URISyntaxException {
public void parseNoProtocol() throws MalformedURLException, URISyntaxException {
URL url = Url.parse(new URI("//localhost:3000"));
assertThat(url.getProtocol(), is("https"));
assertThat(url.getHost(), is("localhost"));
assertThat(url.getAuthority(), is("localhost:3000"));
assertThat(url.getPort(), is(3000));
}
@Test
public void parse_Namaspace() throws MalformedURLException, URISyntaxException {
public void parseNamespace() throws MalformedURLException, URISyntaxException {
assertThat(Url.parse(new URI("http://woot.com/woot")).getPath(), is("/woot"));
assertThat(Url.parse(new URI("http://google.com")).getPath(), is("/"));
assertThat(Url.parse(new URI("http://google.com/")).getPath(), is("/"));
}
@Test
public void parseDefaultPort() throws MalformedURLException, URISyntaxException {
assertThat(Url.parse(new URI("http://google.com:80/")).toString(), is("http://google.com/"));
assertThat(Url.parse(new URI("https://google.com:443/")).toString(), is("https://google.com/"));
}
@Test
public void extractId() throws MalformedURLException, URISyntaxException {
String id1 = Url.extractId(new URL("http://google.com:80/"));

View File

@@ -1,18 +1,20 @@
package com.github.nkzawa.socketio.parser;
import com.github.nkzawa.emitter.Emitter;
import com.google.gson.JsonParser;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import static com.github.nkzawa.socketio.parser.Parser.decode;
import static com.github.nkzawa.socketio.parser.Parser.encode;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class)
public class ParserTest {
private static Parser.Encoder encoder = new Parser.Encoder();
@Test
public void connect() {
Packet packet = new Packet(Parser.CONNECT);
@@ -49,10 +51,24 @@ public class ParserTest {
test(packet);
}
private void test(Packet packet) {
Packet _packet = decode(encode(packet));
assertThat(_packet.type, is(packet.type));
assertThat(_packet.data, is(packet.data));
assertThat(_packet.nsp, is(packet.nsp));
private void test(final Packet obj) {
encoder.encode(obj, new Parser.Encoder.Callback() {
@Override
public void call(String[] encodedPackets) {
Parser.Decoder decoder = new Parser.Decoder();
decoder.on(Parser.Decoder.EVENT_DECODED, new Emitter.Listener() {
@Override
public void call(Object... args) {
Packet packet = (Packet)args[0];
assertThat(packet.type, is(obj.type));
assertThat(packet.id, is(obj.id));
assertThat(packet.data, is(obj.data));
assertThat(packet.nsp, is(obj.nsp));
assertThat(packet.attachments, is(obj.attachments));
}
});
decoder.add(encodedPackets[0]);
}
});
}
}

View File

@@ -3,6 +3,6 @@
"version": "0.0.0",
"private": true,
"dependencies": {
"socket.io": "nkzawa/socket.io"
"socket.io": "LearnBoost/socket.io"
}
}