move package name

This commit is contained in:
nkzawa
2015-08-31 03:23:03 +09:00
parent 742e824a49
commit d0039f7910
31 changed files with 60 additions and 60 deletions

View File

@@ -0,0 +1,54 @@
package io.socket.backo;
public class Backoff {
private long ms = 100;
private long max = 10000;
private int factor = 2;
private double jitter = 0.0;
private int attempts = 0;
public Backoff() {}
public long duration() {
long ms = this.ms * (long) Math.pow(this.factor, this.attempts++);
if (jitter != 0.0) {
double rand = Math.random();
int deviation = (int) Math.floor(rand * this.jitter * ms);
ms = (((int) Math.floor(rand * 10)) & 1) == 0 ? ms - deviation : ms + deviation;
}
if (ms < this.ms) {
// overflow happened
ms = Long.MAX_VALUE;
}
return Math.min(ms, this.max);
}
public void reset() {
this.attempts = 0;
}
public Backoff setMin(long min) {
this.ms = min;
return this;
}
public Backoff setMax(long max) {
this.max = max;
return this;
}
public Backoff setFactor(int factor) {
this.factor = factor;
return this;
}
public Backoff setJitter(double jitter) {
this.jitter = jitter;
return this;
}
public int getAttempts() {
return this.attempts;
}
}

View File

@@ -0,0 +1,11 @@
package io.socket.client;
/**
* Acknowledgement.
*/
public interface Ack {
public void call(Object... args);
}

View File

@@ -0,0 +1,94 @@
package io.socket.client;
import io.socket.parser.Parser;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
public class IO {
private static final Logger logger = Logger.getLogger(IO.class.getName());
private static final ConcurrentHashMap<String, Manager> managers = new ConcurrentHashMap<String, Manager>();
/**
* Protocol version.
*/
public static int protocol = Parser.protocol;
public static void setDefaultSSLContext(SSLContext sslContext) {
Manager.defaultSSLContext = sslContext;
}
public static void setDefaultHostnameVerifier(HostnameVerifier hostnameVerifier) {
Manager.defaultHostnameVerifier = hostnameVerifier;
}
private IO() {}
public static Socket socket(String uri) throws URISyntaxException {
return socket(uri, null);
}
public static Socket socket(String uri, Options opts) throws URISyntaxException {
return socket(new URI(uri), opts);
}
public static Socket socket(URI uri) {
return socket(uri, null);
}
/**
* Initializes a {@link Socket} from an existing {@link Manager} for multiplexing.
*
* @param uri uri to connect.
* @param opts options for socket.
* @return {@link Socket} instance.
*/
public static Socket socket(URI uri, Options opts) {
if (opts == null) {
opts = new Options();
}
URL parsed = Url.parse(uri);
URI source;
try {
source = parsed.toURI();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
Manager io;
if (opts.forceNew || !opts.multiplex) {
logger.fine(String.format("ignoring socket cache for %s", source));
io = new Manager(source, opts);
} else {
String id = Url.extractId(parsed);
if (!managers.containsKey(id)) {
logger.fine(String.format("new io instance for %s", source));
managers.putIfAbsent(id, new Manager(source, opts));
}
io = managers.get(id);
}
return io.socket(parsed.getPath());
}
public static class Options extends Manager.Options {
public boolean forceNew;
/**
* Whether to enable multiplexing. Default is true.
*/
public boolean multiplex = true;
}
}

View File

@@ -0,0 +1,581 @@
package io.socket.client;
import io.socket.backo.Backoff;
import com.github.nkzawa.emitter.Emitter;
import io.socket.parser.Packet;
import io.socket.parser.Parser;
import com.github.nkzawa.thread.EventThread;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Manager class represents a connection to a given Socket.IO server.
*/
public class Manager extends Emitter {
private static final Logger logger = Logger.getLogger(Manager.class.getName());
/*package*/ enum ReadyState {
CLOSED, OPENING, OPEN
}
/**
* Called on a successful connection.
*/
public static final String EVENT_OPEN = "open";
/**
* Called on a disconnection.
*/
public static final String EVENT_CLOSE = "close";
public static final String EVENT_PACKET = "packet";
public static final String EVENT_ERROR = "error";
/**
* Called on a connection error.
*/
public static final String EVENT_CONNECT_ERROR = "connect_error";
/**
* Called on a connection timeout.
*/
public static final String EVENT_CONNECT_TIMEOUT = "connect_timeout";
/**
* Called on a successful reconnection.
*/
public static final String EVENT_RECONNECT = "reconnect";
/**
* Called on a reconnection attempt error.
*/
public static final String EVENT_RECONNECT_ERROR = "reconnect_error";
public static final String EVENT_RECONNECT_FAILED = "reconnect_failed";
public static final String EVENT_RECONNECT_ATTEMPT = "reconnect_attempt";
public static final String EVENT_RECONNECTING = "reconnecting";
/**
* Called when a new transport is created. (experimental)
*/
public static final String EVENT_TRANSPORT = Engine.EVENT_TRANSPORT;
/*package*/ static SSLContext defaultSSLContext;
/*package*/ static HostnameVerifier defaultHostnameVerifier;
/*package*/ ReadyState readyState = null;
private boolean _reconnection;
private boolean skipReconnect;
private boolean reconnecting;
private boolean encoding;
private int _reconnectionAttempts;
private long _reconnectionDelay;
private long _reconnectionDelayMax;
private double _randomizationFactor;
private Backoff backoff;
private long _timeout;
private Set<Socket> connected;
private URI uri;
private List<Packet> packetBuffer;
private Queue<On.Handle> subs;
private Options opts;
/*package*/ com.github.nkzawa.engineio.client.Socket engine;
private Parser.Encoder encoder;
private Parser.Decoder decoder;
/**
* This HashMap can be accessed from outside of EventThread.
*/
private ConcurrentHashMap<String, Socket> nsps;
public Manager() {
this(null, null);
}
public Manager(URI uri) {
this(uri, null);
}
public Manager(Options opts) {
this(null, opts);
}
public Manager(URI uri, Options opts) {
if (opts == null) {
opts = new Options();
}
if (opts.path == null) {
opts.path = "/socket.io";
}
if (opts.sslContext == null) {
opts.sslContext = defaultSSLContext;
}
if (opts.hostnameVerifier == null) {
opts.hostnameVerifier = defaultHostnameVerifier;
}
this.opts = opts;
this.nsps = new ConcurrentHashMap<String, Socket>();
this.subs = new LinkedList<On.Handle>();
this.reconnection(opts.reconnection);
this.reconnectionAttempts(opts.reconnectionAttempts != 0 ? opts.reconnectionAttempts : Integer.MAX_VALUE);
this.reconnectionDelay(opts.reconnectionDelay != 0 ? opts.reconnectionDelay : 1000);
this.reconnectionDelayMax(opts.reconnectionDelayMax != 0 ? opts.reconnectionDelayMax : 5000);
this.randomizationFactor(opts.randomizationFactor != 0.0 ? opts.randomizationFactor : 0.5);
this.backoff = new Backoff()
.setMin(this.reconnectionDelay())
.setMax(this.reconnectionDelayMax())
.setJitter(this.randomizationFactor());
this.timeout(opts.timeout);
this.readyState = ReadyState.CLOSED;
this.uri = uri;
this.connected = new HashSet<Socket>();
this.encoding = false;
this.packetBuffer = new ArrayList<Packet>();
this.encoder = new Parser.Encoder();
this.decoder = new Parser.Decoder();
}
private void emitAll(String event, Object... args) {
this.emit(event, args);
for (Socket socket : this.nsps.values()) {
socket.emit(event, args);
}
}
/**
* Update `socket.id` of all sockets
*/
private void updateSocketIds() {
for (Socket socket : this.nsps.values()) {
socket.id = this.engine.id();
}
}
public boolean reconnection() {
return this._reconnection;
}
public Manager reconnection(boolean v) {
this._reconnection = v;
return this;
}
public int reconnectionAttempts() {
return this._reconnectionAttempts;
}
public Manager reconnectionAttempts(int v) {
this._reconnectionAttempts = v;
return this;
}
public long reconnectionDelay() {
return this._reconnectionDelay;
}
public Manager reconnectionDelay(long v) {
this._reconnectionDelay = v;
if (this.backoff != null) {
this.backoff.setMin(v);
}
return this;
}
public double randomizationFactor() {
return this._randomizationFactor;
}
public Manager randomizationFactor(double v) {
this._randomizationFactor = v;
if (this.backoff != null) {
this.backoff.setJitter(v);
}
return this;
}
public long reconnectionDelayMax() {
return this._reconnectionDelayMax;
}
public Manager reconnectionDelayMax(long v) {
this._reconnectionDelayMax = v;
if (this.backoff != null) {
this.backoff.setMax(v);
}
return this;
}
public long timeout() {
return this._timeout;
}
public Manager timeout(long v) {
this._timeout = v;
return this;
}
private void maybeReconnectOnOpen() {
// Only try to reconnect if it's the first time we're connecting
if (!this.reconnecting && this._reconnection && this.backoff.getAttempts() == 0) {
this.reconnect();
}
}
public Manager open(){
return open(null);
}
/**
* Connects the client.
*
* @param fn callback.
* @return a reference to this object.
*/
public Manager open(final OpenCallback fn) {
EventThread.exec(new Runnable() {
@Override
public void run() {
logger.fine(String.format("readyState %s", Manager.this.readyState));
if (Manager.this.readyState == ReadyState.OPEN || Manager.this.readyState == ReadyState.OPENING) return;
logger.fine(String.format("opening %s", Manager.this.uri));
Manager.this.engine = new Engine(Manager.this.uri, Manager.this.opts);
final com.github.nkzawa.engineio.client.Socket socket = Manager.this.engine;
final Manager self = Manager.this;
Manager.this.readyState = ReadyState.OPENING;
Manager.this.skipReconnect = false;
// propagate transport event.
socket.on(Engine.EVENT_TRANSPORT, new Listener() {
@Override
public void call(Object... args) {
self.emit(Manager.EVENT_TRANSPORT, args);
}
});
final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() {
@Override
public void call(Object... objects) {
self.onopen();
if (fn != null) fn.call(null);
}
});
On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
Object data = objects.length > 0 ? objects[0] : null;
logger.fine("connect_error");
self.cleanup();
self.readyState = ReadyState.CLOSED;
self.emitAll(EVENT_CONNECT_ERROR, data);
if (fn != null) {
Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception) data : null);
fn.call(err);
} else {
// Only do this if there is no fn to handle the error
self.maybeReconnectOnOpen();
}
}
});
if (Manager.this._timeout >= 0) {
final long timeout = Manager.this._timeout;
logger.fine(String.format("connection attempt will timeout after %d", timeout));
final Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
EventThread.exec(new Runnable() {
@Override
public void run() {
logger.fine(String.format("connect attempt timed out after %d", timeout));
openSub.destroy();
socket.close();
socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
self.emitAll(EVENT_CONNECT_TIMEOUT, timeout);
}
});
}
}, timeout);
Manager.this.subs.add(new On.Handle() {
@Override
public void destroy() {
timer.cancel();
}
});
}
Manager.this.subs.add(openSub);
Manager.this.subs.add(errorSub);
Manager.this.engine.open();
}
});
return this;
}
private void onopen() {
logger.fine("open");
this.cleanup();
this.readyState = ReadyState.OPEN;
this.emit(EVENT_OPEN);
final com.github.nkzawa.engineio.client.Socket socket = this.engine;
this.subs.add(On.on(socket, Engine.EVENT_DATA, new Listener() {
@Override
public void call(Object... objects) {
Object data = objects[0];
if (data instanceof String) {
Manager.this.ondata((String)data);
} else if (data instanceof byte[]) {
Manager.this.ondata((byte[])data);
}
}
}));
this.subs.add(On.on(this.decoder, Parser.Decoder.EVENT_DECODED, new Listener() {
@Override
public void call(Object... objects) {
Manager.this.ondecoded((Packet) objects[0]);
}
}));
this.subs.add(On.on(socket, Engine.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
Manager.this.onerror((Exception)objects[0]);
}
}));
this.subs.add(On.on(socket, Engine.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... objects) {
Manager.this.onclose((String)objects[0]);
}
}));
}
private void ondata(String data) {
this.decoder.add(data);
}
private void ondata(byte[] data) {
this.decoder.add(data);
}
private void ondecoded(Packet packet) {
this.emit(EVENT_PACKET, packet);
}
private void onerror(Exception err) {
logger.log(Level.FINE, "error", err);
this.emitAll(EVENT_ERROR, err);
}
/**
* Initializes {@link Socket} instances for each namespaces.
*
* @param nsp namespace.
* @return a socket instance for the namespace.
*/
public Socket socket(String nsp) {
Socket socket = this.nsps.get(nsp);
if (socket == null) {
socket = new Socket(this, nsp);
Socket _socket = this.nsps.putIfAbsent(nsp, socket);
if (_socket != null) {
socket = _socket;
} else {
final Manager self = this;
final Socket s = socket;
socket.on(Socket.EVENT_CONNECT, new Listener() {
@Override
public void call(Object... objects) {
s.id = self.engine.id();
self.connected.add(s);
}
});
}
}
return socket;
}
/*package*/ void destroy(Socket socket) {
this.connected.remove(socket);
if (this.connected.size() > 0) return;
this.close();
}
/*package*/ void packet(Packet packet) {
logger.fine(String.format("writing packet %s", packet));
final Manager self = this;
if (!self.encoding) {
self.encoding = true;
this.encoder.encode(packet, new Parser.Encoder.Callback() {
@Override
public void call(Object[] encodedPackets) {
for (Object packet : encodedPackets) {
if (packet instanceof String) {
self.engine.write((String)packet);
} else if (packet instanceof byte[]) {
self.engine.write((byte[])packet);
}
}
self.encoding = false;
self.processPacketQueue();
}
});
} else {
self.packetBuffer.add(packet);
}
}
private void processPacketQueue() {
if (this.packetBuffer.size() > 0 && !this.encoding) {
Packet pack = this.packetBuffer.remove(0);
this.packet(pack);
}
}
private void cleanup() {
On.Handle sub;
while ((sub = this.subs.poll()) != null) sub.destroy();
}
/*package*/ void close() {
if (this.readyState != ReadyState.OPEN) {
this.cleanup();
}
this.skipReconnect = true;
this.backoff.reset();
this.readyState = ReadyState.CLOSED;
if (this.engine != null) {
this.engine.close();
}
}
private void onclose(String reason) {
logger.fine("close");
this.cleanup();
this.backoff.reset();
this.readyState = ReadyState.CLOSED;
this.emit(EVENT_CLOSE, reason);
if (this._reconnection && !this.skipReconnect) {
this.reconnect();
}
}
private void reconnect() {
if (this.reconnecting || this.skipReconnect) return;
final Manager self = this;
if (this.backoff.getAttempts() >= this._reconnectionAttempts) {
logger.fine("reconnect failed");
this.backoff.reset();
this.emitAll(EVENT_RECONNECT_FAILED);
this.reconnecting = false;
} else {
long delay = this.backoff.duration();
logger.fine(String.format("will wait %dms before reconnect attempt", delay));
this.reconnecting = true;
final Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (self.skipReconnect) return;
logger.fine("attempting reconnect");
int attempts = self.backoff.getAttempts();
self.emitAll(EVENT_RECONNECT_ATTEMPT, attempts);
self.emitAll(EVENT_RECONNECTING, attempts);
// check again for the case socket closed in above events
if (self.skipReconnect) return;
self.open(new OpenCallback() {
@Override
public void call(Exception err) {
if (err != null) {
logger.fine("reconnect attempt error");
self.reconnecting = false;
self.reconnect();
self.emitAll(EVENT_RECONNECT_ERROR, err);
} else {
logger.fine("reconnect success");
self.onreconnect();
}
}
});
}
});
}
}, delay);
this.subs.add(new On.Handle() {
@Override
public void destroy() {
timer.cancel();
}
});
}
}
private void onreconnect() {
int attempts = this.backoff.getAttempts();
this.reconnecting = false;
this.backoff.reset();
this.updateSocketIds();
this.emitAll(EVENT_RECONNECT, attempts);
}
public static interface OpenCallback {
public void call(Exception err);
}
private static class Engine extends com.github.nkzawa.engineio.client.Socket {
Engine(URI uri, Options opts) {
super(uri, opts);
}
}
public static class Options extends com.github.nkzawa.engineio.client.Socket.Options {
public boolean reconnection = true;
public int reconnectionAttempts;
public long reconnectionDelay;
public long reconnectionDelayMax;
public double randomizationFactor;
/**
* Connection timeout (ms). Set -1 to disable.
*/
public long timeout = 20000;
}
}

View File

@@ -0,0 +1,23 @@
package io.socket.client;
import com.github.nkzawa.emitter.Emitter;
public class On {
private On() {}
public static Handle on(final Emitter obj, final String ev, final Emitter.Listener fn) {
obj.on(ev, fn);
return new Handle() {
@Override
public void destroy() {
obj.off(ev, fn);
}
};
}
public static interface Handle {
public void destroy();
}
}

View File

@@ -0,0 +1,462 @@
package io.socket.client;
import com.github.nkzawa.emitter.Emitter;
import io.socket.hasbinary.HasBinary;
import io.socket.parser.Packet;
import io.socket.parser.Parser;
import com.github.nkzawa.thread.EventThread;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.*;
import java.util.logging.Logger;
/**
* The socket class for Socket.IO Client.
*/
public class Socket extends Emitter {
private static final Logger logger = Logger.getLogger(Socket.class.getName());
/**
* Called on a connection.
*/
public static final String EVENT_CONNECT = "connect";
/**
* Called on a disconnection.
*/
public static final String EVENT_DISCONNECT = "disconnect";
/**
* Called on a connection error.
*
* <p>Parameters:</p>
* <ul>
* <li>(Exception) error data.</li>
* </ul>
*/
public static final String EVENT_ERROR = "error";
public static final String EVENT_MESSAGE = "message";
public static final String EVENT_CONNECT_ERROR = Manager.EVENT_CONNECT_ERROR;
public static final String EVENT_CONNECT_TIMEOUT = Manager.EVENT_CONNECT_TIMEOUT;
public static final String EVENT_RECONNECT = Manager.EVENT_RECONNECT;
public static final String EVENT_RECONNECT_ERROR = Manager.EVENT_RECONNECT_ERROR;
public static final String EVENT_RECONNECT_FAILED = Manager.EVENT_RECONNECT_FAILED;
public static final String EVENT_RECONNECT_ATTEMPT = Manager.EVENT_RECONNECT_ATTEMPT;
public static final String EVENT_RECONNECTING = Manager.EVENT_RECONNECTING;
protected static Map<String, Integer> events = new HashMap<String, Integer>() {{
put(EVENT_CONNECT, 1);
put(EVENT_CONNECT_ERROR, 1);
put(EVENT_CONNECT_TIMEOUT, 1);
put(EVENT_DISCONNECT, 1);
put(EVENT_ERROR, 1);
put(EVENT_RECONNECT, 1);
put(EVENT_RECONNECT_ATTEMPT, 1);
put(EVENT_RECONNECT_FAILED, 1);
put(EVENT_RECONNECT_ERROR, 1);
put(EVENT_RECONNECTING, 1);
}};
/*package*/ String id;
private volatile boolean connected;
private int ids;
private String nsp;
private Manager io;
private Map<Integer, Ack> acks = new HashMap<Integer, Ack>();
private Queue<On.Handle> subs;
private final Queue<List<Object>> receiveBuffer = new LinkedList<List<Object>>();
private final Queue<Packet<JSONArray>> sendBuffer = new LinkedList<Packet<JSONArray>>();
public Socket(Manager io, String nsp) {
this.io = io;
this.nsp = nsp;
}
private void subEvents() {
if (this.subs != null) return;
final Manager io = Socket.this.io;
Socket.this.subs = new LinkedList<On.Handle>() {{
add(On.on(io, Manager.EVENT_OPEN, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onopen();
}
}));
add(On.on(io, Manager.EVENT_PACKET, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onpacket((Packet) args[0]);
}
}));
add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
}
}));
}};
}
/**
* Connects the socket.
*/
public Socket open() {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (Socket.this.connected) return;
Socket.this.subEvents();
Socket.this.io.open(); // ensure open
if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
}
});
return this;
}
/**
* Connects the socket.
*/
public Socket connect() {
return this.open();
}
/**
* Send messages.
*
* @param args data to send.
* @return a reference to this object.
*/
public Socket send(final Object... args) {
EventThread.exec(new Runnable() {
@Override
public void run() {
Socket.this.emit(EVENT_MESSAGE, args);
}
});
return this;
}
/**
* Emits an event. When you pass {@link Ack} at the last argument, then the acknowledge is done.
*
* @param event an event name.
* @param args data to send.
* @return a reference to this object.
*/
@Override
public Emitter emit(final String event, final Object... args) {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (events.containsKey(event)) {
Socket.super.emit(event, args);
return;
}
List<Object> _args = new ArrayList<Object>(args.length + 1);
_args.add(event);
_args.addAll(Arrays.asList(args));
JSONArray jsonArgs = new JSONArray();
for (Object arg : _args) {
jsonArgs.put(arg);
}
int parserType = HasBinary.hasBinary(jsonArgs) ? Parser.BINARY_EVENT : Parser.EVENT;
Packet<JSONArray> packet = new Packet<JSONArray>(parserType, jsonArgs);
if (_args.get(_args.size() - 1) instanceof Ack) {
logger.fine(String.format("emitting packet with ack id %d", Socket.this.ids));
Socket.this.acks.put(Socket.this.ids, (Ack)_args.remove(_args.size() - 1));
jsonArgs = remove(jsonArgs, jsonArgs.length() - 1);
packet.data = jsonArgs;
packet.id = Socket.this.ids++;
}
if (Socket.this.connected) {
Socket.this.packet(packet);
} else {
Socket.this.sendBuffer.add(packet);
}
}
});
return this;
}
private static JSONArray remove(JSONArray a, int pos) {
JSONArray na = new JSONArray();
for (int i = 0; i < a.length(); i++){
if (i != pos) {
Object v;
try {
v = a.get(i);
} catch (JSONException e) {
v = null;
}
na.put(v);
}
}
return na;
}
/**
* Emits an event with an acknowledge.
*
* @param event an event name
* @param args data to send.
* @param ack the acknowledgement to be called
* @return a reference to this object.
*/
public Emitter emit(final String event, final Object[] args, final Ack ack) {
EventThread.exec(new Runnable() {
@Override
public void run() {
List<Object> _args = new ArrayList<Object>() {{
add(event);
if (args != null) {
addAll(Arrays.asList(args));
}
}};
JSONArray jsonArgs = new JSONArray();
for (Object _arg : _args) {
jsonArgs.put(_arg);
}
int parserType = HasBinary.hasBinary(jsonArgs) ? Parser.BINARY_EVENT : Parser.EVENT;
Packet<JSONArray> packet = new Packet<JSONArray>(parserType, jsonArgs);
logger.fine(String.format("emitting packet with ack id %d", ids));
Socket.this.acks.put(ids, ack);
packet.id = ids++;
Socket.this.packet(packet);
}
});
return this;
}
private void packet(Packet packet) {
packet.nsp = this.nsp;
this.io.packet(packet);
}
private void onopen() {
logger.fine("transport is open - connecting");
if (!"/".equals(this.nsp)) {
this.packet(new Packet(Parser.CONNECT));
}
}
private void onclose(String reason) {
logger.fine(String.format("close (%s)", reason));
this.connected = false;
this.id = null;
this.emit(EVENT_DISCONNECT, reason);
}
private void onpacket(Packet packet) {
if (!this.nsp.equals(packet.nsp)) return;
switch (packet.type) {
case Parser.CONNECT:
this.onconnect();
break;
case Parser.EVENT:
this.onevent(packet);
break;
case Parser.BINARY_EVENT:
this.onevent(packet);
break;
case Parser.ACK:
this.onack(packet);
break;
case Parser.BINARY_ACK:
this.onack(packet);
break;
case Parser.DISCONNECT:
this.ondisconnect();
break;
case Parser.ERROR:
this.emit(EVENT_ERROR, packet.data);
break;
}
}
private void onevent(Packet<JSONArray> packet) {
List<Object> args = new ArrayList<Object>(Arrays.asList(toArray(packet.data)));
logger.fine(String.format("emitting event %s", args));
if (packet.id >= 0) {
logger.fine("attaching ack callback to event");
args.add(this.ack(packet.id));
}
if (this.connected) {
if (args.size() == 0) return;
String event = args.remove(0).toString();
super.emit(event, args.toArray());
} else {
this.receiveBuffer.add(args);
}
}
private Ack ack(final int id) {
final Socket self = this;
final boolean[] sent = new boolean[] {false};
return new Ack() {
@Override
public void call(final Object... args) {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (sent[0]) return;
sent[0] = true;
logger.fine(String.format("sending ack %s", args.length != 0 ? args : null));
int type = HasBinary.hasBinary(args) ? Parser.BINARY_ACK : Parser.ACK;
Packet<JSONArray> packet = new Packet<JSONArray>(type, new JSONArray(Arrays.asList(args)));
packet.id = id;
self.packet(packet);
}
});
}
};
}
private void onack(Packet<JSONArray> packet) {
Ack fn = this.acks.remove(packet.id);
if (fn != null) {
logger.fine(String.format("calling ack %s with %s", packet.id, packet.data));
fn.call(toArray(packet.data));
} else {
logger.fine(String.format("bad ack %s", packet.id));
}
}
private void onconnect() {
this.connected = true;
this.emit(EVENT_CONNECT);
this.emitBuffered();
}
private void emitBuffered() {
List<Object> data;
while ((data = this.receiveBuffer.poll()) != null) {
String event = (String)data.get(0);
super.emit(event, data.toArray());
}
this.receiveBuffer.clear();
Packet<JSONArray> packet;
while ((packet = this.sendBuffer.poll()) != null) {
this.packet(packet);
}
this.sendBuffer.clear();
}
private void ondisconnect() {
logger.fine(String.format("server disconnect (%s)", this.nsp));
this.destroy();
this.onclose("io server disconnect");
}
private void destroy() {
if (this.subs != null) {
// clean subscriptions to avoid reconnection
for (On.Handle sub : this.subs) {
sub.destroy();
}
this.subs = null;
}
this.io.destroy(this);
}
/**
* Disconnects the socket.
*
* @return a reference to this object.
*/
public Socket close() {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (Socket.this.connected) {
logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp));
Socket.this.packet(new Packet(Parser.DISCONNECT));
}
Socket.this.destroy();
if (Socket.this.connected) {
Socket.this.onclose("io client disconnect");
}
}
});
return this;
}
/**
* Disconnects the socket.
*
* @return a reference to this object.
*/
public Socket disconnect() {
return this.close();
}
public Manager io() {
return this.io;
}
public boolean connected() {
return this.connected;
}
/**
* A property on the socket instance that is equal to the underlying engine.io socket id.
*
* The value is present once the socket has connected, is removed when the socket disconnects and is updated if the socket reconnects.
*
* @return a socket id
*/
public String id() {
return this.id;
}
private static Object[] toArray(JSONArray array) {
int length = array.length();
Object[] data = new Object[length];
for (int i = 0; i < length; i++) {
Object v;
try {
v = array.get(i);
} catch (JSONException e) {
v = null;
}
data[i] = v == JSONObject.NULL ? null : v;
}
return data;
}
}

View File

@@ -0,0 +1,20 @@
package io.socket.client;
public class SocketIOException extends Exception {
public SocketIOException() {
super();
}
public SocketIOException(String message) {
super(message);
}
public SocketIOException(String message, Throwable cause) {
super(message, cause);
}
public SocketIOException(Throwable cause) {
super(cause);
}
}

View File

@@ -0,0 +1,73 @@
package io.socket.client;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.regex.Pattern;
public class Url {
private static Pattern PATTERN_HTTP = Pattern.compile("^http|ws$");
private static Pattern PATTERN_HTTPS = Pattern.compile("^(http|ws)s$");
private Url() {}
public static URL parse(String uri) throws URISyntaxException {
return parse(new URI(uri));
}
public static URL parse(URI uri) {
String protocol = uri.getScheme();
if (protocol == null || !protocol.matches("^https?|wss?$")) {
protocol = "https";
}
int port = uri.getPort();
if (port == -1) {
if (PATTERN_HTTP.matcher(protocol).matches()) {
port = 80;
} else if (PATTERN_HTTPS.matcher(protocol).matches()) {
port = 443;
}
}
String path = uri.getRawPath();
if (path == null || path.length() == 0) {
path = "/";
}
String userInfo = uri.getRawUserInfo();
String query = uri.getRawQuery();
String fragment = uri.getRawFragment();
try {
return new URL(protocol + "://"
+ (userInfo != null ? userInfo + "@" : "")
+ uri.getHost()
+ (port != -1 ? ":" + port : "")
+ path
+ (query != null ? "?" + query : "")
+ (fragment != null ? "#" + fragment : ""));
} catch (MalformedURLException e) {
throw new RuntimeException(e);
}
}
public static String extractId(String url) throws MalformedURLException {
return extractId(new URL(url));
}
public static String extractId(URL url) {
String protocol = url.getProtocol();
int port = url.getPort();
if (port == -1) {
if (PATTERN_HTTP.matcher(protocol).matches()) {
port = 80;
} else if (PATTERN_HTTPS.matcher(protocol).matches()) {
port = 443;
}
}
return protocol + "://" + url.getHost() + ":" + port;
}
}

View File

@@ -0,0 +1,57 @@
package io.socket.hasbinary;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.Iterator;
public class HasBinary {
private HasBinary() {}
public static boolean hasBinary(Object data) {
return _hasBinary(data);
}
private static boolean _hasBinary(Object obj) {
if (obj == null) return false;
if (obj instanceof byte[]) {
return true;
}
if (obj instanceof JSONArray) {
JSONArray _obj = (JSONArray)obj;
int length = _obj.length();
for (int i = 0; i < length; i++) {
Object v;
try {
v = _obj.isNull(i) ? null : _obj.get(i);
} catch (JSONException e) {
return false;
}
if (_hasBinary(v)) {
return true;
}
}
} else if (obj instanceof JSONObject) {
JSONObject _obj = (JSONObject)obj;
Iterator keys = _obj.keys();
while (keys.hasNext()) {
String key = (String)keys.next();
Object v;
try {
v = _obj.get(key);
} catch (JSONException e) {
return false;
}
if (_hasBinary(v)) {
return true;
}
}
}
return false;
}
}

View File

@@ -0,0 +1,117 @@
package io.socket.parser;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class Binary {
private static final String KEY_PLACEHOLDER = "_placeholder";
private static final String KEY_NUM = "num";
public static DeconstructedPacket deconstructPacket(Packet packet) {
List<byte[]> buffers = new ArrayList<byte[]>();
packet.data = _deconstructPacket(packet.data, buffers);
packet.attachments = buffers.size();
DeconstructedPacket result = new DeconstructedPacket();
result.packet = packet;
result.buffers = buffers.toArray(new byte[buffers.size()][]);
return result;
}
private static Object _deconstructPacket(Object data, List<byte[]> buffers) {
if (data == null) return null;
if (data instanceof byte[]) {
JSONObject placeholder = new JSONObject();
try {
placeholder.put(KEY_PLACEHOLDER, true);
placeholder.put(KEY_NUM, buffers.size());
} catch (JSONException e) {
return null;
}
buffers.add((byte[])data);
return placeholder;
} else if (data instanceof JSONArray) {
JSONArray newData = new JSONArray();
JSONArray _data = (JSONArray)data;
int len = _data.length();
for (int i = 0; i < len; i ++) {
try {
newData.put(i, _deconstructPacket(_data.get(i), buffers));
} catch (JSONException e) {
return null;
}
}
return newData;
} else if (data instanceof JSONObject) {
JSONObject newData = new JSONObject();
JSONObject _data = (JSONObject)data;
Iterator<?> iterator = _data.keys();
while (iterator.hasNext()) {
String key = (String)iterator.next();
try {
newData.put(key, _deconstructPacket(_data.get(key), buffers));
} catch (JSONException e) {
return null;
}
}
return newData;
}
return data;
}
public static Packet reconstructPacket(Packet packet, byte[][] buffers) {
packet.data = _reconstructPacket(packet.data, buffers);
packet.attachments = -1;
return packet;
}
private static Object _reconstructPacket(Object data, byte[][] buffers) {
if (data instanceof JSONArray) {
JSONArray _data = (JSONArray)data;
int len = _data.length();
for (int i = 0; i < len; i ++) {
try {
_data.put(i, _reconstructPacket(_data.get(i), buffers));
} catch (JSONException e) {
return null;
}
}
return _data;
} else if (data instanceof JSONObject) {
JSONObject _data = (JSONObject)data;
if (_data.optBoolean(KEY_PLACEHOLDER)) {
int num = _data.optInt(KEY_NUM, -1);
return num >= 0 && num < buffers.length ? buffers[num] : null;
}
Iterator<?> iterator = _data.keys();
while (iterator.hasNext()) {
String key = (String)iterator.next();
try {
_data.put(key, _reconstructPacket(_data.get(key), buffers));
} catch (JSONException e) {
return null;
}
}
return _data;
}
return data;
}
public static class DeconstructedPacket {
public Packet packet;
public byte[][] buffers;
}
}

View File

@@ -0,0 +1,22 @@
package io.socket.parser;
public class Packet<T> {
public int type = -1;
public int id = -1;
public String nsp;
public T data;
public int attachments;
public Packet() {}
public Packet(int type) {
this.type = type;
}
public Packet(int type, T data) {
this.type = type;
this.data = data;
}
}

View File

@@ -0,0 +1,275 @@
package io.socket.parser;
import com.github.nkzawa.emitter.Emitter;
import org.json.JSONException;
import org.json.JSONTokener;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Logger;
public class Parser {
private static final Logger logger = Logger.getLogger(Parser.class.getName());
/**
* Packet type `connect`.
*/
public static final int CONNECT = 0;
/**
* Packet type `disconnect`.
*/
public static final int DISCONNECT = 1;
/**
* Packet type `event`.
*/
public static final int EVENT = 2;
/**
* Packet type `ack`.
*/
public static final int ACK = 3;
/**
* Packet type `error`.
*/
public static final int ERROR = 4;
/**
* Packet type `binary event`.
*/
public static final int BINARY_EVENT = 5;
/**
* Packet type `binary ack`.
*/
public static final int BINARY_ACK = 6;
public static int protocol = 4;
/**
* Packet types.
*/
public static String[] types = new String[] {
"CONNECT",
"DISCONNECT",
"EVENT",
"ACK",
"ERROR",
"BINARY_EVENT",
"BINARY_ACK"
};
private Parser() {}
private static Packet<String> error() {
return new Packet<String>(ERROR, "parser error");
}
public static class Encoder {
public Encoder() {}
public void encode(Packet obj, Callback callback) {
logger.fine(String.format("encoding packet %s", obj));
if (BINARY_EVENT == obj.type || BINARY_ACK == obj.type) {
encodeAsBinary(obj, callback);
} else {
String encoding = encodeAsString(obj);
callback.call(new String[] {encoding});
}
}
private String encodeAsString(Packet obj) {
StringBuilder str = new StringBuilder();
boolean nsp = false;
str.append(obj.type);
if (BINARY_EVENT == obj.type || BINARY_ACK == obj.type) {
str.append(obj.attachments);
str.append("-");
}
if (obj.nsp != null && obj.nsp.length() != 0 && !"/".equals(obj.nsp)) {
nsp = true;
str.append(obj.nsp);
}
if (obj.id >= 0) {
if (nsp) {
str.append(",");
nsp = false;
}
str.append(obj.id);
}
if (obj.data != null) {
if (nsp) str.append(",");
str.append(obj.data);
}
logger.fine(String.format("encoded %s as %s", obj, str));
return str.toString();
}
private void encodeAsBinary(Packet obj, Callback callback) {
Binary.DeconstructedPacket deconstruction = Binary.deconstructPacket(obj);
String pack = encodeAsString(deconstruction.packet);
List<Object> buffers = new ArrayList<Object>(Arrays.asList(deconstruction.buffers));
buffers.add(0, pack);
callback.call(buffers.toArray());
}
public interface Callback {
public void call(Object[] data);
}
}
public static class Decoder extends Emitter {
public static String EVENT_DECODED = "decoded";
/*package*/ BinaryReconstructor reconstructor;
public Decoder() {
this.reconstructor = null;
}
public void add(String obj) {
Packet packet = decodeString(obj);
if (BINARY_EVENT == packet.type || BINARY_ACK == packet.type) {
this.reconstructor = new BinaryReconstructor(packet);
if (this.reconstructor.reconPack.attachments == 0) {
this.emit(EVENT_DECODED, packet);
}
} else {
this.emit(EVENT_DECODED, packet);
}
}
public void add(byte[] obj) {
if (this.reconstructor == null) {
throw new RuntimeException("got binary data when not reconstructing a packet");
} else {
Packet packet = this.reconstructor.takeBinaryData(obj);
if (packet != null) {
this.reconstructor = null;
this.emit(EVENT_DECODED, packet);
}
}
}
private static Packet decodeString(String str) {
Packet p = new Packet();
int i = 0;
int length = str.length();
p.type = Character.getNumericValue(str.charAt(0));
if (p.type < 0 || p.type > types.length - 1) return error();
if (BINARY_EVENT == p.type || BINARY_ACK == p.type) {
if (!str.contains("-") || length <= i + 1) return error();
StringBuilder attachments = new StringBuilder();
while (str.charAt(++i) != '-') {
attachments.append(str.charAt(i));
}
p.attachments = Integer.parseInt(attachments.toString());
}
if (length > i + 1 && '/' == str.charAt(i + 1)) {
StringBuilder nsp = new StringBuilder();
while (true) {
++i;
char c = str.charAt(i);
if (',' == c) break;
nsp.append(c);
if (i + 1 == length) break;
}
p.nsp = nsp.toString();
} else {
p.nsp = "/";
}
if (length > i + 1){
Character next = str.charAt(i + 1);
if (Character.getNumericValue(next) > -1) {
StringBuilder id = new StringBuilder();
while (true) {
++i;
char c = str.charAt(i);
if (Character.getNumericValue(c) < 0) {
--i;
break;
}
id.append(c);
if (i + 1 == length) break;
}
try {
p.id = Integer.parseInt(id.toString());
} catch (NumberFormatException e){
return error();
}
}
}
if (length > i + 1){
try {
str.charAt(++i);
p.data = new JSONTokener(str.substring(i)).nextValue();
} catch (JSONException e) {
return error();
}
}
logger.fine(String.format("decoded %s as %s", str, p));
return p;
}
public void destroy() {
if (this.reconstructor != null) {
this.reconstructor.finishReconstruction();
}
}
}
/*package*/ static class BinaryReconstructor {
public Packet reconPack;
/*package*/ List<byte[]> buffers;
BinaryReconstructor(Packet packet) {
this.reconPack = packet;
this.buffers = new ArrayList<byte[]>();
}
public Packet takeBinaryData(byte[] binData) {
this.buffers.add(binData);
if (this.buffers.size() == this.reconPack.attachments) {
Packet packet = Binary.reconstructPacket(this.reconPack,
this.buffers.toArray(new byte[this.buffers.size()][]));
this.finishReconstruction();
return packet;
}
return null;
}
public void finishReconstruction () {
this.reconPack = null;
this.buffers = new ArrayList<byte[]>();
}
}
}