diff --git a/src/main/java/com/github/nkzawa/engineio/client/Socket.java b/src/main/java/com/github/nkzawa/engineio/client/Socket.java index e7aaae2..bbec74b 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/Socket.java +++ b/src/main/java/com/github/nkzawa/engineio/client/Socket.java @@ -123,7 +123,7 @@ public class Socket extends Emitter { private SSLContext sslContext; private ReadyState readyState; - private ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); + private ScheduledExecutorService heartbeatScheduler; public static void setDefaultSSLContext(SSLContext sslContext) { defaultSSLContext = sslContext; @@ -333,7 +333,7 @@ public class Socket extends Emitter { } else { logger.fine(String.format("probe transport '%s' failed", name)); EngineIOException err = new EngineIOException("probe error"); - //err.transport = transport[0].name; + err.transport = transport[0].name; self.emit(EVENT_UPGRADE_ERROR, err); } } @@ -494,7 +494,7 @@ public class Socket extends Emitter { private void onHeartbeat(long timeout) { if (this.pingTimeoutTimer != null) { - pingTimeoutTimer.cancel(true); + pingTimeoutTimer.cancel(false); } if (timeout <= 0) { @@ -502,7 +502,7 @@ public class Socket extends Emitter { } final Socket self = this; - this.pingTimeoutTimer = this.heartbeatScheduler.schedule(new Runnable() { + this.pingTimeoutTimer = this.getHeartbeatScheduler().schedule(new Runnable() { @Override public void run() { EventThread.exec(new Runnable() { @@ -518,11 +518,11 @@ public class Socket extends Emitter { private void setPing() { if (this.pingIntervalTimer != null) { - pingIntervalTimer.cancel(true); + pingIntervalTimer.cancel(false); } final Socket self = this; - this.pingIntervalTimer = this.heartbeatScheduler.schedule(new Runnable() { + this.pingIntervalTimer = this.getHeartbeatScheduler().schedule(new Runnable() { @Override public void run() { EventThread.exec(new Runnable() { @@ -697,10 +697,13 @@ public class Socket extends Emitter { // clear timers if (this.pingIntervalTimer != null) { - this.pingIntervalTimer.cancel(true); + this.pingIntervalTimer.cancel(false); } if (this.pingTimeoutTimer != null) { - this.pingTimeoutTimer.cancel(true); + this.pingTimeoutTimer.cancel(false); + } + if (this.heartbeatScheduler != null) { + this.heartbeatScheduler.shutdown(); } EventThread.nextTick(new Runnable() { @@ -742,6 +745,13 @@ public class Socket extends Emitter { return filteredUpgrades; } + private ScheduledExecutorService getHeartbeatScheduler() { + if (this.heartbeatScheduler == null || this.heartbeatScheduler.isShutdown()) { + this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(); + } + return this.heartbeatScheduler; + } + public static class Options extends Transport.Options { /** diff --git a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java index 8e54ec4..193f009 100644 --- a/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java +++ b/src/main/java/com/github/nkzawa/engineio/client/transports/PollingXHR.java @@ -14,8 +14,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Logger; public class PollingXHR extends Polling { @@ -142,8 +140,6 @@ public class PollingXHR extends Polling { public static final String EVENT_REQUEST_HEADERS = "requestHeaders"; public static final String EVENT_RESPONSE_HEADERS = "responseHeaders"; - private static final ExecutorService xhrService = Executors.newCachedThreadPool(); - private String method; private String uri; private byte[] data; @@ -186,7 +182,7 @@ public class PollingXHR extends Polling { } logger.fine(String.format("sending xhr with url %s | data %s", this.uri, this.data)); - xhrService.submit(new Runnable() { + new Thread(new Runnable() { @Override public void run() { OutputStream output = null; @@ -253,7 +249,7 @@ public class PollingXHR extends Polling { } catch (IOException e) {} } } - }); + }).start(); } private void onSuccess() { diff --git a/src/main/java/com/github/nkzawa/thread/EventThread.java b/src/main/java/com/github/nkzawa/thread/EventThread.java index cdbe545..6f7c339 100644 --- a/src/main/java/com/github/nkzawa/thread/EventThread.java +++ b/src/main/java/com/github/nkzawa/thread/EventThread.java @@ -4,6 +4,7 @@ package com.github.nkzawa.thread; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; /** @@ -11,21 +12,32 @@ import java.util.concurrent.ThreadFactory; */ public class EventThread extends Thread { - private static final ExecutorService service = Executors.newSingleThreadExecutor(new ThreadFactory() { + private static final ThreadFactory THREAD_FACTORY = new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { thread = new EventThread(runnable); return thread; } - }); + }; + + private static ExecutorService service; private static volatile EventThread thread; + private static AtomicInteger counter = new AtomicInteger(); + private EventThread(Runnable runnable) { super(runnable); } + private static ExecutorService getExecutorService() { + if (service == null || service.isShutdown()) { + service = Executors.newSingleThreadExecutor(THREAD_FACTORY); + } + return service; + } + /** * check if the current thread is EventThread. * @@ -44,7 +56,7 @@ public class EventThread extends Thread { if (isCurrent()) { task.run(); } else { - service.execute(task); + nextTick(task); } } @@ -53,8 +65,16 @@ public class EventThread extends Thread { * * @param task */ - public static void nextTick(Runnable task) { - service.execute(task); + public static void nextTick(final Runnable task) { + counter.incrementAndGet(); + getExecutorService().execute(new Runnable() { + @Override + public void run() { + task.run(); + if (counter.decrementAndGet() == 0) { + service.shutdown(); + } + } + }); } - } diff --git a/src/test/java/com/github/nkzawa/engineio/client/Connection.java b/src/test/java/com/github/nkzawa/engineio/client/Connection.java index e72b095..5f18d1f 100644 --- a/src/test/java/com/github/nkzawa/engineio/client/Connection.java +++ b/src/test/java/com/github/nkzawa/engineio/client/Connection.java @@ -65,8 +65,8 @@ public abstract class Connection { public void stopServer() throws InterruptedException { System.out.println("Stopping server ..."); serverProcess.destroy(); - serverOutout.cancel(true); - serverError.cancel(true); + serverOutout.cancel(false); + serverError.cancel(false); serverService.shutdown(); serverService.awaitTermination(3000, TimeUnit.MILLISECONDS); } diff --git a/src/test/java/com/github/nkzawa/thread/EventThreadTest.java b/src/test/java/com/github/nkzawa/thread/EventThreadTest.java index bf0736b..1deb88f 100644 --- a/src/test/java/com/github/nkzawa/thread/EventThreadTest.java +++ b/src/test/java/com/github/nkzawa/thread/EventThreadTest.java @@ -1,6 +1,5 @@ package com.github.nkzawa.thread; -import com.github.nkzawa.thread.EventThread; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -36,17 +35,14 @@ public class EventThreadTest { @Test public void exec() throws InterruptedException { final BlockingQueue queue = new LinkedBlockingQueue(); - final Set threads = new HashSet(); EventThread.exec(new Runnable() { @Override public void run() { - threads.add(Thread.currentThread()); queue.offer(0); EventThread.exec(new Runnable() { @Override public void run() { - threads.add(Thread.currentThread()); queue.offer(1); } }); @@ -57,7 +53,6 @@ public class EventThreadTest { EventThread.exec(new Runnable() { @Override public void run() { - threads.add(Thread.currentThread()); queue.offer(3); } }); @@ -65,7 +60,6 @@ public class EventThreadTest { for (int i = 0; i < 4; i++) { assertThat(queue.take(), is(i)); } - assertThat(threads.size(), is(1)); } @Test