thread constraint

This commit is contained in:
Naoyuki Kanezawa
2013-05-06 20:48:38 +09:00
parent a5898573c6
commit 06c1da57b4
8 changed files with 261 additions and 106 deletions

View File

@@ -9,7 +9,7 @@ import java.util.concurrent.ConcurrentMap;
/** /**
* The event emitter which is ported from the JavaScript module. * The event emitter which is ported from the JavaScript module. This class is thread-safe.
* *
* @see <a href="https://github.com/component/emitter">https://github.com/component/emitter</a> * @see <a href="https://github.com/component/emitter">https://github.com/component/emitter</a>
*/ */

View File

@@ -0,0 +1,60 @@
package com.github.nkzawa.engineio.client;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* The main thread for Engine.IO Client.
*/
class EventThread extends Thread {
private static final ExecutorService service = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
thread = new EventThread(runnable);
return thread;
}
});
private static volatile EventThread thread;
private EventThread(Runnable runnable) {
super(runnable);
}
/**
* check if the current thread is EventThread.
*
* @return true if the current thread is EventThread.
*/
public static boolean isCurrent() {
return currentThread() == thread;
}
/**
* Executes a task in EventThread.
*
* @param task
*/
public static void exec(Runnable task) {
if (isCurrent()) {
task.run();
} else {
service.execute(task);
}
}
/**
* Executes a task on the next loop in EventThread.
*
* @param task
*/
public static void nextTick(Runnable task) {
service.execute(task);
}
}

View File

@@ -108,8 +108,8 @@ public abstract class Socket extends Emitter {
private List<String> transports; private List<String> transports;
private List<String> upgrades; private List<String> upgrades;
private Map<String, String> query; private Map<String, String> query;
private ConcurrentLinkedQueue<Packet> writeBuffer = new ConcurrentLinkedQueue<Packet>(); private Queue<Packet> writeBuffer = new LinkedList<Packet>();
private ConcurrentLinkedQueue<Runnable> callbackBuffer = new ConcurrentLinkedQueue<Runnable>(); private Queue<Runnable> callbackBuffer = new LinkedList<Runnable>();
private Transport transport; private Transport transport;
private Future pingTimeoutTimer; private Future pingTimeoutTimer;
private Future pingIntervalTimer; private Future pingIntervalTimer;
@@ -176,10 +176,15 @@ public abstract class Socket extends Emitter {
* Connects the client. * Connects the client.
*/ */
public void open() { public void open() {
this.readyState = OPENING; EventThread.exec(new Runnable() {
Transport transport = this.createTransport(this.transports.get(0)); @Override
this.setTransport(transport); public void run() {
transport.open(); Socket.this.readyState = OPENING;
Transport transport = Socket.this.createTransport(Socket.this.transports.get(0));
Socket.this.setTransport(transport);
transport.open();
}
});
} }
private Transport createTransport(String name) { private Transport createTransport(String name) {
@@ -409,7 +414,7 @@ public abstract class Socket extends Emitter {
} }
}; };
private synchronized void onHeartbeat(long timeout) { private void onHeartbeat(long timeout) {
if (this.pingTimeoutTimer != null) { if (this.pingTimeoutTimer != null) {
pingTimeoutTimer.cancel(true); pingTimeoutTimer.cancel(true);
} }
@@ -422,13 +427,18 @@ public abstract class Socket extends Emitter {
this.pingTimeoutTimer = this.heartbeatScheduler.schedule(new Runnable() { this.pingTimeoutTimer = this.heartbeatScheduler.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
if (self.readyState == CLOSED) return; EventThread.exec(new Runnable() {
self.onClose("ping timeout"); @Override
public void run() {
if (self.readyState == CLOSED) return;
self.onClose("ping timeout");
}
});
} }
}, timeout, TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);
} }
private synchronized void ping() { private void ping() {
if (this.pingIntervalTimer != null) { if (this.pingIntervalTimer != null) {
pingIntervalTimer.cancel(true); pingIntervalTimer.cancel(true);
} }
@@ -437,9 +447,14 @@ public abstract class Socket extends Emitter {
this.pingIntervalTimer = this.heartbeatScheduler.schedule(new Runnable() { this.pingIntervalTimer = this.heartbeatScheduler.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
logger.fine(String.format("writing ping packet - expecting pong within %sms", self.pingTimeout)); EventThread.exec(new Runnable() {
self.sendPacket(Packet.PING); @Override
self.onHeartbeat(self.pingTimeout); public void run() {
logger.fine(String.format("writing ping packet - expecting pong within %sms", self.pingTimeout));
self.sendPacket(Packet.PING);
self.onHeartbeat(self.pingTimeout);
}
});
} }
}, this.pingInterval, TimeUnit.MILLISECONDS); }, this.pingInterval, TimeUnit.MILLISECONDS);
} }
@@ -502,8 +517,13 @@ public abstract class Socket extends Emitter {
* @param msg * @param msg
* @param fn callback to be called on drain * @param fn callback to be called on drain
*/ */
public void send(String msg, Runnable fn) { public void send(final String msg, final Runnable fn) {
this.sendPacket(Packet.MESSAGE, msg, fn); EventThread.exec(new Runnable() {
@Override
public void run() {
Socket.this.sendPacket(Packet.MESSAGE, msg, fn);
}
});
} }
private void sendPacket(String type) { private void sendPacket(String type) {
@@ -529,13 +549,18 @@ public abstract class Socket extends Emitter {
* @return a reference to to this object. * @return a reference to to this object.
*/ */
public Socket close() { public Socket close() {
if (this.readyState == OPENING || this.readyState == OPEN) { EventThread.exec(new Runnable() {
this.onClose("forced close"); @Override
logger.fine("socket closing - telling transport to close"); public void run() {
this.transport.close(); if (Socket.this.readyState == OPENING || Socket.this.readyState == OPEN) {
this.transport.off(); Socket.this.onClose("forced close");
} logger.fine("socket closing - telling transport to close");
Socket.this.transport.close();
Socket.this.transport.off();
}
}
});
return this; return this;
} }
@@ -558,18 +583,16 @@ public abstract class Socket extends Emitter {
if (this.pingTimeoutTimer != null) { if (this.pingTimeoutTimer != null) {
this.pingTimeoutTimer.cancel(true); this.pingTimeoutTimer.cancel(true);
} }
EventThread.nextTick(new Runnable() {
@Override
public void run() {
Socket.this.writeBuffer.clear();
Socket.this.callbackBuffer.clear();
}
});
this.readyState = CLOSED; this.readyState = CLOSED;
this.emit(EVENT_CLOSE, reason, desc); this.emit(EVENT_CLOSE, reason, desc);
this.onclose(); this.onclose();
// TODO:
// clean buffer in next tick, so developers can still
// grab the buffers on `close` event
// setTimeout(function() {}
// self.writeBuffer = [];
// self.callbackBuffer = [];
// );
this.writeBuffer.clear();
this.callbackBuffer.clear();
this.id = null; this.id = null;
} }
} }

View File

@@ -58,27 +58,42 @@ public abstract class Transport extends Emitter {
} }
public Transport open() { public Transport open() {
if (this.readyState == CLOSED || this.readyState < 0) { exec(new Runnable() {
this.readyState = OPENING; @Override
this.doOpen(); public void run() {
} if (Transport.this.readyState == CLOSED || Transport.this.readyState < 0) {
Transport.this.readyState = OPENING;
Transport.this.doOpen();
}
}
});
return this; return this;
} }
public Transport close() { public Transport close() {
if (this.readyState == OPENING || this.readyState == OPEN) { exec(new Runnable() {
this.doClose(); @Override
this.onClose(); public void run() {
} if (Transport.this.readyState == OPENING || Transport.this.readyState == OPEN) {
Transport.this.doClose();
Transport.this.onClose();
}
}
});
return this; return this;
} }
public void send(Packet[] packets) { public void send(final Packet[] packets) {
if (this.readyState == OPEN) { exec(new Runnable() {
this.write(packets); @Override
} else { public void run() {
throw new RuntimeException("Transport not open"); if (Transport.this.readyState == OPEN) {
} Transport.this.write(packets);
} else {
throw new RuntimeException("Transport not open");
}
}
});
} }
protected void onOpen() { protected void onOpen() {
@@ -106,6 +121,14 @@ public abstract class Transport extends Emitter {
abstract protected void doClose(); abstract protected void doClose();
protected static void exec(Runnable task) {
EventThread.exec(task);
}
protected static void nextTick(Runnable task) {
EventThread.nextTick(task);
}
public static class Options { public static class Options {

View File

@@ -33,53 +33,57 @@ abstract public class Polling extends Transport {
} }
public void pause(final Runnable onPause) { public void pause(final Runnable onPause) {
int pending = 0; exec(new Runnable() {
final Polling self = this;
this.readyState = PAUSED;
final Runnable pause = new Runnable() {
@Override @Override
public void run() { public void run() {
logger.fine("paused"); final Polling self = Polling.this;
self.readyState = PAUSED;
onPause.run();
}
};
if (this.polling || !this.writable) { Polling.this.readyState = PAUSED;
final int[] total = new int[] {0};
if (this.polling) { final Runnable pause = new Runnable() {
logger.fine("we are currently polling - waiting to pause");
total[0]++;
this.once(EVENT_POLL_COMPLETE, new Listener() {
@Override @Override
public void call(Object... args) { public void run() {
logger.fine("pre-pause polling complete"); logger.fine("paused");
if (--total[0] == 0) { self.readyState = PAUSED;
pause.run(); onPause.run();
}
} }
}); };
}
if (!this.writable) { if (Polling.this.polling || !Polling.this.writable) {
logger.fine("we are currently writing - waiting to pause"); final int[] total = new int[] {0};
total[0]++;
this.once(EVENT_DRAIN, new Listener() { if (Polling.this.polling) {
@Override logger.fine("we are currently polling - waiting to pause");
public void call(Object... args) { total[0]++;
logger.fine("pre-pause writing complete"); Polling.this.once(EVENT_POLL_COMPLETE, new Listener() {
if (--total[0] == 0) { @Override
pause.run(); public void call(Object... args) {
} logger.fine("pre-pause polling complete");
if (--total[0] == 0) {
pause.run();
}
}
});
} }
});
if (!Polling.this.writable) {
logger.fine("we are currently writing - waiting to pause");
total[0]++;
Polling.this.once(EVENT_DRAIN, new Listener() {
@Override
public void call(Object... args) {
logger.fine("pre-pause writing complete");
if (--total[0] == 0) {
pause.run();
}
}
});
}
} else {
pause.run();
}
} }
} else { });
pause.run();
}
} }
private void poll() { private void poll() {

View File

@@ -42,14 +42,24 @@ public class PollingXHR extends Polling {
req.on(Request.EVENT_SUCCESS, new Listener() { req.on(Request.EVENT_SUCCESS, new Listener() {
@Override @Override
public void call(Object... args) { public void call(Object... args) {
fn.run(); exec(new Runnable() {
@Override
public void run() {
fn.run();
}
});
} }
}); });
req.on(Request.EVENT_ERROR, new Listener() { req.on(Request.EVENT_ERROR, new Listener() {
@Override @Override
public void call(Object... args) { public void call(final Object... args) {
Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null; exec(new Runnable() {
self.onError("xhr post error", err); @Override
public void run() {
Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null;
self.onError("xhr post error", err);
}
});
} }
}); });
req.create(); req.create();
@@ -62,16 +72,26 @@ public class PollingXHR extends Polling {
final PollingXHR self = this; final PollingXHR self = this;
req.on(Request.EVENT_DATA, new Listener() { req.on(Request.EVENT_DATA, new Listener() {
@Override @Override
public void call(Object... args) { public void call(final Object... args) {
String data = args.length > 0 ? (String)args[0] : null; exec(new Runnable() {
self.onData(data); @Override
public void run() {
String data = args.length > 0 ? (String) args[0] : null;
self.onData(data);
}
});
} }
}); });
req.on(Request.EVENT_ERROR, new Listener() { req.on(Request.EVENT_ERROR, new Listener() {
@Override @Override
public void call(Object... args) { public void call(final Object... args) {
Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception)args[0] : null; exec(new Runnable() {
self.onError("xhr poll error", err); @Override
public void run() {
Exception err = args.length > 0 && args[0] instanceof Exception ? (Exception) args[0] : null;
self.onError("xhr poll error", err);
}
});
} }
}); });
req.create(); req.create();

View File

@@ -44,19 +44,39 @@ public class WebSocket extends Transport {
this.socket = new WebSocketClient(new URI(this.uri()), new Draft_17()) { this.socket = new WebSocketClient(new URI(this.uri()), new Draft_17()) {
@Override @Override
public void onOpen(ServerHandshake serverHandshake) { public void onOpen(ServerHandshake serverHandshake) {
self.onOpen(); exec(new Runnable() {
@Override
public void run() {
self.onOpen();
}
});
} }
@Override @Override
public void onClose(int i, String s, boolean b) { public void onClose(int i, String s, boolean b) {
self.onClose(); exec(new Runnable() {
@Override
public void run() {
self.onClose();
}
});
} }
@Override @Override
public void onMessage(String s) { public void onMessage(final String s) {
self.onData(s); exec(new Runnable() {
@Override
public void run() {
self.onData(s);
}
});
} }
@Override @Override
public void onError(Exception e) { public void onError(final Exception e) {
self.onError("websocket error", e); exec(new Runnable() {
@Override
public void run() {
self.onError("websocket error", e);
}
});
} }
}; };
this.socket.connect(); this.socket.connect();
@@ -84,14 +104,19 @@ public class WebSocket extends Transport {
this.bufferedAmountId = this.drainScheduler.scheduleAtFixedRate(new Runnable() { this.bufferedAmountId = this.drainScheduler.scheduleAtFixedRate(new Runnable() {
@Override @Override
public void run() { public void run() {
if (!self.socket.getConnection().hasBufferedData()) { exec(new Runnable() {
self.bufferedAmountId.cancel(true); @Override
ondrain.run(); public void run() {
} if (!self.socket.getConnection().hasBufferedData()) {
self.bufferedAmountId.cancel(true);
ondrain.run();
}
}
});
} }
}, 50, 50, TimeUnit.MILLISECONDS); }, 50, 50, TimeUnit.MILLISECONDS);
} else { } else {
this.drainScheduler.schedule(ondrain, 0, TimeUnit.MILLISECONDS); nextTick(ondrain);
} }
} }

View File

@@ -80,8 +80,8 @@ public class ServerConnectionTest {
public void stopServer() throws InterruptedException { public void stopServer() throws InterruptedException {
System.out.println("Stopping server ..."); System.out.println("Stopping server ...");
serverProcess.destroy(); serverProcess.destroy();
serverOutout.cancel(false); serverOutout.cancel(true);
serverError.cancel(false); serverError.cancel(true);
serverService.shutdown(); serverService.shutdown();
serverService.awaitTermination(3000, TimeUnit.MILLISECONDS); serverService.awaitTermination(3000, TimeUnit.MILLISECONDS);
} }