support protocol ver 3 and sending byte data

This commit is contained in:
Naoyuki Kanezawa
2014-03-30 18:46:31 +09:00
parent 808ad8c4f1
commit 19c820334d
9 changed files with 456 additions and 117 deletions

View File

@@ -13,7 +13,10 @@ import com.google.gson.Gson;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -298,7 +301,7 @@ public abstract class Socket extends Emitter {
if (failed[0]) return;
logger.fine(String.format("probe transport '%s' opened", name));
Packet packet = new Packet(Packet.PING, "probe");
Packet<String> packet = new Packet<String>(Packet.PING, "probe");
transport[0].send(new Packet[] {packet});
transport[0].once(Transport.EVENT_PACKET, new Listener() {
@Override
@@ -395,7 +398,7 @@ public abstract class Socket extends Emitter {
this.emit(EVENT_HEARTBEAT);
if (Packet.OPEN.equals(packet.type)) {
this.onHandshake(gson.fromJson(packet.data, HandshakeData.class));
this.onHandshake(gson.fromJson((String)packet.data, HandshakeData.class));
} else if (Packet.PONG.equals(packet.type)) {
this.setPing();
} else if (Packet.ERROR.equals(packet.type)) {
@@ -406,7 +409,11 @@ public abstract class Socket extends Emitter {
} else if (Packet.MESSAGE.equals(packet.type)) {
this.emit(EVENT_DATA, packet.data);
this.emit(EVENT_MESSAGE, packet.data);
this.onmessage(packet.data);
if (packet.data instanceof String) {
this.onmessage((String)packet.data);
} else if (packet.data instanceof byte[]) {
this.onmessage((byte[])packet.data);
}
}
} else {
logger.fine(String.format("packet received with socket readyState '%s'", this.readyState));
@@ -530,6 +537,14 @@ public abstract class Socket extends Emitter {
this.send(msg, fn);
}
public void write(byte[] msg) {
this.write(msg, null);
}
public void write(byte[] msg, Runnable fn) {
this.send(msg, fn);
}
/**
* Sends a message.
*
@@ -539,6 +554,10 @@ public abstract class Socket extends Emitter {
this.send(msg, null);
}
public void send(byte[] msg) {
this.send(msg, null);
}
/**
* Sends a message.
*
@@ -554,17 +573,35 @@ public abstract class Socket extends Emitter {
});
}
public void send(final byte[] msg, final Runnable fn) {
EventThread.exec(new Runnable() {
@Override
public void run() {
Socket.this.sendPacket(Packet.MESSAGE, msg, fn);
}
});
}
private void sendPacket(String type) {
this.sendPacket(type, null, null);
this.sendPacket(new Packet(type), null);
}
private void sendPacket(String type, String data, Runnable fn) {
Packet<String> packet = new Packet<String>(type, data);
sendPacket(packet, fn);
}
private void sendPacket(String type, byte[] data, Runnable fn) {
Packet<byte[]> packet = new Packet<byte[]>(type, data);
sendPacket(packet, fn);
}
private void sendPacket(Packet packet, Runnable fn) {
if (fn == null) {
// ConcurrentLinkedList does not permit `null`.
fn = noop;
}
Packet packet = new Packet(type, data);
this.emit(EVENT_PACKET_CREATE, packet);
this.writeBuffer.offer(packet);
this.callbackBuffer.offer(fn);
@@ -653,6 +690,8 @@ public abstract class Socket extends Emitter {
return filteredUpgrades;
}
public void onmessage(byte[] data) {};
public abstract void onopen();
public abstract void onmessage(String data);