diff options
7 files changed, 533 insertions, 618 deletions
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java new file mode 100644 index 00000000..6901e4ee --- /dev/null +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java @@ -0,0 +1,98 @@ +package org.bukkit.craftbukkit.scheduler; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; + +import org.apache.commons.lang.UnhandledException; +import org.bukkit.plugin.Plugin; +import org.bukkit.scheduler.BukkitWorker; + + +class CraftAsyncTask extends CraftTask { + + private final LinkedList<BukkitWorker> workers = new LinkedList<BukkitWorker>(); + private final Map<Integer, CraftTask> runners; + + CraftAsyncTask(final Map<Integer, CraftTask> runners, final Plugin plugin, final Runnable task, final int id, final long delay) { + super(plugin, task, id, delay); + this.runners = runners; + } + + @Override + public boolean isSync() { + return false; + } + + @Override + public void run() { + final Thread thread = Thread.currentThread(); + synchronized(workers) { + if (getPeriod() == -2) { + // Never continue running after cancelled. + // Checking this with the lock is important! + return; + } + workers.add( + new BukkitWorker() { + public Thread getThread() { + return thread; + } + + public int getTaskId() { + return CraftAsyncTask.this.getTaskId(); + } + + public Plugin getOwner() { + return CraftAsyncTask.this.getOwner(); + } + }); + } + Throwable thrown = null; + try { + super.run(); + } catch (final Throwable t) { + thrown = t; + throw new UnhandledException( + String.format( + "Plugin %s generated an exception while executing task %s", + getOwner().getDescription().getFullName(), + getTaskId()), + thrown); + } finally { + // Cleanup is important for any async task, otherwise ghost tasks are everywhere + synchronized(workers) { + try { + final Iterator<BukkitWorker> workers = this.workers.iterator(); + boolean removed = false; + while (workers.hasNext()) { + if (workers.next().getThread() == thread) { + workers.remove(); + removed = true; // Don't throw exception + break; + } + } + if (!removed) { + throw new IllegalStateException( + String.format( + "Unable to remove worker %s on task %s for %s", + thread.getName(), + getTaskId(), + getOwner().getDescription().getFullName()), + thrown); // We don't want to lose the original exception, if any + } + } finally { + if (getPeriod() < 0 && workers.isEmpty()) { + // At this spot, we know we are the final async task being executed! + // Because we have the lock, nothing else is running or will run because delay < 0 + runners.remove(getTaskId()); + } + } + } + } + } + + LinkedList<BukkitWorker> getWorkers() { + return workers; + } +} diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java index c30d00a7..de96ec9b 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java @@ -7,100 +7,88 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -public class CraftFuture<T> implements Runnable, Future<T> { +import org.bukkit.plugin.Plugin; + +class CraftFuture<T> extends CraftTask implements Future<T> { - private final CraftScheduler craftScheduler; private final Callable<T> callable; - private final ObjectContainer<T> returnStore = new ObjectContainer<T>(); - private boolean done = false; - private boolean running = false; - private boolean cancelled = false; - private Exception e = null; - private int taskId = -1; + private T value; + private Exception exception = null; - CraftFuture(CraftScheduler craftScheduler, Callable<T> callable) { + CraftFuture(final Callable<T> callable, final Plugin plugin, final int id) { + super(plugin, null, id, -1l); this.callable = callable; - this.craftScheduler = craftScheduler; - } - - public void run() { - synchronized (this) { - if (cancelled) { - return; - } - running = true; - } - try { - returnStore.setObject(callable.call()); - } catch (Exception e) { - this.e = e; - } - synchronized (this) { - running = false; - done = true; - this.notify(); - } } - public T get() throws InterruptedException, ExecutionException { - try { - return get(0L, TimeUnit.MILLISECONDS); - } catch (TimeoutException te) {} - return null; - } - - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - synchronized (this) { - if (isDone()) { - return getResult(); - } - this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit)); - return getResult(); + public synchronized boolean cancel(final boolean mayInterruptIfRunning) { + if (getPeriod() != -1l) { + return false; } + setPeriod(-2l); + return true; } - public T getResult() throws ExecutionException { - if (cancelled) { - throw new CancellationException(); - } - if (e != null) { - throw new ExecutionException(e); - } - return returnStore.getObject(); + public boolean isCancelled() { + return getPeriod() == -2l; } public boolean isDone() { - synchronized (this) { - return done; - } + final long period = this.getPeriod(); + return period != -1l && period != -3l; } - public boolean isCancelled() { - synchronized (this) { - return cancelled; + public T get() throws CancellationException, InterruptedException, ExecutionException { + try { + return get(0, TimeUnit.MILLISECONDS); + } catch (final TimeoutException e) { + throw new Error(e); } } - public boolean cancel(boolean mayInterruptIfRunning) { - synchronized (this) { - if (cancelled) { - return false; + public synchronized T get(long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + timeout = unit.toMillis(timeout); + long period = this.getPeriod(); + while (true) { + if (period == -1l || period == -3l) { + this.wait(unit.toMillis(timeout)); + period = this.getPeriod(); + if (period == -1l || period == -3l) { + if (timeout == 0l) { + continue; + } + throw new TimeoutException(); + } } - cancelled = true; - if (taskId != -1) { - craftScheduler.cancelTask(taskId); + if (period == -2l) { + throw new CancellationException(); } - if (!running && !done) { - return true; - } else { - return false; + if (period == -4l) { + if (exception == null) { + return value; + } + throw new ExecutionException(exception); } + throw new IllegalStateException("Expected " + -1l + " to " + -4l + ", got " + period); } } - public void setTaskId(int taskId) { + @Override + public void run() { synchronized (this) { - this.taskId = taskId; + if (getPeriod() == -2l) { + return; + } + setPeriod(-3l); + } + try { + value = callable.call(); + } catch (final Exception e) { + exception = e; + } finally { + synchronized (this) { + setPeriod(-4l); + this.notifyAll(); + } } } } diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java index e2ecd6cf..6b70a78b 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java @@ -1,385 +1,421 @@ package org.bukkit.craftbukkit.scheduler; import java.util.ArrayList; +import java.util.Comparator; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import java.util.TreeMap; +import java.util.PriorityQueue; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Level; -import java.util.logging.Logger; +import org.apache.commons.lang.Validate; import org.bukkit.plugin.Plugin; import org.bukkit.scheduler.BukkitScheduler; import org.bukkit.scheduler.BukkitTask; import org.bukkit.scheduler.BukkitWorker; -public class CraftScheduler implements BukkitScheduler, Runnable { - - private static final Logger logger = Logger.getLogger("Minecraft"); - - private final CraftThreadManager craftThreadManager = new CraftThreadManager(); - - private final LinkedList<CraftTask> mainThreadQueue = new LinkedList<CraftTask>(); - private final LinkedList<CraftTask> syncedTasks = new LinkedList<CraftTask>(); - - private final TreeMap<CraftTask, Boolean> schedulerQueue = new TreeMap<CraftTask, Boolean>(); +/** + * The fundamental concepts for this implementation: + * <li>Main thread owns {@link #head} and {@link #currentTick}, but it may be read from any thread</li> + * <li>Main thread exclusively controls {@link #temp} and {@link #pending}. + * They are never to be accessed outside of the main thread; alternatives exist to prevent locking.</li> + * <li>{@link #head} to {@link #tail} act as a linked list/queue, with 1 consumer and infinite producers. + * Adding to the tail is atomic and very efficient; utility method is {@link #handle(CraftTask, long)} or {@link #addTask(CraftTask)}. </li> + * <li>Changing the period on a task is delicate. + * Any future task needs to notify waiting threads. + * Async tasks must be synchronized to make sure that any thread that's finishing will remove itself from {@link #runners}. + * Another utility method is provided for this, {@link #cancelTask(CraftTask)}</li> + * <li>{@link #runners} provides a moderately up-to-date view of active tasks. + * If the linked head to tail set is read, all remaining tasks that were active at the time execution started will be located in runners.</li> + * <li>Async tasks are responsible for removing themselves from runners</li> + * <li>Sync tasks are only to be removed from runners on the main thread when coupled with a removal from pending and temp.</li> + * <li>Most of the design in this scheduler relies on queuing special tasks to perform any data changes on the main thread. + * When executed from inside a synchronous method, the scheduler will be updated before next execution by virtue of the frequent {@link #parsePending()} calls.</li> + */ +public class CraftScheduler implements BukkitScheduler { + + /** + * Counter for IDs. Order doesn't matter, only uniqueness. + */ + private final AtomicInteger ids = new AtomicInteger(1); + /** + * Current head of linked-list. This reference is always stale, {@link CraftTask#next} is the live reference. + */ + private volatile CraftTask head = new CraftTask(); + /** + * Tail of a linked-list. AtomicReference only matters when adding to queue + */ + private final AtomicReference<CraftTask> tail = new AtomicReference<CraftTask>(head); + /** + * Main thread logic only + */ + private final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10, + new Comparator<CraftTask>() { + public int compare(final CraftTask o1, final CraftTask o2) { + return (int) (o1.getNextRun() - o2.getNextRun()); + } + }); + /** + * Main thread logic only + */ + private final List<CraftTask> temp = new ArrayList<CraftTask>(); + /** + * These are tasks that are currently active. It's provided for 'viewing' the current state. + */ + private final ConcurrentHashMap<Integer, CraftTask> runners = new ConcurrentHashMap<Integer, CraftTask>(); + private volatile int currentTick = -1; + private final Executor executor = Executors.newCachedThreadPool(); + + public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) { + return this.scheduleSyncDelayedTask(plugin, task, 0l); + } - private Long currentTick = 0L; + public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task) { + return this.scheduleAsyncDelayedTask(plugin, task, 0l); + } - // This lock locks the mainThreadQueue and the currentTick value - private final Lock mainThreadLock = new ReentrantLock(); - private final Lock syncedTasksLock = new ReentrantLock(); + public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) { + return this.scheduleSyncRepeatingTask(plugin, task, delay, -1l); + } - public CraftScheduler() { - Thread t = new Thread(this); - t.start(); + public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) { + return this.scheduleAsyncRepeatingTask(plugin, task, delay, -1l); } - public void run() { + public int scheduleSyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) { + validate(plugin, runnable); + if (delay < 0l) { + delay = 0; + } + if (period == 0l) { + period = 1l; + } else if (period < -1l) { + period = -1l; + } + return handle(new CraftTask(plugin, runnable, nextId(), period), delay); + } - while (true) { - boolean stop = false; - long firstTick = -1; - long currentTick = -1; - CraftTask first = null; - do { - synchronized (schedulerQueue) { - first = null; - if (!schedulerQueue.isEmpty()) { - first = schedulerQueue.firstKey(); - if (first != null) { - currentTick = getCurrentTick(); + public int scheduleAsyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) { + validate(plugin, runnable); + if (delay < 0l) { + delay = 0; + } + if (period == 0l) { + period = 1l; + } else if (period < -1l) { + period = -1l; + } + return handle(new CraftAsyncTask(runners, plugin, runnable, nextId(), period), delay); + } - firstTick = first.getExecutionTick(); + public <T> Future<T> callSyncMethod(final Plugin plugin, final Callable<T> task) { + validate(plugin, task); + final CraftFuture<T> future = new CraftFuture<T>(task, plugin, nextId()); + handle(future, 0l); + return future; + } - if (currentTick >= firstTick) { - schedulerQueue.remove(first); - processTask(first); - if (first.getPeriod() >= 0) { - first.updateExecution(); - schedulerQueue.put(first, first.isSync()); + public void cancelTask(final int taskId) { + if (taskId <= 0) { + return; + } + CraftTask task = runners.get(taskId); + if (task != null) { + cancelTask(task); + } + task = new CraftTask( + new Runnable() { + public void run() { + if (!check(CraftScheduler.this.temp)) { + check(CraftScheduler.this.pending); + } + } + private boolean check(final Iterable<CraftTask> collection) { + final Iterator<CraftTask> tasks = collection.iterator(); + while (tasks.hasNext()) { + final CraftTask task = tasks.next(); + if (task.getTaskId() == taskId) { + cancelTask(task); + tasks.remove(); + if (task.isSync()) { + runners.remove(taskId); } - } else { - stop = true; + return true; } - } else { - stop = true; } - } else { - stop = true; - } - } - } while (!stop); - - long sleepTime = 0; - if (first == null) { - sleepTime = 60000L; - } else { - currentTick = getCurrentTick(); - sleepTime = (firstTick - currentTick) * 50 + 25; + return false; + }}); + handle(task, 0l); + for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { + if (taskPending == task) { + return; } - - if (sleepTime < 50L) { - sleepTime = 50L; - } else if (sleepTime > 60000L) { - sleepTime = 60000L; + if (taskPending.getTaskId() == taskId) { + cancelTask(taskPending); } - - synchronized (schedulerQueue) { - try { - schedulerQueue.wait(sleepTime); - } catch (InterruptedException ie) {} - } - } - } - - void processTask(CraftTask task) { - if (task.isSync()) { - addToMainThreadQueue(task); - } else { - craftThreadManager.executeTask(task.getTask(), task.getOwner(), task.getIdNumber()); } } - // If the main thread cannot obtain the lock, it doesn't wait - public void mainThreadHeartbeat(long currentTick) { - if (syncedTasksLock.tryLock()) { - try { - if (mainThreadLock.tryLock()) { - try { - this.currentTick = currentTick; - while (!mainThreadQueue.isEmpty()) { - syncedTasks.addLast(mainThreadQueue.removeFirst()); - } - } finally { - mainThreadLock.unlock(); + public void cancelTasks(final Plugin plugin) { + Validate.notNull(plugin, "Cannot cancel tasks of null plugin"); + final CraftTask task = new CraftTask( + new Runnable() { + public void run() { + check(CraftScheduler.this.pending); + check(CraftScheduler.this.temp); } - } - long breakTime = System.currentTimeMillis() + 35; // max time spent in loop = 35ms - while (!syncedTasks.isEmpty() && System.currentTimeMillis() <= breakTime) { - CraftTask task = syncedTasks.removeFirst(); - try { - task.getTask().run(); - } catch (Throwable t) { - // Bad plugin! - logger.log(Level.WARNING, "Task of '" + task.getOwner().getDescription().getName() + "' generated an exception", t); - synchronized (schedulerQueue) { - schedulerQueue.remove(task); + void check(final Iterable<CraftTask> collection) { + final Iterator<CraftTask> tasks = collection.iterator(); + while (tasks.hasNext()) { + final CraftTask task = tasks.next(); + if (task.getOwner().equals(plugin)) { + cancelTask(task); + tasks.remove(); + if (task.isSync()) { + runners.remove(task.getTaskId()); + } + break; + } } } - } - } finally { - syncedTasksLock.unlock(); + }); + handle(task, 0l); + for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { + if (taskPending == task) { + return; + } + if (taskPending.getTaskId() != -1 && taskPending.getOwner().equals(plugin)) { + cancelTask(taskPending); } } - } - - long getCurrentTick() { - mainThreadLock.lock(); - long tempTick = 0; - try { - tempTick = currentTick; - } finally { - mainThreadLock.unlock(); + for (CraftTask runner : runners.values()) { + if (runner.getOwner().equals(plugin)) { + cancelTask(runner); + } } - return tempTick; } - void addToMainThreadQueue(CraftTask task) { - mainThreadLock.lock(); - try { - mainThreadQueue.addLast(task); - } finally { - mainThreadLock.unlock(); + public void cancelAllTasks() { + final CraftTask task = new CraftTask( + new Runnable() { + public void run() { + Iterator<CraftTask> it = CraftScheduler.this.runners.values().iterator(); + while (it.hasNext()) { + CraftTask task = it.next(); + cancelTask(task); + if (task.isSync()) { + it.remove(); + } + } + CraftScheduler.this.pending.clear(); + CraftScheduler.this.temp.clear(); + } + }); + handle(task, 0l); + for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) { + if (taskPending == task) { + break; + } + cancelTask(taskPending); } - } - - void wipeSyncedTasks() { - syncedTasksLock.lock(); - try { - syncedTasks.clear(); - } finally { - syncedTasksLock.unlock(); + for (CraftTask runner : runners.values()) { + cancelTask(runner); } } - void wipeMainThreadQueue() { - mainThreadLock.lock(); - try { - mainThreadQueue.clear(); - } finally { - mainThreadLock.unlock(); + public boolean isCurrentlyRunning(final int taskId) { + final CraftTask task = runners.get(taskId); + if (task == null || task.isSync()) { + return false; + } + final CraftAsyncTask asyncTask = (CraftAsyncTask) task; + synchronized (asyncTask.getWorkers()) { + return asyncTask.getWorkers().isEmpty(); } } - public int scheduleSyncDelayedTask(Plugin plugin, Runnable task, long delay) { - return scheduleSyncRepeatingTask(plugin, task, delay, -1); - } - - public int scheduleSyncDelayedTask(Plugin plugin, Runnable task) { - return scheduleSyncDelayedTask(plugin, task, 0L); - } - - public int scheduleSyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) { - if (plugin == null) { - throw new IllegalArgumentException("Plugin cannot be null"); - } - if (task == null) { - throw new IllegalArgumentException("Task cannot be null"); - } - if (delay < 0) { - throw new IllegalArgumentException("Delay cannot be less than 0"); + public boolean isQueued(final int taskId) { + if (taskId <= 0) { + return false; } - - CraftTask newTask = new CraftTask(plugin, task, true, getCurrentTick() + delay, period); - - synchronized (schedulerQueue) { - schedulerQueue.put(newTask, true); - schedulerQueue.notify(); + for (CraftTask task = head.getNext(); task != null; task = task.getNext()) { + if (task.getTaskId() == taskId) { + return task.getPeriod() >= -1l; // The task will run + } } - return newTask.getIdNumber(); - } - - public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task, long delay) { - return scheduleAsyncRepeatingTask(plugin, task, delay, -1); + CraftTask task = runners.get(taskId); + return task != null && task.getPeriod() >= -1l; } - public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task) { - return scheduleAsyncDelayedTask(plugin, task, 0L); + public List<BukkitWorker> getActiveWorkers() { + final ArrayList<BukkitWorker> workers = new ArrayList<BukkitWorker>(); + for (final CraftTask taskObj : runners.values()) { + // Iterator will be a best-effort (may fail to grab very new values) if called from an async thread + if (taskObj.isSync()) { + continue; + } + final CraftAsyncTask task = (CraftAsyncTask) taskObj; + synchronized (task.getWorkers()) { + // This will never have an issue with stale threads; it's state-safe + workers.addAll(task.getWorkers()); + } + } + return workers; } - public int scheduleAsyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) { - if (plugin == null) { - throw new IllegalArgumentException("Plugin cannot be null"); - } - if (task == null) { - throw new IllegalArgumentException("Task cannot be null"); - } - if (delay < 0) { - throw new IllegalArgumentException("Delay cannot be less than 0"); + public List<BukkitTask> getPendingTasks() { + final ArrayList<CraftTask> truePending = new ArrayList<CraftTask>(); + for (CraftTask task = head.getNext(); task != null; task = task.getNext()) { + if (task.getTaskId() != -1) { + // -1 is special code + truePending.add(task); + } } - CraftTask newTask = new CraftTask(plugin, task, false, getCurrentTick() + delay, period); - - synchronized (schedulerQueue) { - schedulerQueue.put(newTask, false); - schedulerQueue.notify(); + final ArrayList<BukkitTask> pending = new ArrayList<BukkitTask>(); + final Iterator<CraftTask> it = runners.values().iterator(); + while (it.hasNext()) { + final CraftTask task = it.next(); + if (task.getPeriod() >= -1l) { + pending.add(task); + } } - return newTask.getIdNumber(); - } - public <T> Future<T> callSyncMethod(Plugin plugin, Callable<T> task) { - CraftFuture<T> craftFuture = new CraftFuture<T>(this, task); - synchronized (craftFuture) { - int taskId = scheduleSyncDelayedTask(plugin, craftFuture); - craftFuture.setTaskId(taskId); + for (final CraftTask task : truePending) { + if (task.getPeriod() >= -1l && !pending.contains(task)) { + pending.add(task); + } } - return craftFuture; + return pending; } - public void cancelTask(int taskId) { - syncedTasksLock.lock(); - try { - synchronized (schedulerQueue) { - mainThreadLock.lock(); - try { - Iterator<CraftTask> itr = schedulerQueue.keySet().iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - itr.remove(); - } - } - itr = mainThreadQueue.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - itr.remove(); - } - } - itr = syncedTasks.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - itr.remove(); - } - } - } finally { - mainThreadLock.unlock(); + /** + * This method is designed to never block or wait for locks; an immediate execution of all current tasks. + */ + public void mainThreadHeartbeat(final int currentTick) { + this.currentTick = currentTick; + final List<CraftTask> temp = this.temp; + parsePending(); + while (isReady(currentTick)) { + final CraftTask task = pending.remove(); + if (task.getPeriod() < -1l) { + if (task.isSync()) { + runners.remove(task.getTaskId(), task); } + parsePending(); + continue; } - } finally { - syncedTasksLock.unlock(); - } - - craftThreadManager.interruptTask(taskId); - } - - public void cancelTasks(Plugin plugin) { - syncedTasksLock.lock(); - try { - synchronized (schedulerQueue) { - mainThreadLock.lock(); + if (task.isSync()) { try { - Iterator<CraftTask> itr = schedulerQueue.keySet().iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getOwner().equals(plugin)) { - itr.remove(); - } - } - itr = mainThreadQueue.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getOwner().equals(plugin)) { - itr.remove(); - } - } - itr = syncedTasks.iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getOwner().equals(plugin)) { - itr.remove(); - } - } - } finally { - mainThreadLock.unlock(); + task.run(); + } catch (final Throwable throwable) { + task.getOwner().getLogger().log( + Level.WARNING, + String.format( + "Task #%s for %s generated an exception", + task.getTaskId(), + task.getOwner().getDescription().getFullName()), + throwable); } + parsePending(); + } else { + executor.execute(task); + // We don't need to parse pending + // (async tasks must live with race-conditions if they attempt to cancel between these few lines of code) + } + final long period = task.getPeriod(); // State consistency + if (period > 0) { + task.setNextRun(currentTick + period); + temp.add(task); + } else if (task.isSync()) { + runners.remove(task.getTaskId()); } - } finally { - syncedTasksLock.unlock(); } - - craftThreadManager.interruptTasks(plugin); + pending.addAll(temp); + temp.clear(); } - public void cancelAllTasks() { - synchronized (schedulerQueue) { - schedulerQueue.clear(); + private void addTask(final CraftTask task) { + final AtomicReference<CraftTask> tail = this.tail; + CraftTask tailTask = tail.get(); + while (!tail.compareAndSet(tailTask, task)) { + tailTask = tail.get(); } - wipeMainThreadQueue(); - wipeSyncedTasks(); - - craftThreadManager.interruptAllTasks(); + tailTask.setNext(task); } - public boolean isCurrentlyRunning(int taskId) { - return craftThreadManager.isAlive(taskId); + private int handle(final CraftTask task, final long delay) { + task.setNextRun(currentTick + delay); + addTask(task); + return task.getTaskId(); } - public boolean isQueued(int taskId) { - synchronized (schedulerQueue) { - Iterator<CraftTask> itr = schedulerQueue.keySet().iterator(); - while (itr.hasNext()) { - CraftTask current = itr.next(); - if (current.getIdNumber() == taskId) { - return true; - } - } - return false; - } + private static void validate(final Plugin plugin, final Object task) { + Validate.notNull(plugin, "Plugin cannot be null"); + Validate.notNull(task, "Task cannot be null"); } - public List<BukkitWorker> getActiveWorkers() { - synchronized (craftThreadManager.workers) { - List<BukkitWorker> workerList = new ArrayList<BukkitWorker>(craftThreadManager.workers.size()); - Iterator<CraftWorker> itr = craftThreadManager.workers.iterator(); + private int nextId() { + return ids.incrementAndGet(); + } - while (itr.hasNext()) { - workerList.add((BukkitWorker) itr.next()); + private void parsePending() { + CraftTask head = this.head; + CraftTask task = head.getNext(); + CraftTask lastTask = head; + for (; task != null; task = (lastTask = task).getNext()) { + if (task.getTaskId() == -1) { + task.run(); + } else if (task.getPeriod() >= -1l) { + pending.add(task); + runners.put(task.getTaskId(), task); } - return workerList; } + // We split this because of the way things are ordered for all of the async calls in CraftScheduler + // (it prevents race-conditions) + for (task = head; task != lastTask; task = head) { + head = task.getNext(); + task.setNext(null); + } + this.head = lastTask; } - public List<BukkitTask> getPendingTasks() { - List<CraftTask> taskList = null; - syncedTasksLock.lock(); - try { - synchronized (schedulerQueue) { - mainThreadLock.lock(); - try { - taskList = new ArrayList<CraftTask>(mainThreadQueue.size() + syncedTasks.size() + schedulerQueue.size()); - taskList.addAll(mainThreadQueue); - taskList.addAll(syncedTasks); - taskList.addAll(schedulerQueue.keySet()); - } finally { - mainThreadLock.unlock(); + private boolean isReady(final int currentTick) { + return !pending.isEmpty() && pending.peek().getNextRun() <= currentTick; + } + + /** + * This method is important to make sure the code is consistent everywhere. + * Synchronizing is needed for future and async to prevent race conditions, + * main thread or otherwise. + * @return True if cancelled + */ + private boolean cancelTask(final CraftTask task) { + if (task.isSync()) { + if (task instanceof CraftFuture) { + synchronized (task) { + if (task.getPeriod() != -1l) { + return false; + } + // This needs to be set INSIDE of the synchronized block + task.setPeriod(-2l); + task.notifyAll(); } + } else { + task.setPeriod(-2l); + } + } else { + synchronized (((CraftAsyncTask) task).getWorkers()) { + // Synchronizing here prevents race condition for a completing task + task.setPeriod(-2l); } - } finally { - syncedTasksLock.unlock(); - } - List<BukkitTask> newTaskList = new ArrayList<BukkitTask>(taskList.size()); - - for (CraftTask craftTask : taskList) { - newTaskList.add((BukkitTask) craftTask); } - return newTaskList; + return true; } - } diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java index 2304719e..8e56766d 100644 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java +++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java @@ -1,112 +1,78 @@ package org.bukkit.craftbukkit.scheduler; -import java.lang.Comparable; - import org.bukkit.plugin.Plugin; import org.bukkit.scheduler.BukkitTask; -public class CraftTask implements Comparable<Object>, BukkitTask { - private final Runnable task; - private final boolean syncTask; - private long executionTick; - private final long period; - private final Plugin owner; - private final int idNumber; +class CraftTask implements BukkitTask, Runnable { - private static Integer idCounter = 1; - private static Object idCounterSync = new Object(); + private volatile CraftTask next = null; + /** + * -1 means no repeating <br> + * -2 means cancel <br> + * -3 means processing for Future <br> + * -4 means done for Future <br> + * Never 0 <br> + * >0 means number of ticks to wait between each execution + */ + private volatile long period; + private long nextRun; + private final Runnable task; + private final Plugin plugin; + private final int id; - CraftTask(Plugin owner, Runnable task, boolean syncTask) { - this(owner, task, syncTask, -1, -1); + CraftTask() { + this(null, null, -1, -1); } - CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick) { - this(owner, task, syncTask, executionTick, -1); + CraftTask(final Runnable task) { + this(null, task, -1, -1); } - CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick, long period) { + CraftTask(final Plugin plugin, final Runnable task, final int id, final long period) { + this.plugin = plugin; this.task = task; - this.syncTask = syncTask; - this.executionTick = executionTick; + this.id = id; this.period = period; - this.owner = owner; - this.idNumber = CraftTask.getNextId(); } - static int getNextId() { - synchronized (idCounterSync) { - idCounter++; - return idCounter; - } + public final int getTaskId() { + return id; } - Runnable getTask() { - return task; + public final Plugin getOwner() { + return plugin; } public boolean isSync() { - return syncTask; + return true; } - long getExecutionTick() { - return executionTick; + public void run() { + task.run(); } long getPeriod() { return period; } - public Plugin getOwner() { - return owner; - } - - void updateExecution() { - executionTick += period; - } - - public int getTaskId() { - return getIdNumber(); + void setPeriod(long period) { + this.period = period; } - int getIdNumber() { - return idNumber; + long getNextRun() { + return nextRun; } - public int compareTo(Object other) { - if (!(other instanceof CraftTask)) { - return 0; - } else { - CraftTask o = (CraftTask) other; - long timeDiff = executionTick - o.getExecutionTick(); - if (timeDiff > 0) { - return 1; - } else if (timeDiff < 0) { - return -1; - } else { - CraftTask otherCraftTask = (CraftTask) other; - return getIdNumber() - otherCraftTask.getIdNumber(); - } - } + void setNextRun(long nextRun) { + this.nextRun = nextRun; } - @Override - public boolean equals(Object other) { - - if (other == null) { - return false; - } - - if (!(other instanceof CraftTask)) { - return false; - } - - CraftTask otherCraftTask = (CraftTask) other; - return otherCraftTask.getIdNumber() == getIdNumber(); + CraftTask getNext() { + return next; } - @Override - public int hashCode() { - return getIdNumber(); + void setNext(CraftTask next) { + this.next = next; } } diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java deleted file mode 100644 index 29defcda..00000000 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.bukkit.craftbukkit.scheduler; - -import java.util.HashSet; -import java.util.Iterator; - -import org.bukkit.plugin.Plugin; - -public class CraftThreadManager { - - final HashSet<CraftWorker> workers = new HashSet<CraftWorker>(); - - void executeTask(Runnable task, Plugin owner, int taskId) { - - CraftWorker craftWorker = new CraftWorker(this, task, owner, taskId); - synchronized (workers) { - workers.add(craftWorker); - } - - } - - void interruptTask(int taskId) { - synchronized (workers) { - Iterator<CraftWorker> itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - if (craftWorker.getTaskId() == taskId) { - craftWorker.interrupt(); - } - } - } - } - - void interruptTasks(Plugin owner) { - synchronized (workers) { - Iterator<CraftWorker> itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - if (craftWorker.getOwner().equals(owner)) { - craftWorker.interrupt(); - } - } - } - } - - void interruptAllTasks() { - synchronized (workers) { - Iterator<CraftWorker> itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - craftWorker.interrupt(); - } - } - } - - boolean isAlive(int taskId) { - synchronized (workers) { - Iterator<CraftWorker> itr = workers.iterator(); - while (itr.hasNext()) { - CraftWorker craftWorker = itr.next(); - if (craftWorker.getTaskId() == taskId) { - return craftWorker.isAlive(); - } - } - // didn't find it, so it must have been removed - return false; - } - } -} diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java deleted file mode 100644 index 94aa411c..00000000 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java +++ /dev/null @@ -1,90 +0,0 @@ -package org.bukkit.craftbukkit.scheduler; - -import org.bukkit.plugin.Plugin; -import org.bukkit.scheduler.BukkitWorker; - -public class CraftWorker implements Runnable, BukkitWorker { - - private static int hashIdCounter = 1; - private static Object hashIdCounterSync = new Object(); - - private final int hashId; - - private final Plugin owner; - private final int taskId; - - private final Thread t; - private final CraftThreadManager parent; - - private final Runnable task; - - CraftWorker(CraftThreadManager parent, Runnable task, Plugin owner, int taskId) { - this.parent = parent; - this.taskId = taskId; - this.task = task; - this.owner = owner; - this.hashId = CraftWorker.getNextHashId(); - t = new Thread(this); - t.start(); - } - - public void run() { - - try { - task.run(); - } catch (Exception e) { - e.printStackTrace(); - } - - synchronized (parent.workers) { - parent.workers.remove(this); - } - - } - - public int getTaskId() { - return taskId; - } - - public Plugin getOwner() { - return owner; - } - - public Thread getThread() { - return t; - } - - public void interrupt() { - t.interrupt(); - } - - public boolean isAlive() { - return t.isAlive(); - } - - private static int getNextHashId() { - synchronized (hashIdCounterSync) { - return hashIdCounter++; - } - } - - @Override - public int hashCode() { - return hashId; - } - - @Override - public boolean equals(Object other) { - if (other == null) { - return false; - } - - if (!(other instanceof CraftWorker)) { - return false; - } - - CraftWorker otherCraftWorker = (CraftWorker) other; - return otherCraftWorker.hashCode() == hashId; - } - -} diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java b/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java deleted file mode 100644 index c0fa37cb..00000000 --- a/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.bukkit.craftbukkit.scheduler; - -public class ObjectContainer<T> { - - T object; - - public void setObject(T object) { - this.object = object; - } - - public T getObject() { - return object; - } - -} |