use Semaphore instead of BlockingQueue

This commit is contained in:
Naoyuki Kanezawa
2014-07-12 23:11:23 +09:00
parent a928fae69e
commit d2ac57b2c2

View File

@@ -7,8 +7,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
@@ -22,53 +20,52 @@ public class ServerConnectionTest extends Connection {
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)
public void openAndClose() throws URISyntaxException, InterruptedException { public void openAndClose() throws URISyntaxException, InterruptedException {
final BlockingQueue<String> events = new LinkedBlockingQueue<String>(); final Semaphore semaphore = new Semaphore(0);
socket = client(); socket = client();
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
System.out.println("connect:"); socket.disconnect();
events.offer("connect");
} }
}).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { }).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
System.out.println("disconnect:"); semaphore.release();
events.offer("disconnect");
} }
}); });
socket.connect(); socket.connect();
semaphore.acquire();
assertThat(events.take(), is("connect"));
socket.disconnect();
assertThat(events.take(), is("disconnect"));
} }
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)
public void message() throws URISyntaxException, InterruptedException { public void message() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object[]> events = new LinkedBlockingQueue<Object[]>(); final Semaphore semaphore = new Semaphore(0);
final int[] count = new int[] {0};
socket = client(); socket = client();
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
System.out.println("connect:");
socket.send("foo", "bar"); socket.send("foo", "bar");
} }
}).on(Socket.EVENT_MESSAGE, new Emitter.Listener() { }).on(Socket.EVENT_MESSAGE, new Emitter.Listener() {
@Override @Override
public void call(Object... objects) { public void call(Object... objects) {
System.out.println(String.format( switch (count[0]++) {
objects.length > 1 ? "message: %s, %s" : "message: %s", objects)); case 0:
events.offer(objects); assertThat(objects, is(new Object[] {"hello client"}));
break;
case 1:
assertThat(objects, is(new Object[] {"foo", "bar"}));
socket.disconnect();
semaphore.release();
break;
}
} }
}); });
socket.connect(); socket.connect();
semaphore.acquire();
assertThat(events.take(), is(new Object[] {"hello client"}));
assertThat(events.take(), is(new Object[] {"foo", "bar"}));
socket.disconnect();
} }
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)
@@ -132,7 +129,7 @@ public class ServerConnectionTest extends Connection {
@Test(timeout = TIMEOUT) @Test(timeout = TIMEOUT)
public void ackWithoutArgs() throws URISyntaxException, InterruptedException { public void ackWithoutArgs() throws URISyntaxException, InterruptedException {
final BlockingQueue<Object[]> events = new LinkedBlockingQueue<Object[]>(); final Semaphore semaphore = new Semaphore(0);
socket = client(); socket = client();
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@@ -141,15 +138,14 @@ public class ServerConnectionTest extends Connection {
socket.emit("ack", null, new Ack() { socket.emit("ack", null, new Ack() {
@Override @Override
public void call(Object... args) { public void call(Object... args) {
System.out.println("ack: " + args); assertThat(args, is(new Object[] {}));
events.offer(args); socket.disconnect();
semaphore.release();
} }
}); });
} }
}); });
socket.connect(); socket.connect();
semaphore.acquire();
assertThat(events.take(), is(new Object[] {}));
socket.disconnect();
} }
} }