feat: emit with timeout
This feature allows to send a packet and expect an acknowledgement from
the server within the given delay.
Syntax:
```java
socket.emit("hello", "world", new AckWithTimeout(5000) {
@Override
public void onTimeout() {
// ...
}
@Override
public void onSuccess(Object... args) {
// ...
}
});
```
Related:
- https://github.com/socketio/socket.io-client-java/issues/309
- https://github.com/socketio/socket.io-client-java/pull/517
This commit is contained in:
35
src/main/java/io/socket/client/AckWithTimeout.java
Normal file
35
src/main/java/io/socket/client/AckWithTimeout.java
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
package io.socket.client;
|
||||||
|
|
||||||
|
import java.util.Timer;
|
||||||
|
import java.util.TimerTask;
|
||||||
|
|
||||||
|
public abstract class AckWithTimeout implements Ack {
|
||||||
|
private final long timeout;
|
||||||
|
private final Timer timer = new Timer();
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param timeout delay in milliseconds
|
||||||
|
*/
|
||||||
|
public AckWithTimeout(long timeout) {
|
||||||
|
this.timeout = timeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final void call(Object... args) {
|
||||||
|
this.timer.cancel();
|
||||||
|
this.onSuccess(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void schedule(TimerTask task) {
|
||||||
|
this.timer.schedule(task, this.timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
public final void cancelTimer() {
|
||||||
|
this.timer.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void onSuccess(Object... args);
|
||||||
|
public abstract void onTimeout();
|
||||||
|
|
||||||
|
}
|
||||||
@@ -210,8 +210,32 @@ public class Socket extends Emitter {
|
|||||||
Packet<JSONArray> packet = new Packet<>(Parser.EVENT, jsonArgs);
|
Packet<JSONArray> packet = new Packet<>(Parser.EVENT, jsonArgs);
|
||||||
|
|
||||||
if (ack != null) {
|
if (ack != null) {
|
||||||
logger.fine(String.format("emitting packet with ack id %d", ids));
|
final int ackId = Socket.this.ids;
|
||||||
Socket.this.acks.put(ids, ack);
|
|
||||||
|
logger.fine(String.format("emitting packet with ack id %d", ackId));
|
||||||
|
|
||||||
|
if (ack instanceof AckWithTimeout) {
|
||||||
|
final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack;
|
||||||
|
ackWithTimeout.schedule(new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// remove the ack from the map (to prevent an actual acknowledgement)
|
||||||
|
acks.remove(ackId);
|
||||||
|
|
||||||
|
// remove the packet from the buffer (if applicable)
|
||||||
|
Iterator<Packet<JSONArray>> iterator = sendBuffer.iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
if (iterator.next().id == ackId) {
|
||||||
|
iterator.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ackWithTimeout.onTimeout();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
Socket.this.acks.put(ackId, ack);
|
||||||
packet.id = ids++;
|
packet.id = ids++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -405,6 +429,12 @@ public class Socket extends Emitter {
|
|||||||
this.subs = null;
|
this.subs = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (Ack ack : acks.values()) {
|
||||||
|
if (ack instanceof AckWithTimeout) {
|
||||||
|
((AckWithTimeout) ack).cancelTimer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
this.io.destroy();
|
this.io.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,18 +4,17 @@ import io.socket.emitter.Emitter;
|
|||||||
import io.socket.util.Optional;
|
import io.socket.util.Optional;
|
||||||
import org.json.JSONException;
|
import org.json.JSONException;
|
||||||
import org.json.JSONObject;
|
import org.json.JSONObject;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.JUnit4;
|
import org.junit.runners.JUnit4;
|
||||||
|
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
|
|
||||||
import static java.util.Collections.singletonMap;
|
import static java.util.Collections.singletonMap;
|
||||||
import static org.hamcrest.CoreMatchers.is;
|
import static org.hamcrest.CoreMatchers.*;
|
||||||
import static org.hamcrest.CoreMatchers.not;
|
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
@@ -287,4 +286,113 @@ public class SocketTest extends Connection {
|
|||||||
assertThat(values.take(), is("first"));
|
assertThat(values.take(), is("first"));
|
||||||
assertThat(values.take(), is("second"));
|
assertThat(values.take(), is("second"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = TIMEOUT)
|
||||||
|
public void shouldTimeoutAfterTheGivenDelayWhenSocketIsNotConnected() throws InterruptedException {
|
||||||
|
final BlockingQueue<Boolean> values = new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
|
socket = client();
|
||||||
|
|
||||||
|
socket.emit("event", new AckWithTimeout(50) {
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Object... args) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTimeout() {
|
||||||
|
values.offer(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertThat(values.take(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = TIMEOUT)
|
||||||
|
public void shouldTimeoutWhenTheServerDoesNotAcknowledgeTheEvent() throws InterruptedException {
|
||||||
|
final BlockingQueue<Boolean> values = new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
|
socket = client();
|
||||||
|
|
||||||
|
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
|
||||||
|
@Override
|
||||||
|
public void call(Object... args) {
|
||||||
|
socket.emit("unknown", new AckWithTimeout(50) {
|
||||||
|
@Override
|
||||||
|
public void onTimeout() {
|
||||||
|
values.offer(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Object... args) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.connect();
|
||||||
|
|
||||||
|
assertThat(values.take(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = TIMEOUT)
|
||||||
|
public void shouldTimeoutWhenTheServerDoesNotAcknowledgeTheEventInTime() throws InterruptedException {
|
||||||
|
final BlockingQueue<Boolean> values = new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
|
socket = client();
|
||||||
|
|
||||||
|
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
|
||||||
|
@Override
|
||||||
|
public void call(Object... args) {
|
||||||
|
socket.emit("ack", new AckWithTimeout(0) {
|
||||||
|
@Override
|
||||||
|
public void onTimeout() {
|
||||||
|
values.offer(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Object... args) {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.connect();
|
||||||
|
|
||||||
|
assertThat(values.take(), is(true));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = TIMEOUT)
|
||||||
|
public void shouldNotTimeoutWhenTheServerDoesAcknowledgeTheEvent() throws InterruptedException {
|
||||||
|
final BlockingQueue<Object> values = new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
|
socket = client();
|
||||||
|
|
||||||
|
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
|
||||||
|
@Override
|
||||||
|
public void call(Object... args) {
|
||||||
|
socket.emit("ack", 1, "2", new byte[] { 3 }, new AckWithTimeout(200) {
|
||||||
|
@Override
|
||||||
|
public void onTimeout() {
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSuccess(Object... args) {
|
||||||
|
for (Object arg : args) {
|
||||||
|
values.offer(arg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
socket.connect();
|
||||||
|
|
||||||
|
assertThat((Integer) values.take(), is(1));
|
||||||
|
assertThat((String) values.take(), is("2"));
|
||||||
|
assertThat((byte[]) values.take(), is(new byte[] { 3 }));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user