compatible with engine.io-client 1.2.2

This commit is contained in:
Naoyuki Kanezawa
2014-06-01 20:43:31 +09:00
parent 7c67fa5d9b
commit cdc2c87f49
9 changed files with 409 additions and 143 deletions

View File

@@ -458,6 +458,8 @@ public class Socket extends Emitter {
this.pingInterval = data.pingInterval; this.pingInterval = data.pingInterval;
this.pingTimeout = data.pingTimeout; this.pingTimeout = data.pingTimeout;
this.onOpen(); this.onOpen();
// In case open handler closes socket
if (ReadyState.CLOSED == this.readyState) return;
this.setPing(); this.setPing();
this.off(EVENT_HEARTBEAT, this.onHeartbeatAsListener); this.off(EVENT_HEARTBEAT, this.onHeartbeatAsListener);

View File

@@ -0,0 +1,93 @@
package com.github.nkzawa.engineio.client;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.transports.Polling;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class)
public class BinaryPollingTest extends Connection {
private Socket socket;
@Test(timeout = TIMEOUT)
public void receiveBinaryData() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
final byte[] binaryData = new byte[5];
for (int i = 0; i < binaryData.length; i++) {
binaryData[i] = (byte)i;
}
Socket.Options opts = new Socket.Options();
opts.port = PORT;
opts.transports = new String[] {Polling.NAME};
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send(binaryData);
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if ("hi".equals(args[0])) return;
assertThat(args[0], instanceOf(byte[].class));
assertThat((byte[])args[0], is(binaryData));
socket.close();
semaphore.release();
}
});
}
});
socket.open();
semaphore.acquire();
}
@Test(timeout = TIMEOUT)
public void receiveBinaryDataAndMultibyteUTF8String() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
final byte[] binaryData = new byte[5];
for (int i = 0; i < binaryData.length; i++) {
binaryData[i] = (byte)i;
}
final int[] msg = new int[] {0};
Socket.Options opts = new Socket.Options();
opts.port = PORT;
opts.transports = new String[] {Polling.NAME};
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send(binaryData);
socket.send("cash money €€€");
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if ("hi".equals(args[0])) return;
if (msg[0] == 0) {
assertThat(args[0], instanceOf(byte[].class));
assertThat((byte[])args[0], is(binaryData));
msg[0]++;
} else {
assertThat(args[0], instanceOf(String.class));
assertThat((String)args[0], is("cash money €€€"));
socket.close();
semaphore.release();
}
}
});
}
});
socket.open();
semaphore.acquire();
}
}

View File

@@ -0,0 +1,99 @@
package com.github.nkzawa.engineio.client;
import com.github.nkzawa.emitter.Emitter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class)
public class BinaryWSTest extends Connection {
private Socket socket;
@Test(timeout = TIMEOUT)
public void receiveBinaryData() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
final byte[] binaryData = new byte[5];
for (int i = 0; i < binaryData.length; i++) {
binaryData[i] = (byte)i;
}
Socket.Options opts = new Socket.Options();
opts.port = PORT;
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send(binaryData);
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if (args[0] instanceof String) return;
assertThat(args[0], instanceOf(byte[].class));
assertThat((byte[])args[0], is(binaryData));
socket.close();
semaphore.release();
}
});
}
});
}
});
socket.open();
semaphore.acquire();
}
@Test(timeout = TIMEOUT)
public void receiveBinaryDataAndMultiplebyteUTF8String() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
final byte[] binaryData = new byte[5];
for (int i = 0; i < binaryData.length; i++) {
binaryData[i] = (byte)i;
}
final int[] msg = new int[] {0};
Socket.Options opts = new Socket.Options();
opts.port = PORT;
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send(binaryData);
socket.send("cash money €€€");
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if ("hi".equals(args[0])) return;
if (msg[0] == 0) {
assertThat(args[0], instanceOf(byte[].class));
assertThat((byte[])args[0], is(binaryData));
msg[0]++;
} else {
assertThat((String)args[0], is("cash money €€€"));
socket.close();
semaphore.release();
}
}
});
}
});
}
});
socket.open();
semaphore.acquire();
}
}

View File

@@ -0,0 +1,73 @@
package com.github.nkzawa.engineio.client;
import org.junit.After;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.*;
public abstract class Connection {
final static int TIMEOUT = 3000;
final static int PORT = 3000;
private Process serverProcess;
private ExecutorService serverService;
private Future serverOutout;
private Future serverError;
@Before
public void startServer() throws IOException, InterruptedException {
System.out.println("Starting server ...");
final CountDownLatch latch = new CountDownLatch(1);
serverProcess = Runtime.getRuntime().exec(
"node src/test/resources/index.js " + PORT, new String[] {"DEBUG=engine*"});
serverService = Executors.newCachedThreadPool();
serverOutout = serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(serverProcess.getInputStream()));
String line;
try {
line = reader.readLine();
latch.countDown();
do {
System.out.println("SERVER OUT: " + line);
} while ((line = reader.readLine()) != null);
} catch (IOException e) {
e.printStackTrace();
}
}
});
serverError = serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(serverProcess.getErrorStream()));
String line;
try {
while ((line = reader.readLine()) != null) {
System.err.println("SERVER ERR: " + line);
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
latch.await(3000, TimeUnit.MILLISECONDS);
}
@After
public void stopServer() throws InterruptedException {
System.out.println("Stopping server ...");
serverProcess.destroy();
serverOutout.cancel(true);
serverError.cancel(true);
serverService.shutdown();
serverService.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
}

View File

@@ -0,0 +1,128 @@
package com.github.nkzawa.engineio.client;
import com.github.nkzawa.emitter.Emitter;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class)
public class ConnectionTest extends Connection {
private Socket socket;
@Test(timeout = TIMEOUT)
public void connectToLocalhost() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
Socket.Options opts = new Socket.Options();
opts.port = PORT;
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
assertThat((String)args[0], is("hi"));
socket.close();
semaphore.release();
}
});
}
});
socket.open();
semaphore.acquire();
}
@Test(timeout = TIMEOUT)
public void receiveMultibyteUTF8StringsWithPolling() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
Socket.Options opts = new Socket.Options();
opts.port = PORT;
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send("cash money €€€");
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if ("hi".equals(args[0])) return;
assertThat((String)args[0], is("cash money €€€"));
socket.close();
semaphore.release();
}
});
}
});
socket.open();
semaphore.acquire();
}
@Test(timeout = TIMEOUT)
public void receiveEmoji() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
Socket.Options opts = new Socket.Options();
opts.port = PORT;
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send("\uD800-\uDB7F\uDB80-\uDBFF\uDC00-\uDFFF\uE000-\uF8FF");
socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if ("hi".equals(args[0])) return;
assertThat((String)args[0], is("\uD800-\uDB7F\uDB80-\uDBFF\uDC00-\uDFFF\uE000-\uF8FF"));
socket.close();
semaphore.release();
}
});
}
});
socket.open();
semaphore.acquire();
}
@Test(timeout = TIMEOUT)
public void notSendPacketsIfSocketCloses() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
Socket.Options opts = new Socket.Options();
opts.port = PORT;
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
final boolean[] noPacket = new boolean[] {true};
socket.on(Socket.EVENT_PACKET_CREATE, new Emitter.Listener() {
@Override
public void call(Object... args) {
noPacket[0] = false;
}
});
socket.close();
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
assertThat(noPacket[0], is(true));
semaphore.release();
}
}, 1200);
}
});
socket.open();
semaphore.acquire();
}
}

View File

@@ -4,18 +4,15 @@ import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.transports.Polling; import com.github.nkzawa.engineio.client.transports.Polling;
import com.github.nkzawa.engineio.client.transports.WebSocket; import com.github.nkzawa.engineio.client.transports.WebSocket;
import com.github.nkzawa.thread.EventThread; import com.github.nkzawa.thread.EventThread;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.not;
@@ -25,70 +22,10 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class ServerConnectionTest { public class ServerConnectionTest extends Connection {
final static int TIMEOUT = 3000;
final static int PORT = 3000;
private Process serverProcess;
private ExecutorService serverService;
private Future serverOutout;
private Future serverError;
private Socket socket; private Socket socket;
@Before
public void startServer() throws IOException, InterruptedException {
System.out.println("Starting server ...");
final CountDownLatch latch = new CountDownLatch(1);
serverProcess = Runtime.getRuntime().exec(
"node src/test/resources/index.js " + PORT, new String[] {"DEBUG=engine*"});
serverService = Executors.newCachedThreadPool();
serverOutout = serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(serverProcess.getInputStream()));
String line;
try {
line = reader.readLine();
latch.countDown();
do {
System.out.println("SERVER OUT: " + line);
} while ((line = reader.readLine()) != null);
} catch (IOException e) {
e.printStackTrace();
}
}
});
serverError = serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(serverProcess.getErrorStream()));
String line;
try {
while ((line = reader.readLine()) != null) {
System.err.println("SERVER ERR: " + line);
}
} catch (IOException e) {
e.printStackTrace();
}
}
});
latch.await(3000, TimeUnit.MILLISECONDS);
}
@After
public void stopServer() throws InterruptedException {
System.out.println("Stopping server ...");
serverProcess.destroy();
serverOutout.cancel(true);
serverError.cancel(true);
serverService.shutdown();
serverService.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)
public void openAndClose() throws URISyntaxException, InterruptedException { public void openAndClose() throws URISyntaxException, InterruptedException {
final BlockingQueue<String> events = new LinkedBlockingQueue<String>(); final BlockingQueue<String> events = new LinkedBlockingQueue<String>();
@@ -120,7 +57,7 @@ public class ServerConnectionTest {
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() { socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override @Override
public void call(Object... args) { public void call(Object... args) {
socket.send("hi"); socket.send("hello");
} }
}).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override @Override
@@ -130,8 +67,8 @@ public class ServerConnectionTest {
}); });
socket.open(); socket.open();
assertThat(events.take(), is("hello client"));
assertThat(events.take(), is("hi")); assertThat(events.take(), is("hi"));
assertThat(events.take(), is("hello"));
socket.close(); socket.close();
} }
@@ -337,72 +274,4 @@ public class ServerConnectionTest {
}); });
semaphore.acquire(); semaphore.acquire();
} }
@Test(timeout = TIMEOUT)
public void sendAndReceiveBinaryDataWhenPolling() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
final byte[] binaryData = new byte[5];
for (int i = 0; i < binaryData.length; i++) {
binaryData[i] = (byte)i;
}
Socket.Options opts = new Socket.Options();
opts.port = PORT;
opts.transports = new String[] {Polling.NAME};
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send(binaryData);
}
}).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if (args[0] instanceof byte[]) {
assertThat((byte[])args[0], is(binaryData));
socket.close();
semaphore.release();
}
}
});
socket.open();
semaphore.acquire();
}
@Test(timeout = TIMEOUT)
public void sendAndReceiveBinaryDataWhenWS() throws InterruptedException {
final Semaphore semaphore = new Semaphore(0);
final byte[] binaryData = new byte[5];
for (int i = 0; i < binaryData.length; i++) {
binaryData[i] = (byte)i;
}
Socket.Options opts = new Socket.Options();
opts.port = PORT;
socket = new Socket(opts);
socket.on(Socket.EVENT_OPEN, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.on(Socket.EVENT_UPGRADE, new Emitter.Listener() {
@Override
public void call(Object... args) {
socket.send(binaryData);
}
});
}
}).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override
public void call(Object... args) {
if (args[0] instanceof byte[]) {
assertThat((byte[])args[0], is(binaryData));
socket.close();
semaphore.release();
}
}
});
socket.open();
semaphore.acquire();
}
} }

View File

@@ -1,12 +1,14 @@
var http = require('http').Server();
var engine = require('engine.io'); var engine = require('engine.io');
var server = engine.attach(http, {pingInterval: 500});
var port = parseInt(process.argv[2], 10) || 3000 var port = parseInt(process.argv[2], 10) || 3000
var server = engine.listen(port, function() { http.listen(port, function() {
console.log('Engine.IO server listening on port', port); console.log('Engine.IO server listening on port', port);
}); });
server.on('connection', function(socket) { server.on('connection', function(socket) {
socket.send('hello client'); socket.send('hi');
socket.on('message', function(message) { socket.on('message', function(message) {
socket.send(message); socket.send(message);

View File

@@ -3,6 +3,6 @@
"version": "0.0.0", "version": "0.0.0",
"private": true, "private": true,
"dependencies": { "dependencies": {
"engine.io": "1.1.0" "engine.io": "1.2.2"
} }
} }