package com.github.nkzawa.engineio.client;

import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.transports.Polling;
import com.github.nkzawa.engineio.client.transports.PollingXHR;
import com.github.nkzawa.engineio.client.transports.WebSocket;
import com.github.nkzawa.engineio.parser.Packet;
import com.github.nkzawa.engineio.parser.Parser;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.http.Consts;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.message.BasicNameValuePair;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.logging.Logger;

/**
 * Client-side engine.io socket.
 *
 * <p>The socket owns a single active {@link Transport}, buffers outgoing packets until the
 * transport reports it is writable, keeps the connection alive with ping packets, and, once
 * open on a polling transport, probes the transports advertised by the server in order to
 * upgrade.
 *
 * <p>Events emitted by this class (see the {@code emit} calls below): {@code open},
 * {@code close}, {@code error}, {@code packet}, {@code heartbeat}, {@code handshake},
 * {@code data}, {@code message}, {@code packetCreate}, {@code flush}, {@code drain},
 * {@code upgrading} and {@code upgrade}. Subclasses additionally receive the
 * {@link #onopen()}, {@link #onmessage(String)} and {@link #onclose()} callbacks.
 *
 * <p>{@code readyState} takes the values {@code ""} (initial), {@code "opening"},
 * {@code "open"} and {@code "closed"}.
 */
public abstract class Socket extends Emitter {

    private static final Logger logger = Logger.getLogger("engine.io-client:socket");

    /** Placeholder callback; the callback queue cannot hold {@code null}. */
    private static final Runnable noop = new Runnable() {
        @Override
        public void run() {}
    };

    /** Every socket adds itself to this registry on construction. */
    public static final Sockets sockets = new Sockets();

    /** Protocol revision, taken from the parser. */
    public static final int protocol = Parser.protocol;

    private boolean secure;
    private boolean upgrade;
    private boolean timestampRequests;
    private boolean upgrading;
    private int port;
    private int policyPort;
    /** Number of packets handed to the transport by the last {@link #flush()}. */
    private int prevBufferLen;
    private long pingInterval;
    private long pingTimeout;
    private String id;
    private String hostname;
    private String path;
    private String timestampParam;
    private String readyState = "";
    private List<String> transports;
    private List<String> upgrades;
    private List<NameValuePair> query;
    private ConcurrentLinkedQueue<Packet> writeBuffer = new ConcurrentLinkedQueue<Packet>();
    private ConcurrentLinkedQueue<Runnable> callbackBuffer = new ConcurrentLinkedQueue<Runnable>();
    private Transport transport;
    private Future<?> pingTimeoutTimer;
    private Future<?> pingIntervalTimer;
    private ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();

    /**
     * Creates a socket for the given URI string using default options.
     *
     * @param uri server URI
     * @throws URISyntaxException if {@code uri} cannot be parsed
     */
    public Socket(String uri) throws URISyntaxException {
        this(uri, null);
    }

    /**
     * Creates a socket for the given URI using default options.
     *
     * @param uri server URI
     */
    public Socket(URI uri) {
        this(uri, null);
    }

    /**
     * Creates a socket for the given URI string.
     *
     * @param uri  server URI
     * @param opts options to start from, or {@code null} for defaults
     * @throws URISyntaxException if {@code uri} cannot be parsed
     */
    public Socket(String uri, Options opts) throws URISyntaxException {
        this(new URI(uri), opts);
    }

    /**
     * Creates a socket for the given URI. Host, scheme-derived {@code secure} flag, port and
     * query of the URI overwrite the corresponding fields of {@code opts}.
     *
     * @param uri  server URI
     * @param opts options to start from, or {@code null} for defaults
     */
    public Socket(URI uri, Options opts) {
        this(Options.fromURI(uri, opts));
    }

    /**
     * Creates a socket from fully specified options and registers it in {@link #sockets}.
     *
     * @param opts connection options; {@code opts.hostname} and {@code opts.port} are
     *             overwritten when {@code opts.host} is set
     */
    public Socket(Options opts) {
        if (opts.host != null) {
            String[] pieces = opts.host.split(":");
            opts.hostname = pieces[0];
            if (pieces.length > 1) {
                opts.port = Integer.parseInt(pieces[pieces.length - 1]);
            }
        }

        this.secure = opts.secure;
        this.hostname = opts.hostname != null ? opts.hostname : "localhost";
        // URI.getPort() reports -1 when the URI carries no port, so treat every
        // non-positive value as "unset" and fall back to the scheme default.
        this.port = opts.port > 0 ? opts.port : (this.secure ? 443 : 80);
        this.query = URLEncodedUtils.parse(opts.query, Consts.UTF_8);
        this.upgrade = opts.upgrade;
        // Normalise the path so that it always ends with exactly one trailing slash.
        this.path = (opts.path != null ? opts.path : "/engine.io").replaceAll("/$", "") + "/";
        this.timestampParam = opts.timestampParam != null ? opts.timestampParam : "t";
        this.timestampRequests = opts.timestampRequests;
        this.transports = new ArrayList<String>(Arrays.asList(
                opts.transports != null ? opts.transports : new String[] {"polling", "websocket"}));
        this.policyPort = opts.policyPort != 0 ? opts.policyPort : 843;

        Socket.sockets.add(this);
        Socket.sockets.evs.emit("add", this);
    }

    /**
     * Starts connecting using the first configured transport.
     *
     * @return this socket
     */
    public Socket open() {
        this.readyState = "opening";
        Transport transport = this.createTransport(this.transports.get(0));
        this.setTransport(transport);
        transport.open();
        return this;
    }

    /**
     * Builds a transport of the given kind, configured from this socket's settings.
     *
     * @param name {@code "websocket"} or {@code "polling"}
     * @return the new, not yet opened transport
     * @throws RuntimeException if {@code name} is not a known transport
     */
    private Transport createTransport(String name) {
        logger.info(String.format("creating transport '%s'", name));

        // Copy the base query so per-transport parameters do not leak into this.query.
        List<NameValuePair> query = new ArrayList<NameValuePair>(this.query);
        query.add(new BasicNameValuePair("EIO", String.valueOf(Parser.protocol)));
        query.add(new BasicNameValuePair("transport", name));
        if (this.id != null) {
            query.add(new BasicNameValuePair("sid", this.id));
        }

        Transport.Options opts = new Transport.Options();
        opts.hostname = this.hostname;
        opts.port = this.port;
        opts.secure = this.secure;
        opts.path = this.path;
        opts.query = query;
        opts.timestampRequests = this.timestampRequests;
        opts.timestampParam = this.timestampParam;
        opts.policyPort = this.policyPort;

        if ("websocket".equals(name)) {
            return new WebSocket(opts);
        } else if ("polling".equals(name)) {
            return new PollingXHR(opts);
        }

        throw new RuntimeException(String.format("unknown transport '%s'", name));
    }

    /**
     * Makes {@code transport} the active transport: removes all listeners from the previous
     * one (if any) and forwards the new transport's {@code drain}, {@code packet},
     * {@code error} and {@code close} events to this socket.
     *
     * @param transport the transport to activate
     */
    private void setTransport(Transport transport) {
        final Socket self = this;

        if (this.transport != null) {
            logger.info("clearing existing transport");
            this.transport.off();
        }

        this.transport = transport;

        transport.on("drain", new Listener() {
            @Override
            public void call(Object... args) {
                self.onDrain();
            }
        }).on("packet", new Listener() {
            @Override
            public void call(Object... args) {
                self.onPacket(args.length > 0 ? (Packet) args[0] : null);
            }
        }).on("error", new Listener() {
            @Override
            public void call(Object... args) {
                self.onError(args.length > 0 ? (Exception) args[0] : null);
            }
        }).on("close", new Listener() {
            @Override
            public void call(Object... args) {
                self.onClose("transport close");
            }
        });
    }

    /**
     * Probes the named transport as an upgrade candidate.
     *
     * <p>A new transport is opened and sent a {@code ping} packet with data {@code "probe"}.
     * If the first packet received back is a matching {@code pong}, the current (polling)
     * transport is paused, the new transport becomes the active one, an {@code upgrade}
     * packet is sent through it and the write buffer is flushed. Any other reply, or a
     * transport error, results in an {@code error} event on this socket.
     *
     * @param name name of the transport to probe
     */
    private void probe(final String name) {
        logger.info(String.format("probing transport '%s'", name));

        // One-element arrays serve as mutable cells shared by the anonymous listeners;
        // transport[0] is set to null once the probe is finished or aborted.
        final Transport[] transport = new Transport[] {this.createTransport(name)};
        final boolean[] failed = new boolean[] {false};
        final Socket self = this;

        final Listener onerror = new Listener() {
            @Override
            public void call(Object... args) {
                if (failed[0]) return;

                failed[0] = true;

                // TODO: handle error
                Exception err = args.length > 0 ? (Exception) args[0] : null;
                Transport.TransportException error = new Transport.TransportException("probe error", err);
                error.transport = transport[0].name;

                transport[0].close();
                transport[0] = null;

                logger.info(String.format(
                        "probing transport '%s' failed because of error: %s", name, err));

                self.emit("error", error);
            }
        };

        transport[0].once("open", new Listener() {
            @Override
            public void call(Object... args) {
                if (failed[0]) return;

                logger.info(String.format("probe transport '%s' opened", name));
                Packet packet = new Packet("ping", "probe");
                transport[0].send(new Packet[] {packet});
                transport[0].once("packet", new Listener() {
                    @Override
                    public void call(Object... args) {
                        if (failed[0]) return;

                        Packet msg = (Packet) args[0];
                        if ("pong".equals(msg.type) && "probe".equals(msg.data)) {
                            logger.info(String.format("probe transport '%s' pong", name));
                            self.upgrading = true;
                            self.emit("upgrading", transport[0]);

                            logger.info(String.format(
                                    "pausing current transport '%s'", self.transport.name));
                            ((Polling) self.transport).pause(new Runnable() {
                                @Override
                                public void run() {
                                    if (failed[0]) return;
                                    // The socket may have been closed while the old
                                    // transport was pausing; do not upgrade in that case.
                                    if ("closed".equals(self.readyState)
                                            || "closing".equals(self.readyState)) {
                                        return;
                                    }

                                    logger.info("changing transport and sending upgrade packet");
                                    transport[0].off("error", onerror);
                                    self.emit("upgrade", transport[0]);
                                    self.setTransport(transport[0]);
                                    Packet packet = new Packet("upgrade", null);
                                    transport[0].send(new Packet[] {packet});
                                    transport[0] = null;
                                    self.upgrading = false;
                                    self.flush();
                                }
                            });
                        } else {
                            logger.info(String.format("probe transport '%s' failed", name));
                            Transport.TransportException err =
                                    new Transport.TransportException("probe error");
                            err.transport = transport[0].name;
                            self.emit("error", err);
                        }
                    }
                });
            }
        });

        transport[0].once("error", onerror);

        this.once("close", new Listener() {
            @Override
            public void call(Object... args) {
                if (transport[0] != null) {
                    logger.info("socket closed prematurely - aborting probe");
                    failed[0] = true;
                    transport[0].close();
                    transport[0] = null;
                }
            }
        });

        // Another probe won the race: abort this one unless it is the same transport.
        this.once("upgrading", new Listener() {
            @Override
            public void call(Object... args) {
                Transport to = (Transport) args[0];
                if (transport[0] != null && !to.name.equals(transport[0].name)) {
                    logger.info(String.format(
                            "'%s' works - aborting '%s'", to.name, transport[0].name));
                    transport[0].close();
                    transport[0] = null;
                }
            }
        });

        transport[0].open();
    }

    /**
     * Marks the socket open, notifies listeners and the subclass, flushes buffered packets
     * and, when upgrades are enabled and the active transport is polling, starts a probe for
     * every upgrade candidate.
     */
    private void onOpen() {
        logger.info("socket open");
        this.readyState = "open";
        this.emit("open");
        this.onopen();
        this.flush();

        // readyState is re-checked because the callbacks above may have closed the socket.
        if ("open".equals(this.readyState) && this.upgrade && this.transport instanceof Polling) {
            logger.info("starting upgrade probes");
            for (String upgrade : this.upgrades) {
                this.probe(upgrade);
            }
        }
    }

    /**
     * Handles a packet received from the active transport. Packets are processed only while
     * the socket is {@code "opening"} or {@code "open"}; otherwise they are logged and
     * dropped.
     *
     * @param packet the received packet
     */
    private void onPacket(Packet packet) {
        if ("opening".equals(this.readyState) || "open".equals(this.readyState)) {
            logger.info(String.format(
                    "socket received: type '%s', data '%s'", packet.type, packet.data));

            this.emit("packet", packet);
            // Any packet counts as a sign of life.
            this.emit("heartbeat");

            if ("open".equals(packet.type)) {
                this.onHandshake(new JsonParser().parse(packet.data).getAsJsonObject());
            } else if ("pong".equals(packet.type)) {
                this.ping();
            } else if ("error".equals(packet.type)) {
                // TODO: handle error
                Exception err = new Exception("server error");
                // err.code = packet.data;
                this.emit("error", err);
            } else if ("message".equals(packet.type)) {
                this.emit("data", packet.data);
                this.emit("message", packet.data);
                this.onmessage(packet.data);
            }
        } else {
            logger.info(String.format(
                    "packet received with socket readyState '%s'", this.readyState));
        }
    }

    /**
     * Processes the handshake payload of an {@code open} packet: stores the session id,
     * rewrites the transport's {@code sid} query parameter, records the usable upgrades and
     * ping timings, opens the socket and starts the heartbeat.
     *
     * @param data handshake object; {@code sid}, {@code upgrades}, {@code pingInterval} and
     *             {@code pingTimeout} are read from it
     */
    private void onHandshake(JsonObject data) {
        this.emit("handshake", data);
        this.id = data.get("sid").getAsString();

        // Replace any previous sid on the live transport with the one just received.
        Iterator<NameValuePair> i = this.transport.query.iterator();
        while (i.hasNext()) {
            NameValuePair pair = i.next();
            if ("sid".equals(pair.getName())) {
                i.remove();
            }
        }
        this.transport.query.add(new BasicNameValuePair("sid", data.get("sid").getAsString()));

        List<String> upgrades = new ArrayList<String>();
        for (JsonElement upgrade : data.get("upgrades").getAsJsonArray()) {
            upgrades.add(upgrade.getAsString());
        }
        this.upgrades = this.filterUpgrades(upgrades);
        this.pingInterval = data.get("pingInterval").getAsLong();
        this.pingTimeout = data.get("pingTimeout").getAsLong();
        this.onOpen();
        this.ping();

        // Remove first so that repeated handshakes never register the listener twice.
        // The event name must match the "heartbeat" emitted in onPacket.
        this.off("heartbeat", this.onHeartbeatAsListener);
        this.on("heartbeat", this.onHeartbeatAsListener);
    }

    /** Adapts the {@code heartbeat} event to {@link #onHeartbeat(long)}; no argument means 0. */
    private final Listener onHeartbeatAsListener = new Listener() {
        @Override
        public void call(Object... args) {
            Socket.this.onHeartbeat(args.length > 0 ? (Long) args[0] : 0);
        }
    };

    /**
     * (Re)arms the ping-timeout timer. When it fires and the socket is not already closed,
     * the socket is closed with reason {@code "ping timeout"}.
     *
     * @param timeout delay in milliseconds; a value {@code <= 0} selects
     *                {@code pingInterval + pingTimeout}
     */
    private synchronized void onHeartbeat(long timeout) {
        if (this.pingTimeoutTimer != null) {
            pingTimeoutTimer.cancel(true);
        }

        if (timeout <= 0) {
            timeout = this.pingInterval + this.pingTimeout;
        }

        final Socket self = this;
        this.pingTimeoutTimer = this.heartbeatScheduler.schedule(new Runnable() {
            @Override
            public void run() {
                if ("closed".equals(self.readyState)) return;
                self.onClose("ping timeout");
            }
        }, timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules the next ping {@code pingInterval} milliseconds from now, replacing any
     * pending one. Sending the ping also arms the timeout timer with {@code pingTimeout}.
     */
    private synchronized void ping() {
        if (this.pingIntervalTimer != null) {
            pingIntervalTimer.cancel(true);
        }

        final Socket self = this;
        this.pingIntervalTimer = this.heartbeatScheduler.schedule(new Runnable() {
            @Override
            public void run() {
                logger.info(String.format(
                        "writing ping packet - expecting pong within %sms", self.pingTimeout));
                self.sendPacket("ping");
                self.onHeartbeat(self.pingTimeout);
            }
        }, this.pingInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Called when the transport has written the last flushed batch: runs the callbacks for
     * that batch, removes the batch from both buffers, then either emits {@code drain} or
     * flushes what was queued in the meantime.
     */
    private void onDrain() {
        this.callbacks();
        for (int i = 0; i < this.prevBufferLen; i++) {
            this.writeBuffer.poll();
            this.callbackBuffer.poll();
        }

        this.prevBufferLen = 0;
        if (this.writeBuffer.size() == 0) {
            this.emit("drain");
        } else {
            this.flush();
        }
    }

    /** Runs the callbacks belonging to the first {@code prevBufferLen} buffered packets. */
    private void callbacks() {
        Iterator<Runnable> iter = this.callbackBuffer.iterator();
        for (int i = 0; i < this.prevBufferLen && iter.hasNext(); i++) {
            Runnable callback = iter.next();
            if (callback != null) {
                callback.run();
            }
        }
    }

    /**
     * Sends all buffered packets through the active transport, provided the socket is not
     * closed, the transport is writable, no upgrade is in progress and the buffer is
     * non-empty. The packets stay buffered until {@link #onDrain()} removes them.
     */
    private void flush() {
        if (!"closed".equals(this.readyState) && this.transport.writable
                && !this.upgrading && this.writeBuffer.size() != 0) {
            logger.info(String.format(
                    "flushing %d packets in socket", this.writeBuffer.size()));
            this.prevBufferLen = this.writeBuffer.size();
            this.transport.send(this.writeBuffer.toArray(new Packet[0]));
            this.emit("flush");
        }
    }

    /**
     * Alias of {@link #send(String)}.
     *
     * @param msg message payload
     */
    public void write(String msg) {
        this.write(msg, null);
    }

    /**
     * Alias of {@link #send(String, Runnable)}.
     *
     * @param msg message payload
     * @param fn  callback run once the message has been drained, or {@code null}
     */
    public void write(String msg, Runnable fn) {
        this.send(msg, fn);
    }

    /**
     * Queues a {@code message} packet.
     *
     * @param msg message payload
     */
    public void send(String msg) {
        this.send(msg, null);
    }

    /**
     * Queues a {@code message} packet.
     *
     * @param msg message payload
     * @param fn  callback run once the message has been drained, or {@code null}
     */
    public void send(String msg, Runnable fn) {
        this.sendPacket("message", msg, fn);
    }

    /**
     * Queues a packet of the given type without data or callback.
     *
     * @param type packet type
     */
    private void sendPacket(String type) {
        this.sendPacket(type, null, null);
    }

    /**
     * Creates a packet, emits {@code packetCreate}, appends the packet and its callback to
     * the buffers and attempts a flush.
     *
     * @param type packet type
     * @param data packet payload, may be {@code null}
     * @param fn   callback run when the packet has been drained, or {@code null}
     */
    private void sendPacket(String type, String data, Runnable fn) {
        if (fn == null) {
            // ConcurrentLinkedQueue does not permit `null`.
            fn = noop;
        }

        Packet packet = new Packet(type, data);
        this.emit("packetCreate", packet);
        this.writeBuffer.offer(packet);
        this.callbackBuffer.offer(fn);
        this.flush();
    }

    /**
     * Closes the socket if it is opening or open; otherwise does nothing.
     *
     * @return this socket
     */
    public Socket close() {
        if ("opening".equals(this.readyState) || "open".equals(this.readyState)) {
            this.onClose("forced close");
            logger.info("socket closing - telling transport to close");
            this.transport.close();
            this.transport.off();
        }

        return this;
    }

    /**
     * Reports a transport error: emits {@code error} and closes the socket with reason
     * {@code "transport error"}.
     *
     * @param err the error, may be {@code null}
     */
    private void onError(Exception err) {
        logger.info(String.format("socket error %s", err));
        this.emit("error", err);
        this.onClose("transport error", err);
    }

    /**
     * Closes the socket with the given reason and no description.
     *
     * @param reason reason passed to {@code close} listeners
     */
    private void onClose(String reason) {
        this.onClose(reason, null);
    }

    /**
     * Transitions an opening or open socket to {@code "closed"}: cancels both heartbeat
     * timers, emits {@code close}, invokes {@link #onclose()}, clears the buffers and
     * forgets the session id. Does nothing in any other state.
     *
     * @param reason reason passed to {@code close} listeners
     * @param desc   optional exception passed to {@code close} listeners
     */
    private void onClose(String reason, Exception desc) {
        if ("opening".equals(this.readyState) || "open".equals(this.readyState)) {
            logger.info(String.format("socket close with reason: %s", reason));
            if (this.pingIntervalTimer != null) {
                this.pingIntervalTimer.cancel(true);
            }
            if (this.pingTimeoutTimer != null) {
                this.pingTimeoutTimer.cancel(true);
            }
            this.readyState = "closed";
            this.emit("close", reason, desc);
            this.onclose();
            // TODO:
            // clean buffer in next tick, so developers can still
            // grab the buffers on `close` event
            // setTimeout(function() {}
            //   self.writeBuffer = [];
            //   self.callbackBuffer = [];
            // );
            this.writeBuffer.clear();
            this.callbackBuffer.clear();
            this.id = null;
        }
    }

    /**
     * Keeps only those upgrades that are also configured transports, preserving order.
     *
     * @param upgrades upgrade names offered in the handshake
     * @return a new list with the usable upgrades
     */
    private List<String> filterUpgrades(List<String> upgrades) {
        List<String> filteredUpgrades = new ArrayList<String>();
        for (String upgrade : upgrades) {
            if (this.transports.contains(upgrade)) {
                filteredUpgrades.add(upgrade);
            }
        }

        return filteredUpgrades;
    }

    /** Called after the {@code open} event has been emitted. */
    public abstract void onopen();

    /**
     * Called after the {@code data} and {@code message} events for a message packet.
     *
     * @param data message payload
     */
    public abstract void onmessage(String data);

    /** Called after the {@code close} event has been emitted. */
    public abstract void onclose();

    /** Socket options; extends the transport options with socket-level settings. */
    public static class Options extends Transport.Options {

        /** Host, optionally in {@code host:port} form; split by the socket constructor. */
        public String host;
        /** Raw, URL-encoded query string. */
        public String query;
        /** Transport names to use; {@code null} selects polling and websocket. */
        public String[] transports;
        /** Whether to probe for transport upgrades once open. */
        public boolean upgrade = true;

        /**
         * Copies host, security flag, port and (if present) query of {@code uri} into
         * {@code opts}.
         *
         * @param uri  source URI
         * @param opts options to fill in, or {@code null} to start from a fresh instance
         * @return the filled-in options ({@code opts} itself unless it was {@code null})
         */
        private static Options fromURI(URI uri, Options opts) {
            if (opts == null) {
                opts = new Options();
            }

            opts.host = uri.getHost();
            opts.secure = "https".equals(uri.getScheme()) || "wss".equals(uri.getScheme());
            // -1 when the URI has no explicit port; the Socket constructor applies the default.
            opts.port = uri.getPort();

            String query = uri.getQuery();
            if (query != null) {
                opts.query = query;
            }

            return opts;
        }
    }

    /** Registry of sockets; {@code evs} receives an {@code add} event per new socket. */
    public static class Sockets extends ConcurrentLinkedQueue<Socket> {

        public Emitter evs = new Emitter();
    }
}