diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index 223917f..753412b 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -31,6 +31,9 @@ public abstract class Socket extends Emitter { put(CLOSED, "closed"); }}; + public static final String POLLING = "polling"; + public static final String WEBSOCKET = "websocket"; + public static final String EVENT_OPEN = "open"; public static final String EVENT_CLOSE = "close"; public static final String EVENT_HANDSHAKE = "handshake"; @@ -114,7 +117,7 @@ public abstract class Socket extends Emitter { this.timestampParam = opts.timestampParam != null ? opts.timestampParam : "t"; this.timestampRequests = opts.timestampRequests; this.transports = new ArrayList(Arrays.asList( - opts.transports != null ? opts.transports : new String[] {"polling", "websocket"})); + opts.transports != null ? opts.transports : new String[] {POLLING, WEBSOCKET})); this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843; Socket.sockets.add(this); @@ -148,9 +151,9 @@ public abstract class Socket extends Emitter { opts.timestampParam = this.timestampParam; opts.policyPort = this.policyPort; - if ("websocket".equals(name)) { + if (WEBSOCKET.equals(name)) { return new WebSocket(opts); - } else if ("polling".equals(name)) { + } else if (POLLING.equals(name)) { return new PollingXHR(opts); } @@ -167,22 +170,22 @@ public abstract class Socket extends Emitter { this.transport = transport; - transport.on("drain", new Listener() { + transport.on(Transport.EVENT_DRAIN, new Listener() { @Override public void call(Object... args) { self.onDrain(); } - }).on("packet", new Listener() { + }).on(Transport.EVENT_PACKET, new Listener() { @Override public void call(Object... args) { self.onPacket(args.length > 0 ? (Packet) args[0] : null); } - }).on("error", new Listener() { + }).on(Transport.EVENT_ERROR, new Listener() { @Override public void call(Object... args) { self.onError(args.length > 0 ? (Exception) args[0] : null); } - }).on("close", new Listener() { + }).on(Transport.EVENT_CLOSE, new Listener() { @Override public void call(Object... args) { self.onClose("transport close"); @@ -215,20 +218,20 @@ public abstract class Socket extends Emitter { } }; - transport[0].once("open", new Listener() { + transport[0].once(Transport.EVENT_OPEN, new Listener() { @Override public void call(Object... args) { if (failed[0]) return; logger.info(String.format("probe transport '%s' opened", name)); - Packet packet = new Packet("ping", "probe"); + Packet packet = new Packet(Packet.PING, "probe"); transport[0].send(new Packet[] {packet}); - transport[0].once("packet", new Listener() { + transport[0].once(Transport.EVENT_PACKET, new Listener() { @Override public void call(Object... args) { if (failed[0]) return; Packet msg = (Packet)args[0]; - if ("pong".equals(msg.type) && "probe".equals(msg.data)) { + if (Packet.PONG.equals(msg.type) && "probe".equals(msg.data)) { logger.info(String.format("probe transport '%s' pong", name)); self.upgrading = true; self.emit(EVENT_UPGRADING, transport[0]); @@ -243,10 +246,10 @@ public abstract class Socket extends Emitter { } logger.info("changing transport and sending upgrade packet"); - transport[0].off("error", onerror); + transport[0].off(Transport.EVENT_ERROR, onerror); self.emit(EVENT_UPGRADE, transport); self.setTransport(transport[0]); - Packet packet = new Packet("upgrade", null); + Packet packet = new Packet(Packet.UPGRADE, null); transport[0].send(new Packet[]{packet}); transport[0] = null; self.upgrading = false; @@ -264,9 +267,9 @@ public abstract class Socket extends Emitter { } }); - transport[0].once("error", onerror); + transport[0].once(Transport.EVENT_ERROR, onerror); - this.once("close", new Listener() { + this.once(EVENT_CLOSE, new Listener() { @Override public void call(Object... args) { if (transport[0] != null) { @@ -278,7 +281,7 @@ public abstract class Socket extends Emitter { } }); - this.once("upgrading", new Listener() { + this.once(EVENT_UPGRADING, new Listener() { @Override public void call(Object... args) { Transport to = (Transport)args[0]; @@ -315,16 +318,16 @@ public abstract class Socket extends Emitter { this.emit(EVENT_PACKET, packet); this.emit(EVENT_HEARTBEAT); - if ("open".equals(packet.type)) { + if (Packet.OPEN.equals(packet.type)) { this.onHandshake(new JsonParser().parse(packet.data).getAsJsonObject()); - } else if ("pong".equals(packet.type)) { + } else if (Packet.PONG.equals(packet.type)) { this.ping(); - } else if ("error".equals(packet.type)) { + } else if (Packet.ERROR.equals(packet.type)) { // TODO: handle error EngineIOException err = new EngineIOException("server error"); //err.code = packet.data; this.emit(EVENT_ERROR, err); - } else if ("message".equals(packet.type)) { + } else if (Packet.MESSAGE.equals(packet.type)) { this.emit(EVENT_DATA, packet.data); this.emit(EVENT_MESSAGE, packet.data); this.onmessage(packet.data); @@ -390,7 +393,7 @@ public abstract class Socket extends Emitter { @Override public void run() { logger.info(String.format("writing ping packet - expecting pong within %sms", self.pingTimeout)); - self.sendPacket("ping"); + self.sendPacket(Packet.PING); self.onHeartbeat(self.pingTimeout); } }, this.pingInterval, TimeUnit.MILLISECONDS); @@ -444,7 +447,7 @@ public abstract class Socket extends Emitter { } public void send(String msg, Runnable fn) { - this.sendPacket("message", msg, fn); + this.sendPacket(Packet.MESSAGE, msg, fn); } private void sendPacket(String type) { diff --git a/src/main/java/com/github/nkzawa/engineio/client/Transport.java b/src/main/java/com/github/nkzawa/engineio/client/Transport.java index 5cc33a5..4140d85 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Transport.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Transport.java @@ -5,10 +5,28 @@ import com.github.nkzawa.emitter.Emitter; import com.github.nkzawa.engineio.parser.Packet; import com.github.nkzawa.engineio.parser.Parser; +import java.util.HashMap; import java.util.Map; public abstract class Transport extends Emitter { + protected static final int OPENING = 0; + protected static final int OPEN = 1; + protected static final int CLOSED = 2; + protected static final int PAUSED = 3; + protected static final Map STATE_MAP = new HashMap() {{ + put(OPENING, "opening"); + put(OPEN, "open"); + put(CLOSED, "closed"); + put(PAUSED, "paused"); + }}; + + public static final String EVENT_OPEN = "open"; + public static final String EVENT_CLOSE = "close"; + public static final String EVENT_PACKET = "packet"; + public static final String EVENT_DRAIN = "drain"; + public static final String EVENT_ERROR = "error"; + public boolean writable; public String name; public Map query; @@ -16,10 +34,10 @@ public abstract class Transport extends Emitter { protected boolean secure; protected boolean timestampRequests; protected int port; + protected int readyState = -1; protected String path; protected String hostname; protected String timestampParam; - protected String readyState = ""; public Transport(Options opts) { @@ -35,20 +53,20 @@ public abstract class Transport extends Emitter { protected Transport onError(String msg, Exception desc) { // TODO: handle error Exception err = new EngineIOException(msg, desc); - this.emit("error", err); + this.emit(EVENT_ERROR, err); return this; } public Emitter open() { - if ("closed".equals(this.readyState) || "".equals(this.readyState)) { - this.readyState = "opening"; + if (this.readyState == CLOSED || this.readyState < 0) { + this.readyState = OPENING; this.doOpen(); } return this; } public Transport close() { - if ("opening".equals(this.readyState) || "open".equals(this.readyState)) { + if (this.readyState == OPENING || this.readyState == OPEN) { this.doClose(); this.onClose(); } @@ -56,7 +74,7 @@ public abstract class Transport extends Emitter { } public void send(Packet[] packets) { - if ("open".equals(this.readyState)) { + if (this.readyState == OPEN) { this.write(packets); } else { throw new RuntimeException("Transport not open"); @@ -64,9 +82,9 @@ public abstract class Transport extends Emitter { } protected void onOpen() { - this.readyState = "open"; + this.readyState = OPEN; this.writable = true; - this.emit("open"); + this.emit(EVENT_OPEN); } protected void onData(String data) { @@ -74,12 +92,12 @@ public abstract class Transport extends Emitter { } protected void onPacket(Packet packet) { - this.emit("packet", packet); + this.emit(EVENT_PACKET, packet); } protected void onClose() { - this.readyState = "closed"; - this.emit("close"); + this.readyState = CLOSED; + this.emit(EVENT_CLOSE); } abstract protected void write(Packet[] packets); diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java index 63f77b1..4ef5724 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java @@ -1,6 +1,7 @@ package com.github.nkzawa.engineio.client.transports; +import com.github.nkzawa.engineio.client.Socket; import com.github.nkzawa.engineio.client.Transport; import com.github.nkzawa.engineio.client.Util; import com.github.nkzawa.engineio.parser.Packet; @@ -15,12 +16,15 @@ abstract public class Polling extends Transport { private static final Logger logger = Logger.getLogger("engine.io-client:polling"); + public static final String EVENT_POLL = "poll"; + public static final String EVENT_POLL_COMPLETE = "pollComplete"; + private boolean polling; public Polling(Options opts) { super(opts); - this.name = "polling"; + this.name = Socket.POLLING; } protected void doOpen() { @@ -31,13 +35,13 @@ abstract public class Polling extends Transport { int pending = 0; final Polling self = this; - this.readyState = "paused"; + this.readyState = PAUSED; final Runnable pause = new Runnable() { @Override public void run() { logger.info("paused"); - self.readyState = "paused"; + self.readyState = PAUSED; onPause.run(); } }; @@ -48,7 +52,7 @@ abstract public class Polling extends Transport { if (this.polling) { logger.info("we are currently polling - waiting to pause"); total[0]++; - this.once("pollComplete", new Listener() { + this.once(EVENT_POLL_COMPLETE, new Listener() { @Override public void call(Object... args) { logger.info("pre-pause polling complete"); @@ -62,7 +66,7 @@ abstract public class Polling extends Transport { if (!this.writable) { logger.info("we are currently writing - waiting to pause"); total[0]++; - this.once("drain", new Listener() { + this.once(EVENT_DRAIN, new Listener() { @Override public void call(Object... args) { logger.info("pre-pause writing complete"); @@ -81,7 +85,7 @@ abstract public class Polling extends Transport { logger.info("polling"); this.polling = true; this.doPoll(); - this.emit("poll"); + this.emit(EVENT_POLL); } protected void onData(String data) { @@ -91,11 +95,11 @@ abstract public class Polling extends Transport { Parser.decodePayload(data, new Parser.DecodePayloadCallback() { @Override public boolean call(Packet packet, int index, int total) { - if ("opening".equals(self.readyState)) { + if (self.readyState == OPENING) { self.onOpen(); } - if ("close".equals(packet.type)) { + if (Packet.CLOSE.equals(packet.type)) { self.onClose(); return false; } @@ -105,21 +109,21 @@ abstract public class Polling extends Transport { } }); - if (!"closed".equals(this.readyState)) { + if (this.readyState != CLOSED) { this.polling = false; - this.emit("pollComplete"); + this.emit(EVENT_POLL_COMPLETE); - if ("open".equals(this.readyState)) { + if (this.readyState == OPEN) { this.poll(); } else { - logger.info(String.format("ignoring poll - transport state '%s'", this.readyState)); + logger.info(String.format("ignoring poll - transport state '%s'", STATE_MAP.get(this.readyState))); } } } protected void doClose() { logger.info("sending close packet"); - this.send(new Packet[] {new Packet("close", null)}); + this.send(new Packet[] {new Packet(Packet.CLOSE, null)}); } protected void write(Packet[] packets) { @@ -129,7 +133,7 @@ abstract public class Polling extends Transport { @Override public void run() { self.writable = true; - self.emit("drain"); + self.emit(EVENT_DRAIN); } }); } diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java index f7d8a31..81d6dfb 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java @@ -39,13 +39,13 @@ public class PollingXHR extends Polling { opts.data = data; Request req = this.request(opts); final PollingXHR self = this; - req.on("success", new Listener() { + req.on(Request.EVENT_SUCCESS, new Listener() { @Override public void call(Object... args) { fn.run(); } }); - req.on("error", new Listener() { + req.on(Request.EVENT_ERROR, new Listener() { @Override public void call(Object... args) { Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null; @@ -60,14 +60,14 @@ public class PollingXHR extends Polling { logger.info("xhr poll"); Request req = this.request(); final PollingXHR self = this; - req.on("data", new Listener() { + req.on(Request.EVENT_DATA, new Listener() { @Override public void call(Object... args) { String data = args.length > 0 ? (String)args[0] : null; self.onData(data); } }); - req.on("error", new Listener() { + req.on(Request.EVENT_ERROR, new Listener() { @Override public void call(Object... args) { Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null; @@ -80,6 +80,10 @@ public class PollingXHR extends Polling { private static class Request extends Emitter { + private static final String EVENT_SUCCESS = "success"; + private static final String EVENT_DATA = "data"; + private static final String EVENT_ERROR = "error"; + private static final ExecutorService xhrService = Executors.newCachedThreadPool(); String method; @@ -146,17 +150,17 @@ public class PollingXHR extends Polling { } public void onSuccess() { - this.emit("success"); + this.emit(EVENT_SUCCESS); this.cleanup(); } public void onData(String data) { - this.emit("data", data); + this.emit(EVENT_DATA, data); this.onSuccess(); } public void onError(Exception err) { - this.emit("error", err); + this.emit(EVENT_ERROR, err); this.cleanup(); } diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java b/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java index 19fb724..a489b89 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java @@ -1,6 +1,7 @@ package com.github.nkzawa.engineio.client.transports; +import com.github.nkzawa.engineio.client.Socket; import com.github.nkzawa.engineio.client.Transport; import com.github.nkzawa.engineio.client.Util; import com.github.nkzawa.engineio.parser.Packet; @@ -29,7 +30,7 @@ public class WebSocket extends Transport { public WebSocket(Options opts) { super(opts); - this.name = "websocket"; + this.name = Socket.WEBSOCKET; } protected void doOpen() { @@ -74,7 +75,7 @@ public class WebSocket extends Transport { @Override public void run() { self.writable = true; - self.emit("drain"); + self.emit(EVENT_DRAIN); } }; diff --git a/src/main/java/com/github/nkzawa/engineio/parser/Packet.java b/src/main/java/com/github/nkzawa/engineio/parser/Packet.java index 6407d92..71126d8 100644 --- a/src/main/java/com/github/nkzawa/engineio/parser/Packet.java +++ b/src/main/java/com/github/nkzawa/engineio/parser/Packet.java @@ -3,6 +3,15 @@ package com.github.nkzawa.engineio.parser; public class Packet { + static final public String OPEN = "open"; + static final public String CLOSE = "close"; + static final public String PING = "ping"; + static final public String PONG = "pong"; + static final public String UPGRADE = "upgrade"; + static final public String MESSAGE = "message"; + static final public String NOOP = "noop"; + static final public String ERROR = "error"; + public String type; public String data; diff --git a/src/main/java/com/github/nkzawa/engineio/parser/Parser.java b/src/main/java/com/github/nkzawa/engineio/parser/Parser.java index 5d246e7..3fee73a 100644 --- a/src/main/java/com/github/nkzawa/engineio/parser/Parser.java +++ b/src/main/java/com/github/nkzawa/engineio/parser/Parser.java @@ -8,13 +8,13 @@ public class Parser { public static final int protocol = 2; public static final Map packets = new HashMap() {{ - put("open", 0); - put("close", 1); - put("ping", 2); - put("pong", 3); - put("message", 4); - put("upgrade", 5); - put("noop", 6); + put(Packet.OPEN, 0); + put(Packet.CLOSE, 1); + put(Packet.PING, 2); + put(Packet.PONG, 3); + put(Packet.MESSAGE, 4); + put(Packet.UPGRADE, 5); + put(Packet.NOOP, 6); }}; public static final Map bipackets = new HashMap(); static { @@ -23,7 +23,7 @@ public class Parser { } } - private static Packet err = new Packet("error", "parser error"); + private static Packet err = new Packet(Packet.ERROR, "parser error"); private Parser() {}