diff --git a/src/main/java/com/github/nkzawa/socketio/client/IO.java b/src/main/java/com/github/nkzawa/socketio/client/IO.java index d8119e1..0f50175 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/IO.java +++ b/src/main/java/com/github/nkzawa/socketio/client/IO.java @@ -7,15 +7,15 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class IO { - private static final Map managers = new HashMap(); + private static final ConcurrentHashMap managers = new ConcurrentHashMap(); public static int protocol = Parser.protocol; + private IO() {} public static Socket socket(String uri) throws URISyntaxException { @@ -49,7 +49,7 @@ public class IO { } else { String id = Url.extractId(parsed); if (!managers.containsKey(id)) { - managers.put(id, new Manager(href, opts)); + managers.putIfAbsent(id, new Manager(href, opts)); } io = managers.get(id); } diff --git a/src/main/java/com/github/nkzawa/socketio/client/Manager.java b/src/main/java/com/github/nkzawa/socketio/client/Manager.java index 284e856..dcbe41e 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Manager.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Manager.java @@ -1,14 +1,14 @@ package com.github.nkzawa.socketio.client; import com.github.nkzawa.emitter.Emitter; +import com.github.nkzawa.engineio.client.EventThread; import com.github.nkzawa.socketio.parser.Packet; import com.github.nkzawa.socketio.parser.Parser; import java.net.URI; -import java.util.Map; +import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; public class Manager extends Emitter { @@ -38,15 +38,20 @@ public class Manager extends Emitter { private long _reconnectionDelay; private long _reconnectionDelayMax; private long _timeout; - private AtomicInteger connected = new AtomicInteger(); - private AtomicInteger attempts = new AtomicInteger(); - private Map nsps = new ConcurrentHashMap(); - private Queue subs = new ConcurrentLinkedQueue(); + private int connected; + private int attempts; + private Queue subs = new LinkedList(); private com.github.nkzawa.engineio.client.Socket engine; + /** + * This HashMap can be accessed from outside of EventThread. + */ + private ConcurrentHashMap nsps = new ConcurrentHashMap(); + private ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor(); private ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor(); + public Manager(URI uri, IO.Options opts) { opts = initOptions(opts); this.engine = new Engine(uri, opts); @@ -122,65 +127,74 @@ public class Manager extends Emitter { } public Manager open(final OpenCallback fn) { - if (this.readyState == OPEN && !this.reconnecting) return this; - - final com.github.nkzawa.engineio.client.Socket socket = this.engine; - final Manager self = this; - - this.readyState = OPENING; - - final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() { + EventThread.exec(new Runnable() { @Override - public void call(Object... objects) { - self.onopen(); - if (fn != null) fn.call(null); + public void run() { + if (Manager.this.readyState == OPEN && !Manager.this.reconnecting) return; + + final com.github.nkzawa.engineio.client.Socket socket = Manager.this.engine; + final Manager self = Manager.this; + + Manager.this.readyState = OPENING; + + final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() { + @Override + public void call(Object... objects) { + self.onopen(); + if (fn != null) fn.call(null); + } + }); + + On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() { + @Override + public void call(Object... objects) { + Object data = objects.length > 0 ? objects[0] : null; + self.cleanup(); + self.emit(EVENT_CONNECT_ERROR, data); + if (fn != null) { + Exception err = new SocketIOException("Connection error", + data instanceof Exception ? (Exception)data : null); + fn.call(err); + } + } + }); + + if (Manager.this._timeout >= 0) { + final long timeout = Manager.this._timeout; + logger.fine(String.format("connection attempt will timeout after %d", timeout)); + + final Future timer = timeoutScheduler.schedule(new Runnable() { + @Override + public void run() { + EventThread.exec(new Runnable() { + @Override + public void run() { + logger.fine(String.format("connect attempt timed out after %d", timeout)); + openSub.destroy(); + socket.close(); + socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout")); + self.emit(EVENT_CONNECT_TIMEOUT, timeout); + } + }); + } + }, timeout, TimeUnit.MILLISECONDS); + + On.Handle timeSub = new On.Handle() { + @Override + public void destroy() { + timer.cancel(false); + } + }; + + Manager.this.subs.add(timeSub); + } + + Manager.this.subs.add(openSub); + Manager.this.subs.add(errorSub); + + Manager.this.engine.open(); } }); - - On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() { - @Override - public void call(Object... objects) { - Object data = objects.length > 0 ? objects[0] : null; - self.cleanup(); - self.emit(EVENT_CONNECT_ERROR, data); - if (fn != null) { - Exception err = new SocketIOException("Connection error", - data instanceof Exception ? (Exception)data : null); - fn.call(err); - } - } - }); - - if (this._timeout >= 0) { - final long timeout = this._timeout; - logger.fine(String.format("connection attempt will timeout after %d", timeout)); - - final Future timer = timeoutScheduler.schedule(new Runnable() { - @Override - public void run() { - logger.fine(String.format("connect attempt timed out after %d", timeout)); - openSub.destroy(); - socket.close(); - socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout")); - self.emit(EVENT_CONNECT_TIMEOUT, timeout); - } - }, timeout, TimeUnit.MILLISECONDS); - - On.Handle timeSub = new On.Handle() { - @Override - public void destroy() { - timer.cancel(false); - } - }; - - this.subs.add(timeSub); - } - - this.subs.add(openSub); - this.subs.add(errorSub); - - this.engine.open(); - return this; } @@ -223,21 +237,24 @@ public class Manager extends Emitter { Socket socket = this.nsps.get(nsp); if (socket == null) { socket = new Socket(this, nsp); - this.nsps.put(nsp, socket); - final Manager self = this; - socket.on(Socket.EVENT_CONNECT, new Listener() { - @Override - public void call(Object... objects) { - self.connected.incrementAndGet(); - } - }); - + Socket _socket = this.nsps.putIfAbsent(nsp, socket); + if (_socket != null) { + socket = _socket; + } else { + final Manager self = this; + socket.on(Socket.EVENT_CONNECT, new Listener() { + @Override + public void call(Object... objects) { + self.connected++; + } + }); + } } return socket; } /*package*/ void destroy(Socket socket) { - int connected = this.connected.decrementAndGet(); + this.connected--; if (connected == 0) { this.close(); } @@ -269,13 +286,13 @@ public class Manager extends Emitter { private void reconnect() { final Manager self = this; - int attempts = this.attempts.incrementAndGet(); + this.attempts++; if (attempts > this._reconnectionAttempts) { this.emit(EVENT_RECONNECT_FAILED); this.reconnecting = false; } else { - long delay = this.attempts.get() * this.reconnectionDelay(); + long delay = this.attempts * this.reconnectionDelay(); delay = Math.min(delay, this.reconnectionDelayMax()); logger.fine(String.format("will wait %dms before reconnect attempt", delay)); @@ -283,18 +300,23 @@ public class Manager extends Emitter { final Future timer = this.reconnectScheduler.schedule(new Runnable() { @Override public void run() { - logger.fine("attempting reconnect"); - self.open(new OpenCallback() { + EventThread.exec(new Runnable() { @Override - public void call(Exception err) { - if (err != null) { - logger.fine("reconnect attempt error"); - self.reconnect(); - self.emit(EVENT_RECONNECT_ERROR, err); - } else { - logger.fine("reconnect success"); - self.onreconnect(); - } + public void run() { + logger.fine("attempting reconnect"); + self.open(new OpenCallback() { + @Override + public void call(Exception err) { + if (err != null) { + logger.fine("reconnect attempt error"); + self.reconnect(); + self.emit(EVENT_RECONNECT_ERROR, err); + } else { + logger.fine("reconnect success"); + self.onreconnect(); + } + } + }); } }); } @@ -310,12 +332,13 @@ public class Manager extends Emitter { } private void onreconnect() { - int attempts = this.attempts.get(); - this.attempts.set(0); + int attempts = this.attempts; + this.attempts = 0; this.reconnecting = false; this.emit(EVENT_RECONNECT, attempts); } + public static interface OpenCallback { public void call(Exception err); diff --git a/src/main/java/com/github/nkzawa/socketio/client/Socket.java b/src/main/java/com/github/nkzawa/socketio/client/Socket.java index 151e682..66234e1 100644 --- a/src/main/java/com/github/nkzawa/socketio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/socketio/client/Socket.java @@ -1,6 +1,7 @@ package com.github.nkzawa.socketio.client; import com.github.nkzawa.emitter.Emitter; +import com.github.nkzawa.engineio.client.EventThread; import com.github.nkzawa.socketio.parser.Packet; import com.github.nkzawa.socketio.parser.Parser; import com.google.gson.Gson; @@ -8,9 +9,6 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; public class Socket extends Emitter { @@ -32,12 +30,13 @@ public class Socket extends Emitter { private boolean connected; private boolean disconnected = true; - private AtomicInteger ids = new AtomicInteger(); + private int ids; private String nsp; private Manager io; - private Map acks = new ConcurrentHashMap(); + private Map acks = new HashMap(); private Queue subs; - private final Queue> buffer = new ConcurrentLinkedQueue>(); + private final Queue> buffer = new LinkedList>(); + public Socket(Manager io, String nsp) { this.io = io; @@ -45,50 +44,65 @@ public class Socket extends Emitter { } public void open() { - final Manager io = this.io; - this.subs = new ConcurrentLinkedQueue() {{ - add(On.on(io, Manager.EVENT_OPEN, new Listener() { - @Override - public void call(Object... objects) { - Socket.this.onopen(); - } - })); - add(On.on(io, Manager.EVENT_ERROR, new Listener() { - @Override - public void call(Object... objects) { - Socket.this.onerror(objects.length > 0 ? (Exception) objects[0] : null); - } - })); - }}; - if (this.io.readyState == Manager.OPEN) this.onopen(); - io.open(); + EventThread.exec(new Runnable() { + @Override + public void run() { + final Manager io = Socket.this.io; + Socket.this.subs = new LinkedList() {{ + add(On.on(io, Manager.EVENT_OPEN, new Listener() { + @Override + public void call(Object... objects) { + Socket.this.onopen(); + } + })); + add(On.on(io, Manager.EVENT_ERROR, new Listener() { + @Override + public void call(Object... objects) { + Socket.this.onerror(objects.length > 0 ? (Exception) objects[0] : null); + } + })); + }}; + if (Socket.this.io.readyState == Manager.OPEN) Socket.this.onopen(); + io.open(); + } + }); } public void connect() { this.open(); } - public Socket send(Object... args) { - this.emit(EVENT_MESSAGE, args); + public Socket send(final Object... args) { + EventThread.exec(new Runnable() { + @Override + public void run() { + Socket.this.emit(EVENT_MESSAGE, args); + } + }); return this; } @Override - public Emitter emit(String event, Object... args) { - if (events.contains(event)) { - super.emit(event, args); - } else { - LinkedList _args = new LinkedList(Arrays.asList(args)); - if (_args.peekLast() instanceof Ack) { - Ack ack = (Ack)_args.pollLast(); - return this.emit(event, _args.toArray(), ack); + public Emitter emit(final String event, final Object... args) { + EventThread.exec(new Runnable() { + @Override + public void run() { + if (events.contains(event)) { + Socket.super.emit(event, args); + } else { + LinkedList _args = new LinkedList(Arrays.asList(args)); + if (_args.peekLast() instanceof Ack) { + Ack ack = (Ack)_args.pollLast(); + Socket.this.emit(event, _args.toArray(), ack); + return; + } + + _args.offerFirst(event); + Packet packet = new Packet(Parser.EVENT, toJsonArray(_args)); + Socket.this.packet(packet); + } } - - _args.offerFirst(event); - Packet packet = new Packet(Parser.EVENT, toJsonArray(_args)); - this.packet(packet); - } - + }); return this; } @@ -100,20 +114,23 @@ public class Socket extends Emitter { * @param ack * @return */ - public Emitter emit(final String event, final Object[] args, Ack ack) { - List _args = new ArrayList() {{ - add(event); - addAll(Arrays.asList(args)); - }}; - Packet packet = new Packet(Parser.EVENT, toJsonArray(_args)); + public Emitter emit(final String event, final Object[] args, final Ack ack) { + EventThread.exec(new Runnable() { + @Override + public void run() { + List _args = new ArrayList() {{ + add(event); + addAll(Arrays.asList(args)); + }}; + Packet packet = new Packet(Parser.EVENT, toJsonArray(_args)); - int ids = this.ids.getAndIncrement(); - logger.fine(String.format("emitting packet with ack id %d", ids)); - this.acks.put(ids, ack); - packet.id = ids; - - this.packet(packet); + logger.fine(String.format("emitting packet with ack id %d", ids)); + Socket.this.acks.put(ids, ack); + packet.id = ids++; + Socket.this.packet(packet); + } + }); return this; } @@ -255,14 +272,19 @@ public class Socket extends Emitter { } public Socket close() { - if (!this.connected) return this; + EventThread.exec(new Runnable() { + @Override + public void run() { + if (!Socket.this.connected) return; - logger.fine(String.format("performing disconnect (%s)", this.nsp)); - this.packet(new Packet(Parser.DISCONNECT)); + logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp)); + Socket.this.packet(new Packet(Parser.DISCONNECT)); - this.destroy(); + Socket.this.destroy(); - this.onclose("io client disconnect"); + Socket.this.onclose("io client disconnect"); + } + }); return this; }