diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index 775a878..77828fb 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -458,6 +458,8 @@ public class Socket extends Emitter { this.pingInterval = data.pingInterval; this.pingTimeout = data.pingTimeout; this.onOpen(); + // In case open handler closes socket + if (ReadyState.CLOSED == this.readyState) return; this.setPing(); this.off(EVENT_HEARTBEAT, this.onHeartbeatAsListener); diff --git a/src/test/java/com/github/nkzawa/engineio/client/BinaryPollingTest.java b/src/test/java/com/github/nkzawa/engineio/client/BinaryPollingTest.java new file mode 100644 index 0000000..e9735c1 --- /dev/null +++ b/src/test/java/com/github/nkzawa/engineio/client/BinaryPollingTest.java @@ -0,0 +1,93 @@ +package com.github.nkzawa.engineio.client; + +import com.github.nkzawa.emitter.Emitter; +import com.github.nkzawa.engineio.client.transports.Polling; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.Semaphore; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +@RunWith(JUnit4.class) +public class BinaryPollingTest extends Connection { + + private Socket socket; + + @Test(timeout = TIMEOUT) + public void receiveBinaryData() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + final byte[] binaryData = new byte[5]; + for (int i = 0; i < binaryData.length; i++) { + binaryData[i] = (byte)i; + } + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + opts.transports = new String[] {Polling.NAME}; + + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.send(binaryData); + socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { + @Override + public void call(Object... args) { + if ("hi".equals(args[0])) return; + + assertThat(args[0], instanceOf(byte[].class)); + assertThat((byte[])args[0], is(binaryData)); + socket.close(); + semaphore.release(); + } + }); + } + }); + socket.open(); + semaphore.acquire(); + } + + @Test(timeout = TIMEOUT) + public void receiveBinaryDataAndMultibyteUTF8String() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + final byte[] binaryData = new byte[5]; + for (int i = 0; i < binaryData.length; i++) { + binaryData[i] = (byte)i; + } + + final int[] msg = new int[] {0}; + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + opts.transports = new String[] {Polling.NAME}; + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.send(binaryData); + socket.send("cash money €€€"); + socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { + @Override + public void call(Object... args) { + if ("hi".equals(args[0])) return; + + if (msg[0] == 0) { + assertThat(args[0], instanceOf(byte[].class)); + assertThat((byte[])args[0], is(binaryData)); + msg[0]++; + } else { + assertThat(args[0], instanceOf(String.class)); + assertThat((String)args[0], is("cash money €€€")); + socket.close(); + semaphore.release(); + } + } + }); + } + }); + socket.open(); + semaphore.acquire(); + } +} diff --git a/src/test/java/com/github/nkzawa/engineio/client/BinaryWSTest.java b/src/test/java/com/github/nkzawa/engineio/client/BinaryWSTest.java new file mode 100644 index 0000000..33985b9 --- /dev/null +++ b/src/test/java/com/github/nkzawa/engineio/client/BinaryWSTest.java @@ -0,0 +1,99 @@ +package com.github.nkzawa.engineio.client; + +import com.github.nkzawa.emitter.Emitter; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.Semaphore; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +@RunWith(JUnit4.class) +public class BinaryWSTest extends Connection { + + private Socket socket; + + @Test(timeout = TIMEOUT) + public void receiveBinaryData() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + final byte[] binaryData = new byte[5]; + for (int i = 0; i < binaryData.length; i++) { + binaryData[i] = (byte)i; + } + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.send(binaryData); + socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { + @Override + public void call(Object... args) { + if (args[0] instanceof String) return; + + assertThat(args[0], instanceOf(byte[].class)); + assertThat((byte[])args[0], is(binaryData)); + + socket.close(); + semaphore.release(); + } + }); + } + }); + } + }); + socket.open(); + semaphore.acquire(); + } + + @Test(timeout = TIMEOUT) + public void receiveBinaryDataAndMultiplebyteUTF8String() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + final byte[] binaryData = new byte[5]; + for (int i = 0; i < binaryData.length; i++) { + binaryData[i] = (byte)i; + } + + final int[] msg = new int[] {0}; + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.send(binaryData); + socket.send("cash money €€€"); + socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { + @Override + public void call(Object... args) { + if ("hi".equals(args[0])) return; + + if (msg[0] == 0) { + assertThat(args[0], instanceOf(byte[].class)); + assertThat((byte[])args[0], is(binaryData)); + msg[0]++; + } else { + assertThat((String)args[0], is("cash money €€€")); + socket.close(); + semaphore.release(); + } + } + }); + } + }); + } + }); + socket.open(); + semaphore.acquire(); + } +} diff --git a/src/test/java/com/github/nkzawa/engineio/client/Connection.java b/src/test/java/com/github/nkzawa/engineio/client/Connection.java new file mode 100644 index 0000000..91fc024 --- /dev/null +++ b/src/test/java/com/github/nkzawa/engineio/client/Connection.java @@ -0,0 +1,73 @@ +package com.github.nkzawa.engineio.client; + +import org.junit.After; +import org.junit.Before; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.concurrent.*; + +public abstract class Connection { + + final static int TIMEOUT = 3000; + final static int PORT = 3000; + + private Process serverProcess; + private ExecutorService serverService; + private Future serverOutout; + private Future serverError; + + @Before + public void startServer() throws IOException, InterruptedException { + System.out.println("Starting server ..."); + + final CountDownLatch latch = new CountDownLatch(1); + serverProcess = Runtime.getRuntime().exec( + "node src/test/resources/index.js " + PORT, new String[] {"DEBUG=engine*"}); + serverService = Executors.newCachedThreadPool(); + serverOutout = serverService.submit(new Runnable() { + @Override + public void run() { + BufferedReader reader = new BufferedReader( + new InputStreamReader(serverProcess.getInputStream())); + String line; + try { + line = reader.readLine(); + latch.countDown(); + do { + System.out.println("SERVER OUT: " + line); + } while ((line = reader.readLine()) != null); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + serverError = serverService.submit(new Runnable() { + @Override + public void run() { + BufferedReader reader = new BufferedReader( + new InputStreamReader(serverProcess.getErrorStream())); + String line; + try { + while ((line = reader.readLine()) != null) { + System.err.println("SERVER ERR: " + line); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + latch.await(3000, TimeUnit.MILLISECONDS); + } + + @After + public void stopServer() throws InterruptedException { + System.out.println("Stopping server ..."); + serverProcess.destroy(); + serverOutout.cancel(true); + serverError.cancel(true); + serverService.shutdown(); + serverService.awaitTermination(3000, TimeUnit.MILLISECONDS); + } +} diff --git a/src/test/java/com/github/nkzawa/engineio/client/ConnectionTest.java b/src/test/java/com/github/nkzawa/engineio/client/ConnectionTest.java new file mode 100644 index 0000000..58f03a8 --- /dev/null +++ b/src/test/java/com/github/nkzawa/engineio/client/ConnectionTest.java @@ -0,0 +1,128 @@ +package com.github.nkzawa.engineio.client; + +import com.github.nkzawa.emitter.Emitter; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.Semaphore; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +@RunWith(JUnit4.class) +public class ConnectionTest extends Connection { + + private Socket socket; + + @Test(timeout = TIMEOUT) + public void connectToLocalhost() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { + @Override + public void call(Object... args) { + assertThat((String)args[0], is("hi")); + socket.close(); + semaphore.release(); + } + }); + } + }); + socket.open(); + semaphore.acquire(); + } + + @Test(timeout = TIMEOUT) + public void receiveMultibyteUTF8StringsWithPolling() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.send("cash money €€€"); + socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { + @Override + public void call(Object... args) { + if ("hi".equals(args[0])) return; + assertThat((String)args[0], is("cash money €€€")); + socket.close(); + semaphore.release(); + } + }); + } + }); + socket.open(); + semaphore.acquire(); + } + + @Test(timeout = TIMEOUT) + public void receiveEmoji() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + socket.send("\uD800-\uDB7F\uDB80-\uDBFF\uDC00-\uDFFF\uE000-\uF8FF"); + socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { + @Override + public void call(Object... args) { + if ("hi".equals(args[0])) return; + assertThat((String)args[0], is("\uD800-\uDB7F\uDB80-\uDBFF\uDC00-\uDFFF\uE000-\uF8FF")); + socket.close(); + semaphore.release(); + } + }); + } + }); + socket.open(); + semaphore.acquire(); + } + + @Test(timeout = TIMEOUT) + public void notSendPacketsIfSocketCloses() throws InterruptedException { + final Semaphore semaphore = new Semaphore(0); + + Socket.Options opts = new Socket.Options(); + opts.port = PORT; + socket = new Socket(opts); + socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { + @Override + public void call(Object... args) { + final boolean[] noPacket = new boolean[] {true}; + socket.on(Socket.EVENT_PACKET_CREATE, new Emitter.Listener() { + @Override + public void call(Object... args) { + noPacket[0] = false; + } + }); + socket.close(); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + assertThat(noPacket[0], is(true)); + semaphore.release(); + } + }, 1200); + + } + }); + socket.open(); + semaphore.acquire(); + } +} diff --git a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java index 981fdb2..7168d7d 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/ServerConnectionTest.java @@ -4,18 +4,15 @@ import com.github.nkzawa.emitter.Emitter; import com.github.nkzawa.engineio.client.transports.Polling; import com.github.nkzawa.engineio.client.transports.WebSocket; import com.github.nkzawa.thread.EventThread; -import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; import java.net.URISyntaxException; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -25,70 +22,10 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertThat; @RunWith(JUnit4.class) -public class ServerConnectionTest { +public class ServerConnectionTest extends Connection { - final static int TIMEOUT = 3000; - final static int PORT = 3000; - - private Process serverProcess; - private ExecutorService serverService; - private Future serverOutout; - private Future serverError; private Socket socket; - @Before - public void startServer() throws IOException, InterruptedException { - System.out.println("Starting server ..."); - - final CountDownLatch latch = new CountDownLatch(1); - serverProcess = Runtime.getRuntime().exec( - "node src/test/resources/index.js " + PORT, new String[] {"DEBUG=engine*"}); - serverService = Executors.newCachedThreadPool(); - serverOutout = serverService.submit(new Runnable() { - @Override - public void run() { - BufferedReader reader = new BufferedReader( - new InputStreamReader(serverProcess.getInputStream())); - String line; - try { - line = reader.readLine(); - latch.countDown(); - do { - System.out.println("SERVER OUT: " + line); - } while ((line = reader.readLine()) != null); - } catch (IOException e) { - e.printStackTrace(); - } - } - }); - serverError = serverService.submit(new Runnable() { - @Override - public void run() { - BufferedReader reader = new BufferedReader( - new InputStreamReader(serverProcess.getErrorStream())); - String line; - try { - while ((line = reader.readLine()) != null) { - System.err.println("SERVER ERR: " + line); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - }); - latch.await(3000, TimeUnit.MILLISECONDS); - } - - @After - public void stopServer() throws InterruptedException { - System.out.println("Stopping server ..."); - serverProcess.destroy(); - serverOutout.cancel(true); - serverError.cancel(true); - serverService.shutdown(); - serverService.awaitTermination(3000, TimeUnit.MILLISECONDS); - } - @Test(timeout = TIMEOUT) public void openAndClose() throws URISyntaxException, InterruptedException { final BlockingQueue events = new LinkedBlockingQueue(); @@ -120,7 +57,7 @@ public class ServerConnectionTest { socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { @Override public void call(Object... args) { - socket.send("hi"); + socket.send("hello"); } }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { @Override @@ -130,8 +67,8 @@ public class ServerConnectionTest { }); socket.open(); - assertThat(events.take(), is("hello client")); assertThat(events.take(), is("hi")); + assertThat(events.take(), is("hello")); socket.close(); } @@ -337,72 +274,4 @@ public class ServerConnectionTest { }); semaphore.acquire(); } - - @Test(timeout = TIMEOUT) - public void sendAndReceiveBinaryDataWhenPolling() throws InterruptedException { - final Semaphore semaphore = new Semaphore(0); - final byte[] binaryData = new byte[5]; - for (int i = 0; i < binaryData.length; i++) { - binaryData[i] = (byte)i; - } - - Socket.Options opts = new Socket.Options(); - opts.port = PORT; - opts.transports = new String[] {Polling.NAME}; - - socket = new Socket(opts); - socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { - @Override - public void call(Object... args) { - socket.send(binaryData); - } - }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { - @Override - public void call(Object... args) { - if (args[0] instanceof byte[]) { - assertThat((byte[])args[0], is(binaryData)); - socket.close(); - semaphore.release(); - } - } - }); - socket.open(); - semaphore.acquire(); - } - - @Test(timeout = TIMEOUT) - public void sendAndReceiveBinaryDataWhenWS() throws InterruptedException { - final Semaphore semaphore = new Semaphore(0); - final byte[] binaryData = new byte[5]; - for (int i = 0; i < binaryData.length; i++) { - binaryData[i] = (byte)i; - } - - Socket.Options opts = new Socket.Options(); - opts.port = PORT; - - socket = new Socket(opts); - socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { - @Override - public void call(Object... args) { - socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() { - @Override - public void call(Object... args) { - socket.send(binaryData); - } - }); - } - }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { - @Override - public void call(Object... args) { - if (args[0] instanceof byte[]) { - assertThat((byte[])args[0], is(binaryData)); - socket.close(); - semaphore.release(); - } - } - }); - socket.open(); - semaphore.acquire(); - } } diff --git a/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java b/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java index 5d03ff2..5b0cb05 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java +++ b/src/test/java/com/github/nkzawa/engineio/client/TransportTest.java @@ -20,7 +20,7 @@ public class TransportTest { @Test public void uri() { Transport.Options opt = new Transport.Options(); - opt.path ="/engine.io"; + opt.path = "/engine.io"; opt.hostname = "localhost"; opt.secure = false; opt.query = new HashMap() {{ @@ -104,7 +104,7 @@ public class TransportTest { @Test public void wssUri() { Transport.Options opt = new Transport.Options(); - opt.path ="/engine.io"; + opt.path = "/engine.io"; opt.hostname = "test"; opt.secure = true; opt.timestampRequests = false; @@ -115,7 +115,7 @@ public class TransportTest { @Test public void wsTimestampedUri() { Transport.Options opt = new Transport.Options(); - opt.path ="/engine.io"; + opt.path = "/engine.io"; opt.hostname = "localhost"; opt.timestampParam = "woot"; opt.timestampRequests = true; diff --git a/src/test/resources/index.js b/src/test/resources/index.js index 47711db..03a5d26 100644 --- a/src/test/resources/index.js +++ b/src/test/resources/index.js @@ -1,12 +1,14 @@ +var http = require('http').Server(); var engine = require('engine.io'); +var server = engine.attach(http, {pingInterval: 500}); var port = parseInt(process.argv[2], 10) || 3000 -var server = engine.listen(port, function() { +http.listen(port, function() { console.log('Engine.IO server listening on port', port); }); server.on('connection', function(socket) { - socket.send('hello client'); + socket.send('hi'); socket.on('message', function(message) { socket.send(message); diff --git a/src/test/resources/package.json b/src/test/resources/package.json index 7e06042..8f332d2 100644 --- a/src/test/resources/package.json +++ b/src/test/resources/package.json @@ -3,6 +3,6 @@ "version": "0.0.0", "private": true, "dependencies": { - "engine.io": "1.1.0" + "engine.io": "1.2.2" } }