From 474188641445f400938fd8d0694e7ad0597e7ab2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Faria?= Date: Thu, 28 Aug 2014 20:35:40 +0100 Subject: [PATCH 1/2] Refactor Emitter to remove a ConcurrentHashMap. --- .../com/github/nkzawa/emitter/Emitter.java | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/src/main/java/com/github/nkzawa/emitter/Emitter.java b/src/main/java/com/github/nkzawa/emitter/Emitter.java index 9ea9ee2..a681dd4 100644 --- a/src/main/java/com/github/nkzawa/emitter/Emitter.java +++ b/src/main/java/com/github/nkzawa/emitter/Emitter.java @@ -2,6 +2,7 @@ package com.github.nkzawa.emitter; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -18,9 +19,6 @@ public class Emitter { private ConcurrentMap> callbacks = new ConcurrentHashMap>(); - private ConcurrentMap onceCallbacks = new ConcurrentHashMap(); - - /** * Listens on the event. * @param event event name. @@ -48,16 +46,7 @@ public class Emitter { * @return a reference to this object. */ public Emitter once(final String event, final Listener fn) { - Listener on = new Listener() { - @Override - public void call(Object... args) { - Emitter.this.off(event, this); - fn.call(args); - } - }; - - this.onceCallbacks.put(fn, on); - this.on(event, on); + this.on(event, new OnceListener(event, fn)); return this; } @@ -68,7 +57,6 @@ public class Emitter { */ public Emitter off() { this.callbacks.clear(); - this.onceCallbacks.clear(); return this; } @@ -79,12 +67,7 @@ public class Emitter { * @return a reference to this object. */ public Emitter off(String event) { - ConcurrentLinkedQueue callbacks = this.callbacks.remove(event); - if (callbacks != null) { - for (Listener fn : callbacks) { - this.onceCallbacks.remove(fn); - } - } + this.callbacks.remove(event); return this; } @@ -98,12 +81,28 @@ public class Emitter { public Emitter off(String event, Listener fn) { ConcurrentLinkedQueue callbacks = this.callbacks.get(event); if (callbacks != null) { - Listener off = this.onceCallbacks.remove(fn); - callbacks.remove(off != null ? off : fn); + Iterator it = callbacks.iterator(); + while (it.hasNext()) { + Listener internal = it.next(); + if (Emitter.sameAs(fn, internal)) { + it.remove(); + break; + } + } } return this; } + private static boolean sameAs(Listener fn, Listener internal) { + if (fn.equals(internal)) { + return true; + } else if (internal instanceof OnceListener) { + return fn.equals(((OnceListener) internal).fn); + } else { + return false; + } + } + /** * Executes each of listeners with the given args. * @@ -114,7 +113,6 @@ public class Emitter { public Emitter emit(String event, Object... args) { ConcurrentLinkedQueue callbacks = this.callbacks.get(event); if (callbacks != null) { - callbacks = new ConcurrentLinkedQueue(callbacks); for (Listener fn : callbacks) { fn.call(args); } @@ -131,7 +129,7 @@ public class Emitter { public List listeners(String event) { ConcurrentLinkedQueue callbacks = this.callbacks.get(event); return callbacks != null ? - new ArrayList(callbacks) : new ArrayList(); + new ArrayList(callbacks) : new ArrayList(0); } /** @@ -141,11 +139,29 @@ public class Emitter { * @return a reference to this object. */ public boolean hasListeners(String event) { - return !this.listeners(event).isEmpty(); + ConcurrentLinkedQueue callbacks = this.callbacks.get(event); + return callbacks != null && !callbacks.isEmpty(); } public static interface Listener { public void call(Object... args); } + + private class OnceListener implements Listener { + + public final String event; + public final Listener fn; + + public OnceListener(String event, Listener fn) { + this.event = event; + this.fn = fn; + } + + @Override + public void call(Object... args) { + Emitter.this.off(this.event, this); + this.fn.call(args); + } + } } From 794e24e879539f9cef535577c8593e78a01638a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9rgio=20Faria?= Date: Thu, 28 Aug 2014 21:42:10 +0100 Subject: [PATCH 2/2] The current EventThread doesn't need to be volatile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Citing the JLS (Java 7) ยง17.4.5: > A call to start() on a thread happens-before any actions in > the started thread. Other threads calling isCurrent(), can see stale values of the static variable, as it doesn't affect the result. Nulling the thread variable, cannot be reordered with the new Thread because a synchronized(EventThread.class) precedes the first task submission on a new Executor, causing a happens-before relationship that ensures the null is already visible to the thread on netTick (that will create the EventThread). --- .../java/com/github/nkzawa/thread/EventThread.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/github/nkzawa/thread/EventThread.java b/src/main/java/com/github/nkzawa/thread/EventThread.java index 8921188..5ae896b 100644 --- a/src/main/java/com/github/nkzawa/thread/EventThread.java +++ b/src/main/java/com/github/nkzawa/thread/EventThread.java @@ -15,11 +15,12 @@ public class EventThread extends Thread { @Override public Thread newThread(Runnable runnable) { thread = new EventThread(runnable); + thread.setName("EventThread"); return thread; } }; - private static volatile EventThread thread; + private static EventThread thread; private static ExecutorService service; @@ -58,14 +59,16 @@ public class EventThread extends Thread { * @param task */ public static void nextTick(final Runnable task) { + ExecutorService executor; synchronized (EventThread.class) { counter++; - if (service == null || service.isShutdown()) { + if (service == null) { service = Executors.newSingleThreadExecutor(THREAD_FACTORY); } + executor = service; } - service.execute(new Runnable() { + executor.execute(new Runnable() { @Override public void run() { try { @@ -75,6 +78,7 @@ public class EventThread extends Thread { counter--; if (counter == 0) { service.shutdown(); + service = null; thread = null; } }