From 2465d35e5068eeb9a1eeffcb2935246e88745694 Mon Sep 17 00:00:00 2001 From: Naoyuki Kanezawa Date: Sat, 22 Jun 2013 13:38:54 +0900 Subject: [PATCH] compatible with 0.6.2 --- README.md | 3 ++ .../github/nkzawa/engineio/client/Socket.java | 43 +++++++++++-------- .../engineio/client/transports/Polling.java | 21 ++++++++- .../engineio/client/ServerConnectionTest.java | 13 ++++++ .../nkzawa/engineio/client/SocketTest.java | 4 ++ 5 files changed, 65 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index c71333f..0db59c4 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,9 @@ socket = new Socket("ws://localhost") { @Override public void onclose() {} + + @Override + public void onerror(Exception err) {} }; socket.open(); ``` diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index 049860d..2d556b8 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -106,8 +106,8 @@ public abstract class Socket extends Emitter { private List transports; private List upgrades; private Map query; - private Queue writeBuffer = new LinkedList(); - private Queue callbackBuffer = new LinkedList(); + private LinkedList writeBuffer = new LinkedList(); + private LinkedList callbackBuffer = new LinkedList(); private Transport transport; private Future pingTimeoutTimer; private Future pingIntervalTimer; @@ -461,7 +461,7 @@ public abstract class Socket extends Emitter { } /** - * Sends a ping packet + * Sends a ping packet. */ public void ping() { EventThread.exec(new Runnable() { @@ -473,7 +473,13 @@ public abstract class Socket extends Emitter { } private void onDrain() { - this.callbacks(); + for (int i = 0; i < this.prevBufferLen; i++) { + Runnable callback = this.callbackBuffer.get(i); + if (callback != null) { + callback.run(); + } + } + for (int i = 0; i < this.prevBufferLen; i++) { this.writeBuffer.poll(); this.callbackBuffer.poll(); @@ -487,16 +493,6 @@ public abstract class Socket extends Emitter { } } - private void callbacks() { - Iterator iter = this.callbackBuffer.iterator(); - for (int i = 0; i < this.prevBufferLen && iter.hasNext(); i++) { - Runnable callback = iter.next(); - if (callback != null) { - callback.run(); - } - } - } - private void flush() { if (this.readyState != ReadyState.CLOSED && this.transport.writable && !this.upgrading && this.writeBuffer.size() != 0) { @@ -569,7 +565,6 @@ public abstract class Socket extends Emitter { Socket.this.onClose("forced close"); logger.fine("socket closing - telling transport to close"); Socket.this.transport.close(); - Socket.this.transport.off(); } } @@ -580,6 +575,7 @@ public abstract class Socket extends Emitter { private void onError(Exception err) { logger.fine(String.format("socket error %s", err)); this.emit(EVENT_ERROR, err); + this.onerror(err); this.onClose("transport error", err); } @@ -591,12 +587,15 @@ public abstract class Socket extends Emitter { if (this.readyState == ReadyState.OPENING || this.readyState == ReadyState.OPEN) { logger.fine(String.format("socket close with reason: %s", reason)); final Socket self = this; + + // clear timers if (this.pingIntervalTimer != null) { this.pingIntervalTimer.cancel(true); } if (this.pingTimeoutTimer != null) { this.pingTimeoutTimer.cancel(true); } + EventThread.nextTick(new Runnable() { @Override public void run() { @@ -604,13 +603,22 @@ public abstract class Socket extends Emitter { self.callbackBuffer.clear(); } }); + + // ignore further transport communication + this.transport.off(); + + // set ready state ReadyState prev = this.readyState; this.readyState = ReadyState.CLOSED; + + // clear session id + this.id = null; + + // emit events if (prev == ReadyState.OPEN) { this.emit(EVENT_CLOSE, reason, desc); + this.onclose(); } - this.onclose(); - this.id = null; } } @@ -630,6 +638,7 @@ public abstract class Socket extends Emitter { public abstract void onclose(); + public abstract void onerror(Exception err); public static class Options extends Transport.Options { diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java index 1a43359..71ef9dc 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/Polling.java @@ -128,8 +128,25 @@ abstract public class Polling extends Transport { } protected void doClose() { - logger.fine("sending close packet"); - this.send(new Packet[] {new Packet(Packet.CLOSE, null)}); + final Polling self = this; + + Listener close = new Listener() { + @Override + public void call(Object... args) { + logger.fine("writing close packet"); + self.write(new Packet[] {new Packet(Packet.CLOSE, null)}); + } + }; + + if (this.readyState == ReadyState.OPEN) { + logger.fine("transport open - closing"); + close.call(); + } else { + // in case we're trying to close while + // handshaking is in progress (engine.io-client GH-164) + logger.fine("transport not open - deferring close"); + this.once(EVENT_OPEN, close); + } } protected void write(Packet[] packets) { diff --git a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java index 0bd1654..d0abc28 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java @@ -105,6 +105,9 @@ public class ServerConnectionTest { System.out.println("onclose:"); events.offer("onclose"); } + + @Override + public void onerror(Exception err) {} }; socket.open(); @@ -132,6 +135,9 @@ public class ServerConnectionTest { @Override public void onclose() {} + + @Override + public void onerror(Exception err) {} }; socket.open(); @@ -151,6 +157,8 @@ public class ServerConnectionTest { public void onmessage(String data) {} @Override public void onclose() {} + @Override + public void onerror(Exception err) {} }; socket.on(Socket.EVENT_HANDSHAKE, new Emitter.Listener() { @Override @@ -186,6 +194,8 @@ public class ServerConnectionTest { public void onmessage(String data) {} @Override public void onclose() {} + @Override + public void onerror(Exception err) {} }; socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() { @Override @@ -237,6 +247,9 @@ public class ServerConnectionTest { @Override public void onclose() {} + + @Override + public void onerror(Exception err) {} }; socket.open(); diff --git a/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java b/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java index 7ced5b1..64e2f28 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/SocketTest.java @@ -28,6 +28,8 @@ public class SocketTest { public void onmessage(String data) {} @Override public void onclose() {} + @Override + public void onerror(Exception err) {} }; List upgrades = new ArrayList() {{ add(Polling.NAME); @@ -51,6 +53,8 @@ public class SocketTest { public void onmessage(String data) {} @Override public void onclose() {} + @Override + public void onerror(Exception err) {} }; final boolean[] closed = {false};