remove callbackBuffer from Socket

This commit is contained in:
nkzawa
2016-01-31 03:03:25 +09:00
parent f75ea34440
commit 5fe896fd7e

View File

@@ -120,7 +120,6 @@ public class Socket extends Emitter {
private List<String> upgrades; private List<String> upgrades;
private Map<String, String> query; private Map<String, String> query;
/*package*/ LinkedList<Packet> writeBuffer = new LinkedList<Packet>(); /*package*/ LinkedList<Packet> writeBuffer = new LinkedList<Packet>();
private LinkedList<Runnable> callbackBuffer = new LinkedList<Runnable>();
/*package*/ Transport transport; /*package*/ Transport transport;
private Future pingTimeoutTimer; private Future pingTimeoutTimer;
private Future pingIntervalTimer; private Future pingIntervalTimer;
@@ -589,20 +588,12 @@ public class Socket extends Emitter {
} }
private void onDrain() { private void onDrain() {
for (int i = 0; i < this.prevBufferLen; i++) {
Runnable callback = this.callbackBuffer.get(i);
if (callback != null) {
callback.run();
}
}
for (int i = 0; i < this.prevBufferLen; i++) { for (int i = 0; i < this.prevBufferLen; i++) {
this.writeBuffer.poll(); this.writeBuffer.poll();
this.callbackBuffer.poll();
} }
this.prevBufferLen = 0; this.prevBufferLen = 0;
if (this.writeBuffer.size() == 0) { if (0 == this.writeBuffer.size()) {
this.emit(EVENT_DRAIN); this.emit(EVENT_DRAIN);
} else { } else {
this.flush(); this.flush();
@@ -686,19 +677,21 @@ public class Socket extends Emitter {
sendPacket(packet, fn); sendPacket(packet, fn);
} }
private void sendPacket(Packet packet, Runnable fn) { private void sendPacket(Packet packet, final Runnable fn) {
if (ReadyState.CLOSING == this.readyState || ReadyState.CLOSED == this.readyState) { if (ReadyState.CLOSING == this.readyState || ReadyState.CLOSED == this.readyState) {
return; return;
} }
if (fn == null) {
// ConcurrentLinkedList does not permit `null`.
fn = noop;
}
this.emit(EVENT_PACKET_CREATE, packet); this.emit(EVENT_PACKET_CREATE, packet);
this.writeBuffer.offer(packet); this.writeBuffer.offer(packet);
this.callbackBuffer.offer(fn); if (null != fn) {
this.once(EVENT_FLUSH, new Listener() {
@Override
public void call(Object... args) {
fn.run();
}
});
}
this.flush(); this.flush();
} }
@@ -793,15 +786,6 @@ public class Socket extends Emitter {
this.heartbeatScheduler.shutdown(); this.heartbeatScheduler.shutdown();
} }
EventThread.nextTick(new Runnable() {
@Override
public void run() {
self.writeBuffer.clear();
self.callbackBuffer.clear();
self.prevBufferLen = 0;
}
});
// stop event from firing again for transport // stop event from firing again for transport
this.transport.off(EVENT_CLOSE); this.transport.off(EVENT_CLOSE);
@@ -819,6 +803,11 @@ public class Socket extends Emitter {
// emit close events // emit close events
this.emit(EVENT_CLOSE, reason, desc); this.emit(EVENT_CLOSE, reason, desc);
// clear buffers after, so users can still
// grab the buffers on `close` event
self.writeBuffer.clear();
self.prevBufferLen = 0;
} }
} }