feat: implement catch-all listeners

Syntax:

```java
socket.onAnyIncoming(new Emitter.Listener() {
    @Override
    public void call(Object... args) {
        // ...
    }
});

socket.onAnyOutgoing(new Emitter.Listener() {
    @Override
    public void call(Object... args) {
        // ...
    }
});
```

Related:

- https://github.com/socketio/engine.io-client-java/issues/99
- https://github.com/socketio/socket.io-client-java/issues/243
- https://github.com/socketio/socket.io-client-java/issues/475
This commit is contained in:
Damien Arrachequesne
2022-07-08 20:30:08 +02:00
parent fca3b9507d
commit c7d50b8ae9
2 changed files with 117 additions and 0 deletions

View File

@@ -9,6 +9,7 @@ import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@@ -63,6 +64,9 @@ public class Socket extends Emitter {
private final Queue<List<Object>> receiveBuffer = new LinkedList<>(); private final Queue<List<Object>> receiveBuffer = new LinkedList<>();
private final Queue<Packet<JSONArray>> sendBuffer = new LinkedList<>(); private final Queue<Packet<JSONArray>> sendBuffer = new LinkedList<>();
private ConcurrentLinkedQueue<Listener> onAnyIncomingListeners = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<Listener> onAnyOutgoingListeners = new ConcurrentLinkedQueue<>();
public Socket(Manager io, String nsp, Manager.Options opts) { public Socket(Manager io, String nsp, Manager.Options opts) {
this.io = io; this.io = io;
this.nsp = nsp; this.nsp = nsp;
@@ -250,6 +254,14 @@ public class Socket extends Emitter {
} }
private void packet(Packet packet) { private void packet(Packet packet) {
if (packet.type == Parser.EVENT) {
if (!onAnyOutgoingListeners.isEmpty()) {
Object[] argsAsArray = toArray((JSONArray) packet.data);
for (Listener listener : onAnyOutgoingListeners) {
listener.call(argsAsArray);
}
}
}
packet.nsp = this.nsp; packet.nsp = this.nsp;
this.io.packet(packet); this.io.packet(packet);
} }
@@ -340,6 +352,12 @@ public class Socket extends Emitter {
if (this.connected) { if (this.connected) {
if (args.isEmpty()) return; if (args.isEmpty()) return;
if (!this.onAnyIncomingListeners.isEmpty()) {
Object[] argsAsArray = args.toArray();
for (Listener listener : this.onAnyIncomingListeners) {
listener.call(argsAsArray);
}
}
String event = args.remove(0).toString(); String event = args.remove(0).toString();
super.emit(event, args.toArray()); super.emit(event, args.toArray());
} else { } else {
@@ -507,5 +525,49 @@ public class Socket extends Emitter {
} }
return data; return data;
} }
public Socket onAnyIncoming(Listener fn) {
this.onAnyIncomingListeners.add(fn);
return this;
}
public Socket offAnyIncoming() {
this.onAnyIncomingListeners.clear();
return this;
}
public Socket offAnyIncoming(Listener fn) {
Iterator<Listener> it = this.onAnyIncomingListeners.iterator();
while (it.hasNext()) {
Listener listener = it.next();
if (listener == fn) {
it.remove();
break;
}
}
return this;
}
public Socket onAnyOutgoing(Listener fn) {
this.onAnyOutgoingListeners.add(fn);
return this;
}
public Socket offAnyOutgoing() {
this.onAnyOutgoingListeners.clear();
return this;
}
public Socket offAnyOutgoing(Listener fn) {
Iterator<Listener> it = this.onAnyOutgoingListeners.iterator();
while (it.hasNext()) {
Listener listener = it.next();
if (listener == fn) {
it.remove();
break;
}
}
return this;
}
} }

View File

@@ -395,4 +395,59 @@ public class SocketTest extends Connection {
assertThat((String) values.take(), is("2")); assertThat((String) values.take(), is("2"));
assertThat((byte[]) values.take(), is(new byte[] { 3 })); assertThat((byte[]) values.take(), is(new byte[] { 3 }));
} }
@Test(timeout = TIMEOUT)
public void shouldCallCatchAllListenerForIncomingPackets() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<>();
socket = client();
socket.on("message", new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.emit("echo", 1, "2", new byte[] { 3 });
socket.onAnyIncoming(new Emitter.Listener() {
@Override
public void call(Object... args) {
for (Object arg : args) {
values.offer(arg);
}
}
});
}
});
socket.connect();
assertThat((String) values.take(), is("echoBack"));
assertThat((Integer) values.take(), is(1));
assertThat((String) values.take(), is("2"));
assertThat((byte[]) values.take(), is(new byte[] { 3 }));
}
@Test(timeout = TIMEOUT)
public void shouldCallCatchAllListenerForOutgoingPackets() throws InterruptedException {
final BlockingQueue<Object> values = new LinkedBlockingQueue<>();
socket = client();
socket.emit("echo", 1, "2", new byte[] { 3 });
socket.onAnyOutgoing(new Emitter.Listener() {
@Override
public void call(Object... args) {
for (Object arg : args) {
values.offer(arg);
}
}
});
socket.connect();
assertThat((String) values.take(), is("echo"));
assertThat((Integer) values.take(), is(1));
assertThat((String) values.take(), is("2"));
assertThat((byte[]) values.take(), is(new byte[] { 3 }));
}
} }