fix termination #85

This commit is contained in:
Naoyuki Kanezawa
2015-02-01 21:44:02 +09:00
parent 8a768805dc
commit 90ae65d4ca
6 changed files with 159 additions and 31 deletions

View File

@@ -97,9 +97,6 @@ public class Manager extends Emitter {
*/ */
private ConcurrentHashMap<String, Socket> nsps; private ConcurrentHashMap<String, Socket> nsps;
private ScheduledExecutorService timeoutScheduler;
private ScheduledExecutorService reconnectScheduler;
public Manager() { public Manager() {
this(null, null); this(null, null);
@@ -294,7 +291,8 @@ public class Manager extends Emitter {
final long timeout = Manager.this._timeout; final long timeout = Manager.this._timeout;
logger.fine(String.format("connection attempt will timeout after %d", timeout)); logger.fine(String.format("connection attempt will timeout after %d", timeout));
final Future timer = getTimeoutScheduler().schedule(new Runnable() { final Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@@ -308,12 +306,12 @@ public class Manager extends Emitter {
} }
}); });
} }
}, timeout, TimeUnit.MILLISECONDS); }, timeout);
Manager.this.subs.add(new On.Handle() { Manager.this.subs.add(new On.Handle() {
@Override @Override
public void destroy() { public void destroy() {
timer.cancel(false); timer.cancel();
} }
}); });
} }
@@ -457,6 +455,9 @@ public class Manager extends Emitter {
} }
/*package*/ void close() { /*package*/ void close() {
if (this.readyState != ReadyState.OPEN) {
this.cleanup();
}
this.skipReconnect = true; this.skipReconnect = true;
this.backoff.reset(); this.backoff.reset();
this.readyState = ReadyState.CLOSED; this.readyState = ReadyState.CLOSED;
@@ -472,13 +473,6 @@ public class Manager extends Emitter {
this.readyState = ReadyState.CLOSED; this.readyState = ReadyState.CLOSED;
this.emit(EVENT_CLOSE, reason); this.emit(EVENT_CLOSE, reason);
if (this.timeoutScheduler != null) {
this.timeoutScheduler.shutdown();
}
if (this.reconnectScheduler != null) {
this.reconnectScheduler.shutdown();
}
if (this._reconnection && !this.skipReconnect) { if (this._reconnection && !this.skipReconnect) {
this.reconnect(); this.reconnect();
} }
@@ -499,7 +493,8 @@ public class Manager extends Emitter {
logger.fine(String.format("will wait %dms before reconnect attempt", delay)); logger.fine(String.format("will wait %dms before reconnect attempt", delay));
this.reconnecting = true; this.reconnecting = true;
final Future timer = this.getReconnectScheduler().schedule(new Runnable() { final Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override @Override
public void run() { public void run() {
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@@ -532,12 +527,12 @@ public class Manager extends Emitter {
} }
}); });
} }
}, delay, TimeUnit.MILLISECONDS); }, delay);
this.subs.add(new On.Handle() { this.subs.add(new On.Handle() {
@Override @Override
public void destroy() { public void destroy() {
timer.cancel(false); timer.cancel();
} }
}); });
} }
@@ -551,20 +546,6 @@ public class Manager extends Emitter {
this.emitAll(EVENT_RECONNECT, attempts); this.emitAll(EVENT_RECONNECT, attempts);
} }
private ScheduledExecutorService getTimeoutScheduler() {
if (this.timeoutScheduler == null || this.timeoutScheduler.isShutdown()) {
this.timeoutScheduler = Executors.newSingleThreadScheduledExecutor();
}
return timeoutScheduler;
}
private ScheduledExecutorService getReconnectScheduler() {
if (this.reconnectScheduler == null || this.reconnectScheduler.isShutdown()) {
this.reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
}
return this.reconnectScheduler;
}
public static interface OpenCallback { public static interface OpenCallback {

View File

@@ -7,6 +7,8 @@ import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*; import java.util.concurrent.*;
public abstract class Connection { public abstract class Connection {
@@ -95,6 +97,16 @@ public abstract class Connection {
} }
String[] createEnv() { String[] createEnv() {
return new String[] {"DEBUG=socket.io:*", "PORT=" + PORT}; Map<String, String> env = new HashMap<String, String>(System.getenv());
env.put("DEBUG", "socket.io:*");
env.put("PORT", String.valueOf(PORT));
String[] _env = new String[env.size()];
int i = 0;
for (String key : env.keySet()) {
_env[i] = key + "=" + env.get(key);
i++;
}
return _env;
} }
} }

View File

@@ -0,0 +1,46 @@
package com.github.nkzawa.socketio.client;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@RunWith(JUnit4.class)
public class ExecutionTest extends Connection {
final static int TIMEOUT = 30 * 1000;
@Test(timeout = TIMEOUT)
public void execConnection() throws InterruptedException, IOException {
exec("com.github.nkzawa.socketio.client.executions.Connection");
}
@Test(timeout = TIMEOUT)
public void execConnectionFailure() throws InterruptedException, IOException {
exec("com.github.nkzawa.socketio.client.executions.ConnectionFailure");
}
@Test(timeout = TIMEOUT)
public void execImmediateClose() throws InterruptedException, IOException {
exec("com.github.nkzawa.socketio.client.executions.ImmediateClose");
}
private void exec(String mainClass) throws InterruptedException, IOException {
Process process = Runtime.getRuntime().exec(String.format("mvn --quiet exec:java" +
" -Dexec.mainClass=%s -Dexec.classpathScope=test", mainClass), createEnv());
BufferedReader input = new BufferedReader(
new InputStreamReader(process.getInputStream()));
String line;
while ((line = input.readLine()) != null) {
System.out.println("EXEC OUT: " + line);
}
process.waitFor();
assertThat(process.exitValue(), is(0));
}
}

View File

@@ -0,0 +1,24 @@
package com.github.nkzawa.socketio.client.executions;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.socketio.client.IO;
import com.github.nkzawa.socketio.client.Socket;
import java.net.URISyntaxException;
public class Connection {
public static void main(String[] args) throws URISyntaxException {
IO.Options options = new IO.Options();
options.forceNew = true;
final Socket socket = IO.socket("http://localhost:" + System.getenv("PORT"), options);
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("connect");
socket.close();
}
});
socket.open();
}
}

View File

@@ -0,0 +1,36 @@
package com.github.nkzawa.socketio.client.executions;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.socketio.client.IO;
import com.github.nkzawa.socketio.client.Socket;
import java.net.URISyntaxException;
public class ConnectionFailure {
public static void main(String[] args) throws URISyntaxException {
int port = Integer.parseInt(System.getenv("PORT"));
port++;
IO.Options options = new IO.Options();
options.forceNew = true;
options.reconnection = false;
final Socket socket = IO.socket("http://localhost:" + port, options);
socket.on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("connect timeout");
}
}).on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("connect error");
}
}).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("disconnect");
}
});
socket.open();
}
}

View File

@@ -0,0 +1,29 @@
package com.github.nkzawa.socketio.client.executions;
import com.github.nkzawa.emitter.Emitter;
import com.github.nkzawa.socketio.client.IO;
import com.github.nkzawa.socketio.client.Socket;
import java.net.URISyntaxException;
public class ImmediateClose {
public static void main(String[] args) throws URISyntaxException {
IO.Options options = new IO.Options();
options.forceNew = true;
final Socket socket = IO.socket("http://localhost:" + System.getenv("PORT"), options);
socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("connect");
}
}).on(Socket.EVENT_DISCONNECT, new Emitter.Listener() {
@Override
public void call(Object... args) {
System.out.println("disconnect");
}
});
socket.connect();
socket.disconnect();
}
}