Merge pull request #10 from sergio91pt/master

Refactor Emitter to remove a ConcurrentHashMap.
This commit is contained in:
Naoyuki Kanezawa
2014-08-29 12:59:03 +09:00
2 changed files with 48 additions and 28 deletions

View File

@@ -2,6 +2,7 @@ package com.github.nkzawa.emitter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
@@ -18,9 +19,6 @@ public class Emitter {
private ConcurrentMap<String, ConcurrentLinkedQueue<Listener>> callbacks private ConcurrentMap<String, ConcurrentLinkedQueue<Listener>> callbacks
= new ConcurrentHashMap<String, ConcurrentLinkedQueue<Listener>>(); = new ConcurrentHashMap<String, ConcurrentLinkedQueue<Listener>>();
private ConcurrentMap<Listener, Listener> onceCallbacks = new ConcurrentHashMap<Listener, Listener>();
/** /**
* Listens on the event. * Listens on the event.
* @param event event name. * @param event event name.
@@ -48,16 +46,7 @@ public class Emitter {
* @return a reference to this object. * @return a reference to this object.
*/ */
public Emitter once(final String event, final Listener fn) { public Emitter once(final String event, final Listener fn) {
Listener on = new Listener() { this.on(event, new OnceListener(event, fn));
@Override
public void call(Object... args) {
Emitter.this.off(event, this);
fn.call(args);
}
};
this.onceCallbacks.put(fn, on);
this.on(event, on);
return this; return this;
} }
@@ -68,7 +57,6 @@ public class Emitter {
*/ */
public Emitter off() { public Emitter off() {
this.callbacks.clear(); this.callbacks.clear();
this.onceCallbacks.clear();
return this; return this;
} }
@@ -79,12 +67,7 @@ public class Emitter {
* @return a reference to this object. * @return a reference to this object.
*/ */
public Emitter off(String event) { public Emitter off(String event) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.remove(event); this.callbacks.remove(event);
if (callbacks != null) {
for (Listener fn : callbacks) {
this.onceCallbacks.remove(fn);
}
}
return this; return this;
} }
@@ -98,12 +81,28 @@ public class Emitter {
public Emitter off(String event, Listener fn) { public Emitter off(String event, Listener fn) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event); ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
if (callbacks != null) { if (callbacks != null) {
Listener off = this.onceCallbacks.remove(fn); Iterator<Listener> it = callbacks.iterator();
callbacks.remove(off != null ? off : fn); while (it.hasNext()) {
Listener internal = it.next();
if (Emitter.sameAs(fn, internal)) {
it.remove();
break;
}
}
} }
return this; return this;
} }
private static boolean sameAs(Listener fn, Listener internal) {
if (fn.equals(internal)) {
return true;
} else if (internal instanceof OnceListener) {
return fn.equals(((OnceListener) internal).fn);
} else {
return false;
}
}
/** /**
* Executes each of listeners with the given args. * Executes each of listeners with the given args.
* *
@@ -114,7 +113,6 @@ public class Emitter {
public Emitter emit(String event, Object... args) { public Emitter emit(String event, Object... args) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event); ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
if (callbacks != null) { if (callbacks != null) {
callbacks = new ConcurrentLinkedQueue<Listener>(callbacks);
for (Listener fn : callbacks) { for (Listener fn : callbacks) {
fn.call(args); fn.call(args);
} }
@@ -131,7 +129,7 @@ public class Emitter {
public List<Listener> listeners(String event) { public List<Listener> listeners(String event) {
ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event); ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
return callbacks != null ? return callbacks != null ?
new ArrayList<Listener>(callbacks) : new ArrayList<Listener>(); new ArrayList<Listener>(callbacks) : new ArrayList<Listener>(0);
} }
/** /**
@@ -141,11 +139,29 @@ public class Emitter {
* @return a reference to this object. * @return a reference to this object.
*/ */
public boolean hasListeners(String event) { public boolean hasListeners(String event) {
return !this.listeners(event).isEmpty(); ConcurrentLinkedQueue<Listener> callbacks = this.callbacks.get(event);
return callbacks != null && !callbacks.isEmpty();
} }
public static interface Listener { public static interface Listener {
public void call(Object... args); public void call(Object... args);
} }
private class OnceListener implements Listener {
public final String event;
public final Listener fn;
public OnceListener(String event, Listener fn) {
this.event = event;
this.fn = fn;
}
@Override
public void call(Object... args) {
Emitter.this.off(this.event, this);
this.fn.call(args);
}
}
} }

View File

@@ -15,11 +15,12 @@ public class EventThread extends Thread {
@Override @Override
public Thread newThread(Runnable runnable) { public Thread newThread(Runnable runnable) {
thread = new EventThread(runnable); thread = new EventThread(runnable);
thread.setName("EventThread");
return thread; return thread;
} }
}; };
private static volatile EventThread thread; private static EventThread thread;
private static ExecutorService service; private static ExecutorService service;
@@ -58,14 +59,16 @@ public class EventThread extends Thread {
* @param task * @param task
*/ */
public static void nextTick(final Runnable task) { public static void nextTick(final Runnable task) {
ExecutorService executor;
synchronized (EventThread.class) { synchronized (EventThread.class) {
counter++; counter++;
if (service == null || service.isShutdown()) { if (service == null) {
service = Executors.newSingleThreadExecutor(THREAD_FACTORY); service = Executors.newSingleThreadExecutor(THREAD_FACTORY);
} }
executor = service;
} }
service.execute(new Runnable() { executor.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
@@ -75,6 +78,7 @@ public class EventThread extends Thread {
counter--; counter--;
if (counter == 0) { if (counter == 0) {
service.shutdown(); service.shutdown();
service = null;
thread = null; thread = null;
} }
} }