From 0d4d45570424a8509af31e44287605322dc24c4a Mon Sep 17 00:00:00 2001 From: nkzawa Date: Mon, 1 Feb 2016 00:48:57 +0900 Subject: [PATCH] add ping and pong events --- src/main/java/io/socket/client/Manager.java | 35 ++++++++++++++++- src/main/java/io/socket/client/Socket.java | 6 +++ .../java/io/socket/client/SocketTest.java | 38 +++++++++++++++++++ src/test/resources/server.js | 2 +- 4 files changed, 78 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/socket/client/Manager.java b/src/main/java/io/socket/client/Manager.java index 1c2e0b6..2cc6d9d 100644 --- a/src/main/java/io/socket/client/Manager.java +++ b/src/main/java/io/socket/client/Manager.java @@ -64,6 +64,10 @@ public class Manager extends Emitter { public static final String EVENT_RECONNECTING = "reconnecting"; + public static final String EVENT_PING = "ping"; + + public static final String EVENT_PONG = "pong"; + /** * Called when a new transport is created. (experimental) */ @@ -85,6 +89,7 @@ public class Manager extends Emitter { private Backoff backoff; private long _timeout; private Set connecting = new HashSet(); + private Date lastPing; private URI uri; private List packetBuffer; private Queue subs; @@ -348,10 +353,16 @@ public class Manager extends Emitter { } } })); - this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() { + this.subs.add(On.on(socket, Engine.EVENT_PING, new Listener() { @Override public void call(Object... objects) { - Manager.this.ondecoded((Packet) objects[0]); + Manager.this.onping(); + } + })); + this.subs.add(On.on(socket, Engine.EVENT_PONG, new Listener() { + @Override + public void call(Object... objects) { + Manager.this.onpong(); } })); this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() { @@ -366,6 +377,22 @@ public class Manager extends Emitter { Manager.this.onclose((String)objects[0]); } })); + this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() { + @Override + public void call(Object... objects) { + Manager.this.ondecoded((Packet) objects[0]); + } + })); + } + + private void onping() { + this.lastPing = new Date(); + this.emitAll(EVENT_PING); + } + + private void onpong() { + this.emitAll(EVENT_PONG, + null != this.lastPing ? new Date().getTime() - this.lastPing.getTime() : 0); } private void ondata(String data) { @@ -458,8 +485,12 @@ public class Manager extends Emitter { } private void cleanup() { + logger.fine("cleanup"); + On.Handle sub; while ((sub = this.subs.poll()) != null) sub.destroy(); + + this.lastPing = null; } /*package*/ void close() { diff --git a/src/main/java/io/socket/client/Socket.java b/src/main/java/io/socket/client/Socket.java index 06ba841..cd998f4 100644 --- a/src/main/java/io/socket/client/Socket.java +++ b/src/main/java/io/socket/client/Socket.java @@ -58,6 +58,10 @@ public class Socket extends Emitter { public static final String EVENT_RECONNECTING = Manager.EVENT_RECONNECTING; + public static final String EVENT_PING = Manager.EVENT_PING; + + public static final String EVENT_PONG = Manager.EVENT_PONG; + protected static Map events = new HashMap() {{ put(EVENT_CONNECT, 1); put(EVENT_CONNECT_ERROR, 1); @@ -70,6 +74,8 @@ public class Socket extends Emitter { put(EVENT_RECONNECT_FAILED, 1); put(EVENT_RECONNECT_ERROR, 1); put(EVENT_RECONNECTING, 1); + put(EVENT_PING, 1); + put(EVENT_PONG, 1); }}; /*package*/ String id; diff --git a/src/test/java/io/socket/client/SocketTest.java b/src/test/java/io/socket/client/SocketTest.java index 800ae1f..1744bd4 100644 --- a/src/test/java/io/socket/client/SocketTest.java +++ b/src/test/java/io/socket/client/SocketTest.java @@ -11,6 +11,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertThat; @RunWith(JUnit4.class) @@ -60,6 +61,43 @@ public class SocketTest extends Connection { assertThat(id.isPresent(), is(false)); } + @Test(timeout = TIMEOUT) + public void pingAndPongWithLatency() throws URISyntaxException, InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue(); + socket = client(); + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... objects) { + final boolean[] pinged = new boolean[] { false }; + socket.once(Socket.EVENT_PING, new Emitter.Listener() { + @Override + public void call(Object... args) { + pinged[0] = true; + } + }); + socket.once(Socket.EVENT_PONG, new Emitter.Listener() { + @Override + public void call(Object... args) { + long ms = (long)args[0]; + values.offer(pinged[0]); + values.offer(ms); + } + }); + } + }); + socket.connect(); + + @SuppressWarnings("unchecked") + boolean pinged = (boolean)values.take(); + assertThat(pinged, is(true)); + + @SuppressWarnings("unchecked") + long ms = (long)values.take(); + assertThat(ms, greaterThan((long)0)); + + socket.disconnect(); + } + @Test(timeout = TIMEOUT) public void shouldChangeSocketIdUponReconnection() throws URISyntaxException, InterruptedException { final BlockingQueue values = new LinkedBlockingQueue(); diff --git a/src/test/resources/server.js b/src/test/resources/server.js index 6e8c14a..8f12fa3 100644 --- a/src/test/resources/server.js +++ b/src/test/resources/server.js @@ -10,7 +10,7 @@ if (process.env.SSL) { server = require('http').createServer(); } -var io = require('socket.io')(server); +var io = require('socket.io')(server, { pingInterval: 2000 }); var port = process.env.PORT || 3000; var nsp = process.argv[2] || '/'; var slice = Array.prototype.slice;