From fca3b9507d5bc79d3c41ab6e119efccd23669ca6 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 8 Jul 2022 19:46:05 +0200 Subject: [PATCH] feat: emit with timeout This feature allows to send a packet and expect an acknowledgement from the server within the given delay. Syntax: ```java socket.emit("hello", "world", new AckWithTimeout(5000) { @Override public void onTimeout() { // ... } @Override public void onSuccess(Object... args) { // ... } }); ``` Related: - https://github.com/socketio/socket.io-client-java/issues/309 - https://github.com/socketio/socket.io-client-java/pull/517 --- .../java/io/socket/client/AckWithTimeout.java | 35 ++++++ src/main/java/io/socket/client/Socket.java | 34 ++++- .../java/io/socket/client/SocketTest.java | 116 +++++++++++++++++- 3 files changed, 179 insertions(+), 6 deletions(-) create mode 100644 src/main/java/io/socket/client/AckWithTimeout.java diff --git a/src/main/java/io/socket/client/AckWithTimeout.java b/src/main/java/io/socket/client/AckWithTimeout.java new file mode 100644 index 0000000..88c43c7 --- /dev/null +++ b/src/main/java/io/socket/client/AckWithTimeout.java @@ -0,0 +1,35 @@ +package io.socket.client; + +import java.util.Timer; +import java.util.TimerTask; + +public abstract class AckWithTimeout implements Ack { + private final long timeout; + private final Timer timer = new Timer(); + + /** + * + * @param timeout delay in milliseconds + */ + public AckWithTimeout(long timeout) { + this.timeout = timeout; + } + + @Override + public final void call(Object... args) { + this.timer.cancel(); + this.onSuccess(args); + } + + public final void schedule(TimerTask task) { + this.timer.schedule(task, this.timeout); + } + + public final void cancelTimer() { + this.timer.cancel(); + } + + public abstract void onSuccess(Object... args); + public abstract void onTimeout(); + +} diff --git a/src/main/java/io/socket/client/Socket.java b/src/main/java/io/socket/client/Socket.java index 05feff3..7609b40 100644 --- a/src/main/java/io/socket/client/Socket.java +++ b/src/main/java/io/socket/client/Socket.java @@ -210,8 +210,32 @@ public class Socket extends Emitter { Packet packet = new Packet<>(Parser.EVENT, jsonArgs); if (ack != null) { - logger.fine(String.format("emitting packet with ack id %d", ids)); - Socket.this.acks.put(ids, ack); + final int ackId = Socket.this.ids; + + logger.fine(String.format("emitting packet with ack id %d", ackId)); + + if (ack instanceof AckWithTimeout) { + final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack; + ackWithTimeout.schedule(new TimerTask() { + @Override + public void run() { + // remove the ack from the map (to prevent an actual acknowledgement) + acks.remove(ackId); + + // remove the packet from the buffer (if applicable) + Iterator> iterator = sendBuffer.iterator(); + while (iterator.hasNext()) { + if (iterator.next().id == ackId) { + iterator.remove(); + } + } + + ackWithTimeout.onTimeout(); + } + }); + } + + Socket.this.acks.put(ackId, ack); packet.id = ids++; } @@ -405,6 +429,12 @@ public class Socket extends Emitter { this.subs = null; } + for (Ack ack : acks.values()) { + if (ack instanceof AckWithTimeout) { + ((AckWithTimeout) ack).cancelTimer(); + } + } + this.io.destroy(); } diff --git a/src/test/java/io/socket/client/SocketTest.java b/src/test/java/io/socket/client/SocketTest.java index 7cc76fd..af9a55c 100644 --- a/src/test/java/io/socket/client/SocketTest.java +++ b/src/test/java/io/socket/client/SocketTest.java @@ -4,18 +4,17 @@ import io.socket.emitter.Emitter; import io.socket.util.Optional; import org.json.JSONException; import org.json.JSONObject; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.util.Timer; import java.util.TimerTask; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.*; import static java.util.Collections.singletonMap; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.*; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -287,4 +286,113 @@ public class SocketTest extends Connection { assertThat(values.take(), is("first")); assertThat(values.take(), is("second")); } + + @Test(timeout = TIMEOUT) + public void shouldTimeoutAfterTheGivenDelayWhenSocketIsNotConnected() throws InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue<>(); + + socket = client(); + + socket.emit("event", new AckWithTimeout(50) { + @Override + public void onSuccess(Object... args) { + fail(); + } + + @Override + public void onTimeout() { + values.offer(true); + } + }); + + assertThat(values.take(), is(true)); + } + + @Test(timeout = TIMEOUT) + public void shouldTimeoutWhenTheServerDoesNotAcknowledgeTheEvent() throws InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue<>(); + + socket = client(); + + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.emit("unknown", new AckWithTimeout(50) { + @Override + public void onTimeout() { + values.offer(true); + } + + @Override + public void onSuccess(Object... args) { + fail(); + } + }); + } + }); + + socket.connect(); + + assertThat(values.take(), is(true)); + } + + @Test(timeout = TIMEOUT) + public void shouldTimeoutWhenTheServerDoesNotAcknowledgeTheEventInTime() throws InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue<>(); + + socket = client(); + + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.emit("ack", new AckWithTimeout(0) { + @Override + public void onTimeout() { + values.offer(true); + } + + @Override + public void onSuccess(Object... args) { + fail(); + } + }); + } + }); + + socket.connect(); + + assertThat(values.take(), is(true)); + } + + @Test(timeout = TIMEOUT) + public void shouldNotTimeoutWhenTheServerDoesAcknowledgeTheEvent() throws InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue<>(); + + socket = client(); + + socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.emit("ack", 1, "2", new byte[] { 3 }, new AckWithTimeout(200) { + @Override + public void onTimeout() { + fail(); + } + + @Override + public void onSuccess(Object... args) { + for (Object arg : args) { + values.offer(arg); + } + } + }); + } + }); + + socket.connect(); + + assertThat((Integer) values.take(), is(1)); + assertThat((String) values.take(), is("2")); + assertThat((byte[]) values.take(), is(new byte[] { 3 })); + } }