compatible with engine.io-client v1.4.2

This commit is contained in:
Naoyuki Kanezawa
2014-11-03 22:45:45 +09:00
parent ba8755384e
commit 96fba4e20a
2 changed files with 181 additions and 9 deletions

View File

@@ -115,7 +115,7 @@ public class Socket extends Emitter {
private List<String> transports; private List<String> transports;
private List<String> upgrades; private List<String> upgrades;
private Map<String, String> query; private Map<String, String> query;
private LinkedList<Packet> writeBuffer = new LinkedList<Packet>(); /*package*/ LinkedList<Packet> writeBuffer = new LinkedList<Packet>();
private LinkedList<Runnable> callbackBuffer = new LinkedList<Runnable>(); private LinkedList<Runnable> callbackBuffer = new LinkedList<Runnable>();
/*package*/ Transport transport; /*package*/ Transport transport;
private Future pingTimeoutTimer; private Future pingTimeoutTimer;
@@ -325,6 +325,7 @@ public class Socket extends Emitter {
logger.fine(String.format("probe transport '%s' pong", name)); logger.fine(String.format("probe transport '%s' pong", name));
self.upgrading = true; self.upgrading = true;
self.emit(EVENT_UPGRADING, transport[0]); self.emit(EVENT_UPGRADING, transport[0]);
if (null == transport) return;
Socket.priorWebsocketSuccess = WebSocket.NAME.equals(transport[0].name); Socket.priorWebsocketSuccess = WebSocket.NAME.equals(transport[0].name);
logger.fine(String.format("pausing current transport '%s'", self.transport.name)); logger.fine(String.format("pausing current transport '%s'", self.transport.name));
@@ -332,9 +333,7 @@ public class Socket extends Emitter {
@Override @Override
public void run() { public void run() {
if (failed[0]) return; if (failed[0]) return;
if (ReadyState.CLOSED == self.readyState || ReadyState.CLOSING == self.readyState) { if (ReadyState.CLOSED == self.readyState) return;
return;
}
logger.fine("changing transport and sending upgrade packet"); logger.fine("changing transport and sending upgrade packet");
@@ -667,6 +666,10 @@ public class Socket extends Emitter {
} }
private void sendPacket(Packet packet, Runnable fn) { private void sendPacket(Packet packet, Runnable fn) {
if (ReadyState.CLOSING == this.readyState || ReadyState.CLOSED == this.readyState) {
return;
}
if (fn == null) { if (fn == null) {
// ConcurrentLinkedList does not permit `null`. // ConcurrentLinkedList does not permit `null`.
fn = noop; fn = noop;
@@ -688,11 +691,55 @@ public class Socket extends Emitter {
@Override @Override
public void run() { public void run() {
if (Socket.this.readyState == ReadyState.OPENING || Socket.this.readyState == ReadyState.OPEN) { if (Socket.this.readyState == ReadyState.OPENING || Socket.this.readyState == ReadyState.OPEN) {
Socket.this.onClose("forced close"); Socket.this.readyState = ReadyState.CLOSING;
logger.fine("socket closing - telling transport to close");
Socket.this.transport.close();
}
final Socket self = Socket.this;
final Runnable close = new Runnable() {
@Override
public void run() {
self.onClose("forced close");
logger.fine("socket closing - telling transport to close");
self.transport.close();
}
};
final Listener[] cleanupAndClose = new Listener[1];
cleanupAndClose[0] = new Listener() {
@Override
public void call(Object ...args) {
self.off(EVENT_UPGRADE, cleanupAndClose[0]);
self.off(EVENT_UPGRADE_ERROR, cleanupAndClose[0]);
close.run();
}
};
final Runnable waitForUpgrade = new Runnable() {
@Override
public void run() {
// wait for updade to finish since we can't send packets while pausing a transport
self.once(EVENT_UPGRADE, cleanupAndClose[0]);
self.once(EVENT_UPGRADE_ERROR, cleanupAndClose[0]);
}
};
if (Socket.this.writeBuffer.size() > 0) {
Socket.this.once(EVENT_DRAIN, new Listener() {
@Override
public void call(Object... args) {
if (Socket.this.upgrading) {
waitForUpgrade.run();
} else {
close.run();
}
}
});
} else if (Socket.this.upgrading) {
waitForUpgrade.run();
} else {
close.run();
}
}
} }
}); });
return this; return this;
@@ -710,7 +757,7 @@ public class Socket extends Emitter {
} }
private void onClose(String reason, Exception desc) { private void onClose(String reason, Exception desc) {
if (this.readyState == ReadyState.OPENING || this.readyState == ReadyState.OPEN) { if (ReadyState.OPENING == this.readyState || ReadyState.OPEN == this.readyState || ReadyState.CLOSING == this.readyState) {
logger.fine(String.format("socket close with reason: %s", reason)); logger.fine(String.format("socket close with reason: %s", reason));
final Socket self = this; final Socket self = this;

View File

@@ -104,6 +104,7 @@ public class ConnectionTest extends Connection {
} }
}); });
socket.close(); socket.close();
socket.send("hi");
Timer timer = new Timer(); Timer timer = new Timer();
timer.schedule(new TimerTask() { timer.schedule(new TimerTask() {
@Override @Override
@@ -117,4 +118,128 @@ public class ConnectionTest extends Connection {
socket.open(); socket.open();
assertThat((Boolean)values.take(), is(true)); assertThat((Boolean)values.take(), is(true));
} }
@Test(timeout = TIMEOUT)
public void deferCloseWhenUpgrading() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
socket = new Socket(createOptions());
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
final boolean[] upgraded = new boolean[] {false};
socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() {
@Override
public void call(Object... args) {
upgraded[0] = true;
}
}).on(Socket.EVENT_UPGRADING, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_CLOSE, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(upgraded[0]);
}
});
socket.close();
}
});
}
});
socket.open();
assertThat((Boolean)values.take(), is(true));
}
@Test(timeout = TIMEOUT)
public void closeOnUpgradeErrorIfClosingIsDeferred() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
socket = new Socket(createOptions());
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
final boolean[] upgradError = new boolean[] {false};
socket.on(Socket.EVENT_UPGRADE_ERROR, new Emitter.Listener() {
@Override
public void call(Object... args) {
upgradError[0] = true;
}
}).on(Socket.EVENT_UPGRADING, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_CLOSE, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(upgradError[0]);
}
});
socket.close();
socket.transport.onError("upgrade error", new Exception());
}
});
}
});
socket.open();
assertThat((Boolean) values.take(), is(true));
}
public void notSendPacketsIfClosingIsDeferred() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
socket = new Socket(createOptions());
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
final boolean[] noPacket = new boolean[] {true};
socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_PACKET_CREATE, new Emitter.Listener() {
@Override
public void call(Object... args) {
noPacket[0] = false;
}
});
socket.close();
socket.send("hi");
}
});
new Timer().schedule(new TimerTask() {
@Override
public void run() {
values.offer(noPacket[0]);
}
}, 1200);
}
});
socket.open();
assertThat((Boolean) values.take(), is(true));
}
@Test(timeout = TIMEOUT)
public void sendAllBufferedPacketsIfClosingIsDeferred() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
socket = new Socket(createOptions());
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send("hi");
socket.close();
}
}).on(Socket.EVENT_CLOSE, new Emitter.Listener() {
@Override
public void call(Object... args) {
values.offer(socket.writeBuffer.size());
}
});
}
});
socket.open();
assertThat((Integer) values.take(), is(0));
}
} }