From 5fe896fd7e370661fd2819c234c7239c404c166d Mon Sep 17 00:00:00 2001 From: nkzawa Date: Sun, 31 Jan 2016 03:03:25 +0900 Subject: [PATCH] remove callbackBuffer from Socket --- .../io/socket/engineio/client/Socket.java | 41 +++++++------------ 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/src/main/java/io/socket/engineio/client/Socket.java b/src/main/java/io/socket/engineio/client/Socket.java index cbe9f5a..9063fcc 100644 --- a/src/main/java/io/socket/engineio/client/Socket.java +++ b/src/main/java/io/socket/engineio/client/Socket.java @@ -120,7 +120,6 @@ public class Socket extends Emitter { private List upgrades; private Map query; /*package*/ LinkedList writeBuffer = new LinkedList(); - private LinkedList callbackBuffer = new LinkedList(); /*package*/ Transport transport; private Future pingTimeoutTimer; private Future pingIntervalTimer; @@ -589,20 +588,12 @@ public class Socket extends Emitter { } private void onDrain() { - for (int i = 0; i < this.prevBufferLen; i++) { - Runnable callback = this.callbackBuffer.get(i); - if (callback != null) { - callback.run(); - } - } - for (int i = 0; i < this.prevBufferLen; i++) { this.writeBuffer.poll(); - this.callbackBuffer.poll(); } this.prevBufferLen = 0; - if (this.writeBuffer.size() == 0) { + if (0 == this.writeBuffer.size()) { this.emit(EVENT_DRAIN); } else { this.flush(); @@ -686,19 +677,21 @@ public class Socket extends Emitter { sendPacket(packet, fn); } - private void sendPacket(Packet packet, Runnable fn) { + private void sendPacket(Packet packet, final Runnable fn) { if (ReadyState.CLOSING == this.readyState || ReadyState.CLOSED == this.readyState) { return; } - if (fn == null) { - // ConcurrentLinkedList does not permit `null`. - fn = noop; - } - this.emit(EVENT_PACKET_CREATE, packet); this.writeBuffer.offer(packet); - this.callbackBuffer.offer(fn); + if (null != fn) { + this.once(EVENT_FLUSH, new Listener() { + @Override + public void call(Object... args) { + fn.run(); + } + }); + } this.flush(); } @@ -793,15 +786,6 @@ public class Socket extends Emitter { this.heartbeatScheduler.shutdown(); } - EventThread.nextTick(new Runnable() { - @Override - public void run() { - self.writeBuffer.clear(); - self.callbackBuffer.clear(); - self.prevBufferLen = 0; - } - }); - // stop event from firing again for transport this.transport.off(EVENT_CLOSE); @@ -819,6 +803,11 @@ public class Socket extends Emitter { // emit close events this.emit(EVENT_CLOSE, reason, desc); + + // clear buffers after, so users can still + // grab the buffers on `close` event + self.writeBuffer.clear(); + self.prevBufferLen = 0; } }