thread constraint

This commit is contained in:
Naoyuki Kanezawa
2013-05-07 01:47:59 +09:00
parent ee4f0622e2
commit 8257f68bab
3 changed files with 192 additions and 147 deletions

View File

@@ -7,15 +7,15 @@ import java.net.MalformedURLException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class IO { public class IO {
private static final Map<String, Manager> managers = new HashMap<String, Manager>(); private static final ConcurrentHashMap<String, Manager> managers = new ConcurrentHashMap<String, Manager>();
public static int protocol = Parser.protocol; public static int protocol = Parser.protocol;
private IO() {} private IO() {}
public static Socket socket(String uri) throws URISyntaxException { public static Socket socket(String uri) throws URISyntaxException {
@@ -49,7 +49,7 @@ public class IO {
} else { } else {
String id = Url.extractId(parsed); String id = Url.extractId(parsed);
if (!managers.containsKey(id)) { if (!managers.containsKey(id)) {
managers.put(id, new Manager(href, opts)); managers.putIfAbsent(id, new Manager(href, opts));
} }
io = managers.get(id); io = managers.get(id);
} }

View File

@@ -1,14 +1,14 @@
package com.github.nkzawa.socketio.client; package com.github.nkzawa.socketio.client;
import com.github.nkzawa.emitter.Emitter; import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.EventThread;
import com.github.nkzawa.socketio.parser.Packet; import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser; import com.github.nkzawa.socketio.parser.Parser;
import java.net.URI; import java.net.URI;
import java.util.Map; import java.util.LinkedList;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger; import java.util.logging.Logger;
public class Manager extends Emitter { public class Manager extends Emitter {
@@ -38,15 +38,20 @@ public class Manager extends Emitter {
private long _reconnectionDelay; private long _reconnectionDelay;
private long _reconnectionDelayMax; private long _reconnectionDelayMax;
private long _timeout; private long _timeout;
private AtomicInteger connected = new AtomicInteger(); private int connected;
private AtomicInteger attempts = new AtomicInteger(); private int attempts;
private Map<String, Socket> nsps = new ConcurrentHashMap<String, Socket>(); private Queue<On.Handle> subs = new LinkedList<On.Handle>();
private Queue<On.Handle> subs = new ConcurrentLinkedQueue<On.Handle>();
private com.github.nkzawa.engineio.client.Socket engine; private com.github.nkzawa.engineio.client.Socket engine;
/**
* This HashMap can be accessed from outside of EventThread.
*/
private ConcurrentHashMap<String, Socket> nsps = new ConcurrentHashMap<String, Socket>();
private ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor(); private ScheduledExecutorService timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
private ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor(); private ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
public Manager(URI uri, IO.Options opts) { public Manager(URI uri, IO.Options opts) {
opts = initOptions(opts); opts = initOptions(opts);
this.engine = new Engine(uri, opts); this.engine = new Engine(uri, opts);
@@ -122,65 +127,74 @@ public class Manager extends Emitter {
} }
public Manager open(final OpenCallback fn) { public Manager open(final OpenCallback fn) {
if (this.readyState == OPEN && !this.reconnecting) return this; EventThread.exec(new Runnable() {
final com.github.nkzawa.engineio.client.Socket socket = this.engine;
final Manager self = this;
this.readyState = OPENING;
final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() {
@Override @Override
public void call(Object... objects) { public void run() {
self.onopen(); if (Manager.this.readyState == OPEN && !Manager.this.reconnecting) return;
if (fn != null) fn.call(null);
final com.github.nkzawa.engineio.client.Socket socket = Manager.this.engine;
final Manager self = Manager.this;
Manager.this.readyState = OPENING;
final On.Handle openSub = On.on(socket, Engine.EVENT_OPEN, new Listener() {
@Override
public void call(Object... objects) {
self.onopen();
if (fn != null) fn.call(null);
}
});
On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
Object data = objects.length > 0 ? objects[0] : null;
self.cleanup();
self.emit(EVENT_CONNECT_ERROR, data);
if (fn != null) {
Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception)data : null);
fn.call(err);
}
}
});
if (Manager.this._timeout >= 0) {
final long timeout = Manager.this._timeout;
logger.fine(String.format("connection attempt will timeout after %d", timeout));
final Future timer = timeoutScheduler.schedule(new Runnable() {
@Override
public void run() {
EventThread.exec(new Runnable() {
@Override
public void run() {
logger.fine(String.format("connect attempt timed out after %d", timeout));
openSub.destroy();
socket.close();
socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
self.emit(EVENT_CONNECT_TIMEOUT, timeout);
}
});
}
}, timeout, TimeUnit.MILLISECONDS);
On.Handle timeSub = new On.Handle() {
@Override
public void destroy() {
timer.cancel(false);
}
};
Manager.this.subs.add(timeSub);
}
Manager.this.subs.add(openSub);
Manager.this.subs.add(errorSub);
Manager.this.engine.open();
} }
}); });
On.Handle errorSub = On.on(socket, Engine.EVENT_ERROR, new Listener() {
@Override
public void call(Object... objects) {
Object data = objects.length > 0 ? objects[0] : null;
self.cleanup();
self.emit(EVENT_CONNECT_ERROR, data);
if (fn != null) {
Exception err = new SocketIOException("Connection error",
data instanceof Exception ? (Exception)data : null);
fn.call(err);
}
}
});
if (this._timeout >= 0) {
final long timeout = this._timeout;
logger.fine(String.format("connection attempt will timeout after %d", timeout));
final Future timer = timeoutScheduler.schedule(new Runnable() {
@Override
public void run() {
logger.fine(String.format("connect attempt timed out after %d", timeout));
openSub.destroy();
socket.close();
socket.emit(Engine.EVENT_ERROR, new SocketIOException("timeout"));
self.emit(EVENT_CONNECT_TIMEOUT, timeout);
}
}, timeout, TimeUnit.MILLISECONDS);
On.Handle timeSub = new On.Handle() {
@Override
public void destroy() {
timer.cancel(false);
}
};
this.subs.add(timeSub);
}
this.subs.add(openSub);
this.subs.add(errorSub);
this.engine.open();
return this; return this;
} }
@@ -223,21 +237,24 @@ public class Manager extends Emitter {
Socket socket = this.nsps.get(nsp); Socket socket = this.nsps.get(nsp);
if (socket == null) { if (socket == null) {
socket = new Socket(this, nsp); socket = new Socket(this, nsp);
this.nsps.put(nsp, socket); Socket _socket = this.nsps.putIfAbsent(nsp, socket);
final Manager self = this; if (_socket != null) {
socket.on(Socket.EVENT_CONNECT, new Listener() { socket = _socket;
@Override } else {
public void call(Object... objects) { final Manager self = this;
self.connected.incrementAndGet(); socket.on(Socket.EVENT_CONNECT, new Listener() {
} @Override
}); public void call(Object... objects) {
self.connected++;
}
});
}
} }
return socket; return socket;
} }
/*package*/ void destroy(Socket socket) { /*package*/ void destroy(Socket socket) {
int connected = this.connected.decrementAndGet(); this.connected--;
if (connected == 0) { if (connected == 0) {
this.close(); this.close();
} }
@@ -269,13 +286,13 @@ public class Manager extends Emitter {
private void reconnect() { private void reconnect() {
final Manager self = this; final Manager self = this;
int attempts = this.attempts.incrementAndGet(); this.attempts++;
if (attempts > this._reconnectionAttempts) { if (attempts > this._reconnectionAttempts) {
this.emit(EVENT_RECONNECT_FAILED); this.emit(EVENT_RECONNECT_FAILED);
this.reconnecting = false; this.reconnecting = false;
} else { } else {
long delay = this.attempts.get() * this.reconnectionDelay(); long delay = this.attempts * this.reconnectionDelay();
delay = Math.min(delay, this.reconnectionDelayMax()); delay = Math.min(delay, this.reconnectionDelayMax());
logger.fine(String.format("will wait %dms before reconnect attempt", delay)); logger.fine(String.format("will wait %dms before reconnect attempt", delay));
@@ -283,18 +300,23 @@ public class Manager extends Emitter {
final Future timer = this.reconnectScheduler.schedule(new Runnable() { final Future timer = this.reconnectScheduler.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
logger.fine("attempting reconnect"); EventThread.exec(new Runnable() {
self.open(new OpenCallback() {
@Override @Override
public void call(Exception err) { public void run() {
if (err != null) { logger.fine("attempting reconnect");
logger.fine("reconnect attempt error"); self.open(new OpenCallback() {
self.reconnect(); @Override
self.emit(EVENT_RECONNECT_ERROR, err); public void call(Exception err) {
} else { if (err != null) {
logger.fine("reconnect success"); logger.fine("reconnect attempt error");
self.onreconnect(); self.reconnect();
} self.emit(EVENT_RECONNECT_ERROR, err);
} else {
logger.fine("reconnect success");
self.onreconnect();
}
}
});
} }
}); });
} }
@@ -310,12 +332,13 @@ public class Manager extends Emitter {
} }
private void onreconnect() { private void onreconnect() {
int attempts = this.attempts.get(); int attempts = this.attempts;
this.attempts.set(0); this.attempts = 0;
this.reconnecting = false; this.reconnecting = false;
this.emit(EVENT_RECONNECT, attempts); this.emit(EVENT_RECONNECT, attempts);
} }
public static interface OpenCallback { public static interface OpenCallback {
public void call(Exception err); public void call(Exception err);

View File

@@ -1,6 +1,7 @@
package com.github.nkzawa.socketio.client; package com.github.nkzawa.socketio.client;
import com.github.nkzawa.emitter.Emitter; import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.engineio.client.EventThread;
import com.github.nkzawa.socketio.parser.Packet; import com.github.nkzawa.socketio.parser.Packet;
import com.github.nkzawa.socketio.parser.Parser; import com.github.nkzawa.socketio.parser.Parser;
import com.google.gson.Gson; import com.google.gson.Gson;
@@ -8,9 +9,6 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement; import com.google.gson.JsonElement;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger; import java.util.logging.Logger;
public class Socket extends Emitter { public class Socket extends Emitter {
@@ -32,12 +30,13 @@ public class Socket extends Emitter {
private boolean connected; private boolean connected;
private boolean disconnected = true; private boolean disconnected = true;
private AtomicInteger ids = new AtomicInteger(); private int ids;
private String nsp; private String nsp;
private Manager io; private Manager io;
private Map<Integer, Ack> acks = new ConcurrentHashMap<Integer, Ack>(); private Map<Integer, Ack> acks = new HashMap<Integer, Ack>();
private Queue<On.Handle> subs; private Queue<On.Handle> subs;
private final Queue<LinkedList<Object>> buffer = new ConcurrentLinkedQueue<LinkedList<Object>>(); private final Queue<LinkedList<Object>> buffer = new LinkedList<LinkedList<Object>>();
public Socket(Manager io, String nsp) { public Socket(Manager io, String nsp) {
this.io = io; this.io = io;
@@ -45,50 +44,65 @@ public class Socket extends Emitter {
} }
public void open() { public void open() {
final Manager io = this.io; EventThread.exec(new Runnable() {
this.subs = new ConcurrentLinkedQueue<On.Handle>() {{ @Override
add(On.on(io, Manager.EVENT_OPEN, new Listener() { public void run() {
@Override final Manager io = Socket.this.io;
public void call(Object... objects) { Socket.this.subs = new LinkedList<On.Handle>() {{
Socket.this.onopen(); add(On.on(io, Manager.EVENT_OPEN, new Listener() {
} @Override
})); public void call(Object... objects) {
add(On.on(io, Manager.EVENT_ERROR, new Listener() { Socket.this.onopen();
@Override }
public void call(Object... objects) { }));
Socket.this.onerror(objects.length > 0 ? (Exception) objects[0] : null); add(On.on(io, Manager.EVENT_ERROR, new Listener() {
} @Override
})); public void call(Object... objects) {
}}; Socket.this.onerror(objects.length > 0 ? (Exception) objects[0] : null);
if (this.io.readyState == Manager.OPEN) this.onopen(); }
io.open(); }));
}};
if (Socket.this.io.readyState == Manager.OPEN) Socket.this.onopen();
io.open();
}
});
} }
public void connect() { public void connect() {
this.open(); this.open();
} }
public Socket send(Object... args) { public Socket send(final Object... args) {
this.emit(EVENT_MESSAGE, args); EventThread.exec(new Runnable() {
@Override
public void run() {
Socket.this.emit(EVENT_MESSAGE, args);
}
});
return this; return this;
} }
@Override @Override
public Emitter emit(String event, Object... args) { public Emitter emit(final String event, final Object... args) {
if (events.contains(event)) { EventThread.exec(new Runnable() {
super.emit(event, args); @Override
} else { public void run() {
LinkedList<Object> _args = new LinkedList<Object>(Arrays.asList(args)); if (events.contains(event)) {
if (_args.peekLast() instanceof Ack) { Socket.super.emit(event, args);
Ack ack = (Ack)_args.pollLast(); } else {
return this.emit(event, _args.toArray(), ack); LinkedList<Object> _args = new LinkedList<Object>(Arrays.asList(args));
if (_args.peekLast() instanceof Ack) {
Ack ack = (Ack)_args.pollLast();
Socket.this.emit(event, _args.toArray(), ack);
return;
}
_args.offerFirst(event);
Packet packet = new Packet(Parser.EVENT, toJsonArray(_args));
Socket.this.packet(packet);
}
} }
});
_args.offerFirst(event);
Packet packet = new Packet(Parser.EVENT, toJsonArray(_args));
this.packet(packet);
}
return this; return this;
} }
@@ -100,20 +114,23 @@ public class Socket extends Emitter {
* @param ack * @param ack
* @return * @return
*/ */
public Emitter emit(final String event, final Object[] args, Ack ack) { public Emitter emit(final String event, final Object[] args, final Ack ack) {
List<Object> _args = new ArrayList<Object>() {{ EventThread.exec(new Runnable() {
add(event); @Override
addAll(Arrays.asList(args)); public void run() {
}}; List<Object> _args = new ArrayList<Object>() {{
Packet packet = new Packet(Parser.EVENT, toJsonArray(_args)); add(event);
addAll(Arrays.asList(args));
}};
Packet packet = new Packet(Parser.EVENT, toJsonArray(_args));
int ids = this.ids.getAndIncrement(); logger.fine(String.format("emitting packet with ack id %d", ids));
logger.fine(String.format("emitting packet with ack id %d", ids)); Socket.this.acks.put(ids, ack);
this.acks.put(ids, ack); packet.id = ids++;
packet.id = ids;
this.packet(packet);
Socket.this.packet(packet);
}
});
return this; return this;
} }
@@ -255,14 +272,19 @@ public class Socket extends Emitter {
} }
public Socket close() { public Socket close() {
if (!this.connected) return this; EventThread.exec(new Runnable() {
@Override
public void run() {
if (!Socket.this.connected) return;
logger.fine(String.format("performing disconnect (%s)", this.nsp)); logger.fine(String.format("performing disconnect (%s)", Socket.this.nsp));
this.packet(new Packet(Parser.DISCONNECT)); Socket.this.packet(new Packet(Parser.DISCONNECT));
this.destroy(); Socket.this.destroy();
this.onclose("io client disconnect"); Socket.this.onclose("io client disconnect");
}
});
return this; return this;
} }