diff --git a/src/main/java/com/github/nkzawa/emitter/Emitter.java b/src/main/java/com/github/nkzawa/emitter/Emitter.java index 7a65617..80ce94b 100644 --- a/src/main/java/com/github/nkzawa/emitter/Emitter.java +++ b/src/main/java/com/github/nkzawa/emitter/Emitter.java @@ -9,7 +9,7 @@ import java.util.concurrent.ConcurrentMap; /** - * The event emitter which is ported from the JavaScript module. + * The event emitter which is ported from the JavaScript module. This class is thread-safe. * * @see https://github.com/component/emitter */ diff --git a/src/main/java/com/github/nkzawa/engineio/client/EventThread.java b/src/main/java/com/github/nkzawa/engineio/client/EventThread.java new file mode 100644 index 0000000..a93186d --- /dev/null +++ b/src/main/java/com/github/nkzawa/engineio/client/EventThread.java @@ -0,0 +1,60 @@ +package com.github.nkzawa.engineio.client; + + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + + +/** + * The main thread for Engine.IO Client. + */ +class EventThread extends Thread { + + private static final ExecutorService service = Executors.newSingleThreadExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable runnable) { + thread = new EventThread(runnable); + return thread; + } + }); + + private static volatile EventThread thread; + + + private EventThread(Runnable runnable) { + super(runnable); + } + + /** + * check if the current thread is EventThread. + * + * @return true if the current thread is EventThread. + */ + public static boolean isCurrent() { + return currentThread() == thread; + } + + /** + * Executes a task in EventThread. + * + * @param task + */ + public static void exec(Runnable task) { + if (isCurrent()) { + task.run(); + } else { + service.execute(task); + } + } + + /** + * Executes a task on the next loop in EventThread. + * + * @param task + */ + public static void nextTick(Runnable task) { + service.execute(task); + } + +} diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index 2e6ea65..241a82d 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -108,8 +108,8 @@ public abstract class Socket extends Emitter { private List transports; private List upgrades; private Map query; - private ConcurrentLinkedQueue writeBuffer = new ConcurrentLinkedQueue(); - private ConcurrentLinkedQueue callbackBuffer = new ConcurrentLinkedQueue(); + private Queue writeBuffer = new LinkedList(); + private Queue callbackBuffer = new LinkedList(); private Transport transport; private Future pingTimeoutTimer; private Future pingIntervalTimer; @@ -176,10 +176,15 @@ public abstract class Socket extends Emitter { * Connects the client. */ public void open() { - this.readyState = OPENING; - Transport transport = this.createTransport(this.transports.get(0)); - this.setTransport(transport); - transport.open(); + EventThread.exec(new Runnable() { + @Override + public void run() { + Socket.this.readyState = OPENING; + Transport transport = Socket.this.createTransport(Socket.this.transports.get(0)); + Socket.this.setTransport(transport); + transport.open(); + } + }); } private Transport createTransport(String name) { @@ -409,7 +414,7 @@ public abstract class Socket extends Emitter { } }; - private synchronized void onHeartbeat(long timeout) { + private void onHeartbeat(long timeout) { if (this.pingTimeoutTimer != null) { pingTimeoutTimer.cancel(true); } @@ -422,13 +427,18 @@ public abstract class Socket extends Emitter { this.pingTimeoutTimer = this.heartbeatScheduler.schedule(new Runnable() { @Override public void run() { - if (self.readyState == CLOSED) return; - self.onClose("ping timeout"); + EventThread.exec(new Runnable() { + @Override + public void run() { + if (self.readyState == CLOSED) return; + self.onClose("ping timeout"); + } + }); } }, timeout, TimeUnit.MILLISECONDS); } - private synchronized void ping() { + private void ping() { if (this.pingIntervalTimer != null) { pingIntervalTimer.cancel(true); } @@ -437,9 +447,14 @@ public abstract class Socket extends Emitter { this.pingIntervalTimer = this.heartbeatScheduler.schedule(new Runnable() { @Override public void run() { - logger.fine(String.format("writing ping packet - expecting pong within %sms", self.pingTimeout)); - self.sendPacket(Packet.PING); - self.onHeartbeat(self.pingTimeout); + EventThread.exec(new Runnable() { + @Override + public void run() { + logger.fine(String.format("writing ping packet - expecting pong within %sms", self.pingTimeout)); + self.sendPacket(Packet.PING); + self.onHeartbeat(self.pingTimeout); + } + }); } }, this.pingInterval, TimeUnit.MILLISECONDS); } @@ -502,8 +517,13 @@ public abstract class Socket extends Emitter { * @param msg * @param fn callback to be called on drain */ - public void send(String msg, Runnable fn) { - this.sendPacket(Packet.MESSAGE, msg, fn); + public void send(final String msg, final Runnable fn) { + EventThread.exec(new Runnable() { + @Override + public void run() { + Socket.this.sendPacket(Packet.MESSAGE, msg, fn); + } + }); } private void sendPacket(String type) { @@ -529,13 +549,18 @@ public abstract class Socket extends Emitter { * @return a reference to to this object. */ public Socket close() { - if (this.readyState == OPENING || this.readyState == OPEN) { - this.onClose("forced close"); - logger.fine("socket closing - telling transport to close"); - this.transport.close(); - this.transport.off(); - } + EventThread.exec(new Runnable() { + @Override + public void run() { + if (Socket.this.readyState == OPENING || Socket.this.readyState == OPEN) { + Socket.this.onClose("forced close"); + logger.fine("socket closing - telling transport to close"); + Socket.this.transport.close(); + Socket.this.transport.off(); + } + } + }); return this; } @@ -558,18 +583,16 @@ public abstract class Socket extends Emitter { if (this.pingTimeoutTimer != null) { this.pingTimeoutTimer.cancel(true); } + EventThread.nextTick(new Runnable() { + @Override + public void run() { + Socket.this.writeBuffer.clear(); + Socket.this.callbackBuffer.clear(); + } + }); this.readyState = CLOSED; this.emit(EVENT_CLOSE, reason, desc); this.onclose(); - // TODO: - // clean buffer in next tick, so developers can still - // grab the buffers on `close` event - // setTimeout(function() {} - // self.writeBuffer = []; - // self.callbackBuffer = []; - // ); - this.writeBuffer.clear(); - this.callbackBuffer.clear(); this.id = null; } } diff --git a/src/main/java/com/github/nkzawa/engineio/client/Transport.java b/src/main/java/com/github/nkzawa/engineio/client/Transport.java index 809aff1..bb241f6 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Transport.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Transport.java @@ -58,27 +58,42 @@ public abstract class Transport extends Emitter { } public Transport open() { - if (this.readyState == CLOSED || this.readyState < 0) { - this.readyState = OPENING; - this.doOpen(); - } + exec(new Runnable() { + @Override + public void run() { + if (Transport.this.readyState == CLOSED || Transport.this.readyState < 0) { + Transport.this.readyState = OPENING; + Transport.this.doOpen(); + } + } + }); return this; } public Transport close() { - if (this.readyState == OPENING || this.readyState == OPEN) { - this.doClose(); - this.onClose(); - } + exec(new Runnable() { + @Override + public void run() { + if (Transport.this.readyState == OPENING || Transport.this.readyState == OPEN) { + Transport.this.doClose(); + Transport.this.onClose(); + } + } + }); return this; } - public void send(Packet[] packets) { - if (this.readyState == OPEN) { - this.write(packets); - } else { - throw new RuntimeException("Transport not open"); - } + public void send(final Packet[] packets) { + exec(new Runnable() { + @Override + public void run() { + if (Transport.this.readyState == OPEN) { + Transport.this.write(packets); + } else { + throw new RuntimeException("Transport not open"); + } + } + }); } protected void onOpen() { @@ -106,6 +121,14 @@ public abstract class Transport extends Emitter { abstract protected void doClose(); + protected static void exec(Runnable task) { + EventThread.exec(task); + } + + protected static void nextTick(Runnable task) { + EventThread.nextTick(task); + } + public static class Options { diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java index 27bbde7..344c5bf 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java @@ -33,53 +33,57 @@ abstract public class Polling extends Transport { } public void pause(final Runnable onPause) { - int pending = 0; - final Polling self = this; - - this.readyState = PAUSED; - - final Runnable pause = new Runnable() { + exec(new Runnable() { @Override public void run() { - logger.fine("paused"); - self.readyState = PAUSED; - onPause.run(); - } - }; + final Polling self = Polling.this; - if (this.polling || !this.writable) { - final int[] total = new int[] {0}; + Polling.this.readyState = PAUSED; - if (this.polling) { - logger.fine("we are currently polling - waiting to pause"); - total[0]++; - this.once(EVENT_POLL_COMPLETE, new Listener() { + final Runnable pause = new Runnable() { @Override - public void call(Object... args) { - logger.fine("pre-pause polling complete"); - if (--total[0] == 0) { - pause.run(); - } + public void run() { + logger.fine("paused"); + self.readyState = PAUSED; + onPause.run(); } - }); - } + }; - if (!this.writable) { - logger.fine("we are currently writing - waiting to pause"); - total[0]++; - this.once(EVENT_DRAIN, new Listener() { - @Override - public void call(Object... args) { - logger.fine("pre-pause writing complete"); - if (--total[0] == 0) { - pause.run(); - } + if (Polling.this.polling || !Polling.this.writable) { + final int[] total = new int[] {0}; + + if (Polling.this.polling) { + logger.fine("we are currently polling - waiting to pause"); + total[0]++; + Polling.this.once(EVENT_POLL_COMPLETE, new Listener() { + @Override + public void call(Object... args) { + logger.fine("pre-pause polling complete"); + if (--total[0] == 0) { + pause.run(); + } + } + }); } - }); + + if (!Polling.this.writable) { + logger.fine("we are currently writing - waiting to pause"); + total[0]++; + Polling.this.once(EVENT_DRAIN, new Listener() { + @Override + public void call(Object... args) { + logger.fine("pre-pause writing complete"); + if (--total[0] == 0) { + pause.run(); + } + } + }); + } + } else { + pause.run(); + } } - } else { - pause.run(); - } + }); } private void poll() { diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java index 7515e16..2c6f2fb 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java @@ -42,14 +42,24 @@ public class PollingXHR extends Polling { req.on(Request.EVENT_SUCCESS, new Listener() { @Override public void call(Object... args) { - fn.run(); + exec(new Runnable() { + @Override + public void run() { + fn.run(); + } + }); } }); req.on(Request.EVENT_ERROR, new Listener() { @Override - public void call(Object... args) { - Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null; - self.onError("xhr post error", err); + public void call(final Object... args) { + exec(new Runnable() { + @Override + public void run() { + Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null; + self.onError("xhr post error", err); + } + }); } }); req.create(); @@ -62,16 +72,26 @@ public class PollingXHR extends Polling { final PollingXHR self = this; req.on(Request.EVENT_DATA, new Listener() { @Override - public void call(Object... args) { - String data = args.length > 0 ? (String)args[0] : null; - self.onData(data); + public void call(final Object... args) { + exec(new Runnable() { + @Override + public void run() { + String data = args.length > 0 ? (String) args[0] : null; + self.onData(data); + } + }); } }); req.on(Request.EVENT_ERROR, new Listener() { @Override - public void call(Object... args) { - Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null; - self.onError("xhr poll error", err); + public void call(final Object... args) { + exec(new Runnable() { + @Override + public void run() { + Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception) args[0] : null; + self.onError("xhr poll error", err); + } + }); } }); req.create(); diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java b/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java index 684fc25..bc0a217 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/WebSocket.java @@ -44,19 +44,39 @@ public class WebSocket extends Transport { this.socket = new WebSocketClient(new URI(this.uri()), new Draft_17()) { @Override public void onOpen(ServerHandshake serverHandshake) { - self.onOpen(); + exec(new Runnable() { + @Override + public void run() { + self.onOpen(); + } + }); } @Override public void onClose(int i, String s, boolean b) { - self.onClose(); + exec(new Runnable() { + @Override + public void run() { + self.onClose(); + } + }); } @Override - public void onMessage(String s) { - self.onData(s); + public void onMessage(final String s) { + exec(new Runnable() { + @Override + public void run() { + self.onData(s); + } + }); } @Override - public void onError(Exception e) { - self.onError("websocket error", e); + public void onError(final Exception e) { + exec(new Runnable() { + @Override + public void run() { + self.onError("websocket error", e); + } + }); } }; this.socket.connect(); @@ -84,14 +104,19 @@ public class WebSocket extends Transport { this.bufferedAmountId = this.drainScheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { - if (!self.socket.getConnection().hasBufferedData()) { - self.bufferedAmountId.cancel(true); - ondrain.run(); - } + exec(new Runnable() { + @Override + public void run() { + if (!self.socket.getConnection().hasBufferedData()) { + self.bufferedAmountId.cancel(true); + ondrain.run(); + } + } + }); } }, 50, 50, TimeUnit.MILLISECONDS); } else { - this.drainScheduler.schedule(ondrain, 0, TimeUnit.MILLISECONDS); + nextTick(ondrain); } } diff --git a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java index 0c91b0f..4d85815 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java @@ -80,8 +80,8 @@ public class ServerConnectionTest { public void stopServer() throws InterruptedException { System.out.println("Stopping server ..."); serverProcess.destroy(); - serverOutout.cancel(false); - serverError.cancel(false); + serverOutout.cancel(true); + serverError.cancel(true); serverService.shutdown(); serverService.awaitTermination(3000, TimeUnit.MILLISECONDS); }