From c7d50b8ae9787e9ebdff50aa5d36f88433fc50b9 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Fri, 8 Jul 2022 20:30:08 +0200 Subject: [PATCH] feat: implement catch-all listeners Syntax: ```java socket.onAnyIncoming(new Emitter.Listener() { @Override public void call(Object... args) { // ... } }); socket.onAnyOutgoing(new Emitter.Listener() { @Override public void call(Object... args) { // ... } }); ``` Related: - https://github.com/socketio/engine.io-client-java/issues/99 - https://github.com/socketio/socket.io-client-java/issues/243 - https://github.com/socketio/socket.io-client-java/issues/475 --- src/main/java/io/socket/client/Socket.java | 62 +++++++++++++++++++ .../java/io/socket/client/SocketTest.java | 55 ++++++++++++++++ 2 files changed, 117 insertions(+) diff --git a/src/main/java/io/socket/client/Socket.java b/src/main/java/io/socket/client/Socket.java index 7609b40..fe50c08 100644 --- a/src/main/java/io/socket/client/Socket.java +++ b/src/main/java/io/socket/client/Socket.java @@ -9,6 +9,7 @@ import org.json.JSONException; import org.json.JSONObject; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import java.util.logging.Logger; @@ -63,6 +64,9 @@ public class Socket extends Emitter { private final Queue> receiveBuffer = new LinkedList<>(); private final Queue> sendBuffer = new LinkedList<>(); + private ConcurrentLinkedQueue onAnyIncomingListeners = new ConcurrentLinkedQueue<>(); + private ConcurrentLinkedQueue onAnyOutgoingListeners = new ConcurrentLinkedQueue<>(); + public Socket(Manager io, String nsp, Manager.Options opts) { this.io = io; this.nsp = nsp; @@ -250,6 +254,14 @@ public class Socket extends Emitter { } private void packet(Packet packet) { + if (packet.type == Parser.EVENT) { + if (!onAnyOutgoingListeners.isEmpty()) { + Object[] argsAsArray = toArray((JSONArray) packet.data); + for (Listener listener : onAnyOutgoingListeners) { + listener.call(argsAsArray); + } + } + } packet.nsp = this.nsp; this.io.packet(packet); } @@ -340,6 +352,12 @@ public class Socket extends Emitter { if (this.connected) { if (args.isEmpty()) return; + if (!this.onAnyIncomingListeners.isEmpty()) { + Object[] argsAsArray = args.toArray(); + for (Listener listener : this.onAnyIncomingListeners) { + listener.call(argsAsArray); + } + } String event = args.remove(0).toString(); super.emit(event, args.toArray()); } else { @@ -507,5 +525,49 @@ public class Socket extends Emitter { } return data; } + + public Socket onAnyIncoming(Listener fn) { + this.onAnyIncomingListeners.add(fn); + return this; + } + + public Socket offAnyIncoming() { + this.onAnyIncomingListeners.clear(); + return this; + } + + public Socket offAnyIncoming(Listener fn) { + Iterator it = this.onAnyIncomingListeners.iterator(); + while (it.hasNext()) { + Listener listener = it.next(); + if (listener == fn) { + it.remove(); + break; + } + } + return this; + } + + public Socket onAnyOutgoing(Listener fn) { + this.onAnyOutgoingListeners.add(fn); + return this; + } + + public Socket offAnyOutgoing() { + this.onAnyOutgoingListeners.clear(); + return this; + } + + public Socket offAnyOutgoing(Listener fn) { + Iterator it = this.onAnyOutgoingListeners.iterator(); + while (it.hasNext()) { + Listener listener = it.next(); + if (listener == fn) { + it.remove(); + break; + } + } + return this; + } } diff --git a/src/test/java/io/socket/client/SocketTest.java b/src/test/java/io/socket/client/SocketTest.java index af9a55c..3db641d 100644 --- a/src/test/java/io/socket/client/SocketTest.java +++ b/src/test/java/io/socket/client/SocketTest.java @@ -395,4 +395,59 @@ public class SocketTest extends Connection { assertThat((String) values.take(), is("2")); assertThat((byte[]) values.take(), is(new byte[] { 3 })); } + + @Test(timeout = TIMEOUT) + public void shouldCallCatchAllListenerForIncomingPackets() throws InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue<>(); + + socket = client(); + + socket.on("message", new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.emit("echo", 1, "2", new byte[] { 3 }); + + socket.onAnyIncoming(new Emitter.Listener() { + @Override + public void call(Object... args) { + for (Object arg : args) { + values.offer(arg); + } + } + }); + } + }); + + socket.connect(); + + assertThat((String) values.take(), is("echoBack")); + assertThat((Integer) values.take(), is(1)); + assertThat((String) values.take(), is("2")); + assertThat((byte[]) values.take(), is(new byte[] { 3 })); + } + + @Test(timeout = TIMEOUT) + public void shouldCallCatchAllListenerForOutgoingPackets() throws InterruptedException { + final BlockingQueue values = new LinkedBlockingQueue<>(); + + socket = client(); + + socket.emit("echo", 1, "2", new byte[] { 3 }); + + socket.onAnyOutgoing(new Emitter.Listener() { + @Override + public void call(Object... args) { + for (Object arg : args) { + values.offer(arg); + } + } + }); + + socket.connect(); + + assertThat((String) values.take(), is("echo")); + assertThat((Integer) values.take(), is(1)); + assertThat((String) values.take(), is("2")); + assertThat((byte[]) values.take(), is(new byte[] { 3 })); + } }