add ping and pong events
This commit is contained in:
@@ -64,6 +64,10 @@ public class Manager extends Emitter {
|
||||
|
||||
public static final String EVENT_RECONNECTING = "reconnecting";
|
||||
|
||||
public static final String EVENT_PING = "ping";
|
||||
|
||||
public static final String EVENT_PONG = "pong";
|
||||
|
||||
/**
|
||||
* Called when a new transport is created. (experimental)
|
||||
*/
|
||||
@@ -85,6 +89,7 @@ public class Manager extends Emitter {
|
||||
private Backoff backoff;
|
||||
private long _timeout;
|
||||
private Set<Socket> connecting = new HashSet<Socket>();
|
||||
private Date lastPing;
|
||||
private URI uri;
|
||||
private List<Packet> packetBuffer;
|
||||
private Queue<On.Handle> subs;
|
||||
@@ -348,10 +353,16 @@ public class Manager extends Emitter {
|
||||
}
|
||||
}
|
||||
}));
|
||||
this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() {
|
||||
this.subs.add(On.on(socket, Engine.EVENT_PING, new Listener() {
|
||||
@Override
|
||||
public void call(Object... objects) {
|
||||
Manager.this.ondecoded((Packet) objects[0]);
|
||||
Manager.this.onping();
|
||||
}
|
||||
}));
|
||||
this.subs.add(On.on(socket, Engine.EVENT_PONG, new Listener() {
|
||||
@Override
|
||||
public void call(Object... objects) {
|
||||
Manager.this.onpong();
|
||||
}
|
||||
}));
|
||||
this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() {
|
||||
@@ -366,6 +377,22 @@ public class Manager extends Emitter {
|
||||
Manager.this.onclose((String)objects[0]);
|
||||
}
|
||||
}));
|
||||
this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() {
|
||||
@Override
|
||||
public void call(Object... objects) {
|
||||
Manager.this.ondecoded((Packet) objects[0]);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private void onping() {
|
||||
this.lastPing = new Date();
|
||||
this.emitAll(EVENT_PING);
|
||||
}
|
||||
|
||||
private void onpong() {
|
||||
this.emitAll(EVENT_PONG,
|
||||
null != this.lastPing ? new Date().getTime() - this.lastPing.getTime() : 0);
|
||||
}
|
||||
|
||||
private void ondata(String data) {
|
||||
@@ -458,8 +485,12 @@ public class Manager extends Emitter {
|
||||
}
|
||||
|
||||
private void cleanup() {
|
||||
logger.fine("cleanup");
|
||||
|
||||
On.Handle sub;
|
||||
while ((sub = this.subs.poll()) != null) sub.destroy();
|
||||
|
||||
this.lastPing = null;
|
||||
}
|
||||
|
||||
/*package*/ void close() {
|
||||
|
||||
@@ -58,6 +58,10 @@ public class Socket extends Emitter {
|
||||
|
||||
public static final String EVENT_RECONNECTING = Manager.EVENT_RECONNECTING;
|
||||
|
||||
public static final String EVENT_PING = Manager.EVENT_PING;
|
||||
|
||||
public static final String EVENT_PONG = Manager.EVENT_PONG;
|
||||
|
||||
protected static Map<String, Integer> events = new HashMap<String, Integer>() {{
|
||||
put(EVENT_CONNECT, 1);
|
||||
put(EVENT_CONNECT_ERROR, 1);
|
||||
@@ -70,6 +74,8 @@ public class Socket extends Emitter {
|
||||
put(EVENT_RECONNECT_FAILED, 1);
|
||||
put(EVENT_RECONNECT_ERROR, 1);
|
||||
put(EVENT_RECONNECTING, 1);
|
||||
put(EVENT_PING, 1);
|
||||
put(EVENT_PONG, 1);
|
||||
}};
|
||||
|
||||
/*package*/ String id;
|
||||
|
||||
@@ -11,6 +11,7 @@ import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.*;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
@@ -60,6 +61,43 @@ public class SocketTest extends Connection {
|
||||
assertThat(id.isPresent(), is(false));
|
||||
}
|
||||
|
||||
@Test(timeout = TIMEOUT)
|
||||
public void pingAndPongWithLatency() throws URISyntaxException, InterruptedException {
|
||||
final BlockingQueue<Object> values = new LinkedBlockingQueue<Object>();
|
||||
socket = client();
|
||||
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
|
||||
@Override
|
||||
public void call(Object... objects) {
|
||||
final boolean[] pinged = new boolean[] { false };
|
||||
socket.once(Socket.EVENT_PING, new Emitter.Listener() {
|
||||
@Override
|
||||
public void call(Object... args) {
|
||||
pinged[0] = true;
|
||||
}
|
||||
});
|
||||
socket.once(Socket.EVENT_PONG, new Emitter.Listener() {
|
||||
@Override
|
||||
public void call(Object... args) {
|
||||
long ms = (long)args[0];
|
||||
values.offer(pinged[0]);
|
||||
values.offer(ms);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
socket.connect();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
boolean pinged = (boolean)values.take();
|
||||
assertThat(pinged, is(true));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
long ms = (long)values.take();
|
||||
assertThat(ms, greaterThan((long)0));
|
||||
|
||||
socket.disconnect();
|
||||
}
|
||||
|
||||
@Test(timeout = TIMEOUT)
|
||||
public void shouldChangeSocketIdUponReconnection() throws URISyntaxException, InterruptedException {
|
||||
final BlockingQueue<Optional> values = new LinkedBlockingQueue<Optional>();
|
||||
|
||||
@@ -10,7 +10,7 @@ if (process.env.SSL) {
|
||||
server = require('http').createServer();
|
||||
}
|
||||
|
||||
var io = require('socket.io')(server);
|
||||
var io = require('socket.io')(server, { pingInterval: 2000 });
|
||||
var port = process.env.PORT || 3000;
|
||||
var nsp = process.argv[2] || '/';
|
||||
var slice = Array.prototype.slice;
|
||||
|
||||
Reference in New Issue
Block a user