compatible with 0.6.2

This commit is contained in:
Naoyuki Kanezawa
2013-06-22 13:38:54 +09:00
parent 5e237ec47d
commit 2465d35e50
5 changed files with 65 additions and 19 deletions

View File

@@ -34,6 +34,9 @@ socket = new Socket("ws://localhost") {
@Override @Override
public void onclose() {} public void onclose() {}
@Override
public void onerror(Exception err) {}
}; };
socket.open(); socket.open();
``` ```

View File

@@ -106,8 +106,8 @@ public abstract class Socket extends Emitter {
private List<String> transports; private List<String> transports;
private List<String> upgrades; private List<String> upgrades;
private Map<String, String> query; private Map<String, String> query;
private Queue<Packet> writeBuffer = new LinkedList<Packet>(); private LinkedList<Packet> writeBuffer = new LinkedList<Packet>();
private Queue<Runnable> callbackBuffer = new LinkedList<Runnable>(); private LinkedList<Runnable> callbackBuffer = new LinkedList<Runnable>();
private Transport transport; private Transport transport;
private Future pingTimeoutTimer; private Future pingTimeoutTimer;
private Future pingIntervalTimer; private Future pingIntervalTimer;
@@ -461,7 +461,7 @@ public abstract class Socket extends Emitter {
} }
/** /**
* Sends a ping packet * Sends a ping packet.
*/ */
public void ping() { public void ping() {
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@@ -473,7 +473,13 @@ public abstract class Socket extends Emitter {
} }
private void onDrain() { private void onDrain() {
this.callbacks(); for (int i = 0; i < this.prevBufferLen; i++) {
Runnable callback = this.callbackBuffer.get(i);
if (callback != null) {
callback.run();
}
}
for (int i = 0; i < this.prevBufferLen; i++) { for (int i = 0; i < this.prevBufferLen; i++) {
this.writeBuffer.poll(); this.writeBuffer.poll();
this.callbackBuffer.poll(); this.callbackBuffer.poll();
@@ -487,16 +493,6 @@ public abstract class Socket extends Emitter {
} }
} }
private void callbacks() {
Iterator<Runnable> iter = this.callbackBuffer.iterator();
for (int i = 0; i < this.prevBufferLen && iter.hasNext(); i++) {
Runnable callback = iter.next();
if (callback != null) {
callback.run();
}
}
}
private void flush() { private void flush() {
if (this.readyState != ReadyState.CLOSED && this.transport.writable && if (this.readyState != ReadyState.CLOSED && this.transport.writable &&
!this.upgrading && this.writeBuffer.size() != 0) { !this.upgrading && this.writeBuffer.size() != 0) {
@@ -569,7 +565,6 @@ public abstract class Socket extends Emitter {
Socket.this.onClose("forced close"); Socket.this.onClose("forced close");
logger.fine("socket closing - telling transport to close"); logger.fine("socket closing - telling transport to close");
Socket.this.transport.close(); Socket.this.transport.close();
Socket.this.transport.off();
} }
} }
@@ -580,6 +575,7 @@ public abstract class Socket extends Emitter {
private void onError(Exception err) { private void onError(Exception err) {
logger.fine(String.format("socket error %s", err)); logger.fine(String.format("socket error %s", err));
this.emit(EVENT_ERROR, err); this.emit(EVENT_ERROR, err);
this.onerror(err);
this.onClose("transport error", err); this.onClose("transport error", err);
} }
@@ -591,12 +587,15 @@ public abstract class Socket extends Emitter {
if (this.readyState == ReadyState.OPENING || this.readyState == ReadyState.OPEN) { if (this.readyState == ReadyState.OPENING || this.readyState == ReadyState.OPEN) {
logger.fine(String.format("socket close with reason: %s", reason)); logger.fine(String.format("socket close with reason: %s", reason));
final Socket self = this; final Socket self = this;
// clear timers
if (this.pingIntervalTimer != null) { if (this.pingIntervalTimer != null) {
this.pingIntervalTimer.cancel(true); this.pingIntervalTimer.cancel(true);
} }
if (this.pingTimeoutTimer != null) { if (this.pingTimeoutTimer != null) {
this.pingTimeoutTimer.cancel(true); this.pingTimeoutTimer.cancel(true);
} }
EventThread.nextTick(new Runnable() { EventThread.nextTick(new Runnable() {
@Override @Override
public void run() { public void run() {
@@ -604,13 +603,22 @@ public abstract class Socket extends Emitter {
self.callbackBuffer.clear(); self.callbackBuffer.clear();
} }
}); });
// ignore further transport communication
this.transport.off();
// set ready state
ReadyState prev = this.readyState; ReadyState prev = this.readyState;
this.readyState = ReadyState.CLOSED; this.readyState = ReadyState.CLOSED;
// clear session id
this.id = null;
// emit events
if (prev == ReadyState.OPEN) { if (prev == ReadyState.OPEN) {
this.emit(EVENT_CLOSE, reason, desc); this.emit(EVENT_CLOSE, reason, desc);
}
this.onclose(); this.onclose();
this.id = null; }
} }
} }
@@ -630,6 +638,7 @@ public abstract class Socket extends Emitter {
public abstract void onclose(); public abstract void onclose();
public abstract void onerror(Exception err);
public static class Options extends Transport.Options { public static class Options extends Transport.Options {

View File

@@ -128,8 +128,25 @@ abstract public class Polling extends Transport {
} }
protected void doClose() { protected void doClose() {
logger.fine("sending close packet"); final Polling self = this;
this.send(new Packet[] {new Packet(Packet.CLOSE, null)});
Listener close = new Listener() {
@Override
public void call(Object... args) {
logger.fine("writing close packet");
self.write(new Packet[] {new Packet(Packet.CLOSE, null)});
}
};
if (this.readyState == ReadyState.OPEN) {
logger.fine("transport open - closing");
close.call();
} else {
// in case we're trying to close while
// handshaking is in progress (engine.io-client GH-164)
logger.fine("transport not open - deferring close");
this.once(EVENT_OPEN, close);
}
} }
protected void write(Packet[] packets) { protected void write(Packet[] packets) {

View File

@@ -105,6 +105,9 @@ public class ServerConnectionTest {
System.out.println("onclose:"); System.out.println("onclose:");
events.offer("onclose"); events.offer("onclose");
} }
@Override
public void onerror(Exception err) {}
}; };
socket.open(); socket.open();
@@ -132,6 +135,9 @@ public class ServerConnectionTest {
@Override @Override
public void onclose() {} public void onclose() {}
@Override
public void onerror(Exception err) {}
}; };
socket.open(); socket.open();
@@ -151,6 +157,8 @@ public class ServerConnectionTest {
public void onmessage(String data) {} public void onmessage(String data) {}
@Override @Override
public void onclose() {} public void onclose() {}
@Override
public void onerror(Exception err) {}
}; };
socket.on(Socket.EVENT_HANDSHAKE, new Emitter.Listener() { socket.on(Socket.EVENT_HANDSHAKE, new Emitter.Listener() {
@Override @Override
@@ -186,6 +194,8 @@ public class ServerConnectionTest {
public void onmessage(String data) {} public void onmessage(String data) {}
@Override @Override
public void onclose() {} public void onclose() {}
@Override
public void onerror(Exception err) {}
}; };
socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() { socket.on(Socket.EVENT_UPGRADING, new Emitter.Listener() {
@Override @Override
@@ -237,6 +247,9 @@ public class ServerConnectionTest {
@Override @Override
public void onclose() {} public void onclose() {}
@Override
public void onerror(Exception err) {}
}; };
socket.open(); socket.open();

View File

@@ -28,6 +28,8 @@ public class SocketTest {
public void onmessage(String data) {} public void onmessage(String data) {}
@Override @Override
public void onclose() {} public void onclose() {}
@Override
public void onerror(Exception err) {}
}; };
List<String> upgrades = new ArrayList<String>() {{ List<String> upgrades = new ArrayList<String>() {{
add(Polling.NAME); add(Polling.NAME);
@@ -51,6 +53,8 @@ public class SocketTest {
public void onmessage(String data) {} public void onmessage(String data) {}
@Override @Override
public void onclose() {} public void onclose() {}
@Override
public void onerror(Exception err) {}
}; };
final boolean[] closed = {false}; final boolean[] closed = {false};