fix #4 gracefully shutdown all ExecutorServices

This commit is contained in:
Naoyuki Kanezawa
2014-08-16 20:04:20 +09:00
parent 46196d57dc
commit f3057dd5d3
5 changed files with 48 additions and 28 deletions

View File

@@ -123,7 +123,7 @@ public class Socket extends Emitter {
private SSLContext sslContext; private SSLContext sslContext;
private ReadyState readyState; private ReadyState readyState;
private ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); private ScheduledExecutorService heartbeatScheduler;
public static void setDefaultSSLContext(SSLContext sslContext) { public static void setDefaultSSLContext(SSLContext sslContext) {
defaultSSLContext = sslContext; defaultSSLContext = sslContext;
@@ -333,7 +333,7 @@ public class Socket extends Emitter {
} else { } else {
logger.fine(String.format("probe transport '%s' failed", name)); logger.fine(String.format("probe transport '%s' failed", name));
EngineIOException err = new EngineIOException("probe error"); EngineIOException err = new EngineIOException("probe error");
//err.transport = transport[0].name; err.transport = transport[0].name;
self.emit(EVENT_UPGRADE_ERROR, err); self.emit(EVENT_UPGRADE_ERROR, err);
} }
} }
@@ -494,7 +494,7 @@ public class Socket extends Emitter {
private void onHeartbeat(long timeout) { private void onHeartbeat(long timeout) {
if (this.pingTimeoutTimer != null) { if (this.pingTimeoutTimer != null) {
pingTimeoutTimer.cancel(true); pingTimeoutTimer.cancel(false);
} }
if (timeout <= 0) { if (timeout <= 0) {
@@ -502,7 +502,7 @@ public class Socket extends Emitter {
} }
final Socket self = this; final Socket self = this;
this.pingTimeoutTimer = this.heartbeatScheduler.schedule(new Runnable() { this.pingTimeoutTimer = this.getHeartbeatScheduler().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@@ -518,11 +518,11 @@ public class Socket extends Emitter {
private void setPing() { private void setPing() {
if (this.pingIntervalTimer != null) { if (this.pingIntervalTimer != null) {
pingIntervalTimer.cancel(true); pingIntervalTimer.cancel(false);
} }
final Socket self = this; final Socket self = this;
this.pingIntervalTimer = this.heartbeatScheduler.schedule(new Runnable() { this.pingIntervalTimer = this.getHeartbeatScheduler().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@@ -697,10 +697,13 @@ public class Socket extends Emitter {
// clear timers // clear timers
if (this.pingIntervalTimer != null) { if (this.pingIntervalTimer != null) {
this.pingIntervalTimer.cancel(true); this.pingIntervalTimer.cancel(false);
} }
if (this.pingTimeoutTimer != null) { if (this.pingTimeoutTimer != null) {
this.pingTimeoutTimer.cancel(true); this.pingTimeoutTimer.cancel(false);
}
if (this.heartbeatScheduler != null) {
this.heartbeatScheduler.shutdown();
} }
EventThread.nextTick(new Runnable() { EventThread.nextTick(new Runnable() {
@@ -742,6 +745,13 @@ public class Socket extends Emitter {
return filteredUpgrades; return filteredUpgrades;
} }
private ScheduledExecutorService getHeartbeatScheduler() {
if (this.heartbeatScheduler == null || this.heartbeatScheduler.isShutdown()) {
this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
}
return this.heartbeatScheduler;
}
public static class Options extends Transport.Options { public static class Options extends Transport.Options {
/** /**

View File

@@ -14,8 +14,6 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger; import java.util.logging.Logger;
public class PollingXHR extends Polling { public class PollingXHR extends Polling {
@@ -142,8 +140,6 @@ public class PollingXHR extends Polling {
public static final String EVENT_REQUEST_HEADERS = "requestHeaders"; public static final String EVENT_REQUEST_HEADERS = "requestHeaders";
public static final String EVENT_RESPONSE_HEADERS = "responseHeaders"; public static final String EVENT_RESPONSE_HEADERS = "responseHeaders";
private static final ExecutorService xhrService = Executors.newCachedThreadPool();
private String method; private String method;
private String uri; private String uri;
private byte[] data; private byte[] data;
@@ -186,7 +182,7 @@ public class PollingXHR extends Polling {
} }
logger.fine(String.format("sending xhr with url %s | data %s", this.uri, this.data)); logger.fine(String.format("sending xhr with url %s | data %s", this.uri, this.data));
xhrService.submit(new Runnable() { new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
OutputStream output = null; OutputStream output = null;
@@ -253,7 +249,7 @@ public class PollingXHR extends Polling {
} catch (IOException e) {} } catch (IOException e) {}
} }
} }
}); }).start();
} }
private void onSuccess() { private void onSuccess() {

View File

@@ -4,6 +4,7 @@ package com.github.nkzawa.thread;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
@@ -11,21 +12,32 @@ import java.util.concurrent.ThreadFactory;
*/ */
public class EventThread extends Thread { public class EventThread extends Thread {
private static final ExecutorService service = Executors.newSingleThreadExecutor(new ThreadFactory() { private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() {
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
thread = new EventThread(runnable); thread = new EventThread(runnable);
return thread; return thread;
} }
}); };
private static ExecutorService service;
private static volatile EventThread thread; private static volatile EventThread thread;
private static AtomicInteger counter = new AtomicInteger();
private EventThread(Runnable runnable) { private EventThread(Runnable runnable) {
super(runnable); super(runnable);
} }
private static ExecutorService getExecutorService() {
if (service == null || service.isShutdown()) {
service = Executors.newSingleThreadExecutor(THREAD_FACTORY);
}
return service;
}
/** /**
* check if the current thread is EventThread. * check if the current thread is EventThread.
* *
@@ -44,7 +56,7 @@ public class EventThread extends Thread {
if (isCurrent()) { if (isCurrent()) {
task.run(); task.run();
} else { } else {
service.execute(task); nextTick(task);
} }
} }
@@ -53,8 +65,16 @@ public class EventThread extends Thread {
* *
* @param task * @param task
*/ */
public static void nextTick(Runnable task) { public static void nextTick(final Runnable task) {
service.execute(task); counter.incrementAndGet();
getExecutorService().execute(new Runnable() {
@Override
public void run() {
task.run();
if (counter.decrementAndGet() == 0) {
service.shutdown();
}
}
});
} }
} }

View File

@@ -65,8 +65,8 @@ public abstract class Connection {
public void stopServer() throws InterruptedException { public void stopServer() throws InterruptedException {
System.out.println("Stopping server ..."); System.out.println("Stopping server ...");
serverProcess.destroy(); serverProcess.destroy();
serverOutout.cancel(true); serverOutout.cancel(false);
serverError.cancel(true); serverError.cancel(false);
serverService.shutdown(); serverService.shutdown();
serverService.awaitTermination(3000, TimeUnit.MILLISECONDS); serverService.awaitTermination(3000, TimeUnit.MILLISECONDS);
} }

View File

@@ -1,6 +1,5 @@
package com.github.nkzawa.thread; package com.github.nkzawa.thread;
import com.github.nkzawa.thread.EventThread;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
@@ -36,17 +35,14 @@ public class EventThreadTest {
@Test @Test
public void exec() throws InterruptedException { public void exec() throws InterruptedException {
final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>(); final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
final Set<Thread> threads = new HashSet<Thread>();
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@Override @Override
public void run() { public void run() {
threads.add(Thread.currentThread());
queue.offer(0); queue.offer(0);
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@Override @Override
public void run() { public void run() {
threads.add(Thread.currentThread());
queue.offer(1); queue.offer(1);
} }
}); });
@@ -57,7 +53,6 @@ public class EventThreadTest {
EventThread.exec(new Runnable() { EventThread.exec(new Runnable() {
@Override @Override
public void run() { public void run() {
threads.add(Thread.currentThread());
queue.offer(3); queue.offer(3);
} }
}); });
@@ -65,7 +60,6 @@ public class EventThreadTest {
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
assertThat(queue.take(), is(i)); assertThat(queue.take(), is(i));
} }
assertThat(threads.size(), is(1));
} }
@Test @Test