summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorWesley Wolfe <weswolf@aol.com>2012-08-14 06:09:51 -0500
committerfeildmaster <admin@feildmaster.com>2012-08-22 16:41:46 -0500
commitdcd01bf0c0b87d1aaa1ec46bf58bd2750d0a9904 (patch)
tree2de4c1d90ae972dbb326e0b6e04308b505eaaf5f
parent8fdb006143f34a5bbfaadf4d1677ac7fb40403fb (diff)
downloadcraftbukkit-dcd01bf0c0b87d1aaa1ec46bf58bd2750d0a9904.tar
craftbukkit-dcd01bf0c0b87d1aaa1ec46bf58bd2750d0a9904.tar.gz
craftbukkit-dcd01bf0c0b87d1aaa1ec46bf58bd2750d0a9904.tar.lz
craftbukkit-dcd01bf0c0b87d1aaa1ec46bf58bd2750d0a9904.tar.xz
craftbukkit-dcd01bf0c0b87d1aaa1ec46bf58bd2750d0a9904.zip
Rewrite scheduler. Fixes BUKKIT-1831, and BUKKIT-845
The new scheduler uses a non-blocking methodology. Combining volatile references to make a linked reference chain, with the atomic reference handling the tail, tasks are queued without waiting for locks. The main thread will no longer limit the length of time spend for scheduled tasks, but no task will run twice in the same tick. Scheduling a new task inside of a synchronous task will always run the new task during the same tick, assuming there is no supplied delay > 0. Asynchronous tasks are now run using a thread pool. Any thread-local implemenation should now account for threads being reused between executions. Race conditions were carefully examined and the order of logic is now very important. Each task is placed in a secondary collection before removal from primary collections. Thus, by reading tasks from the collections in the same order they travel, it retains state-safety. This does make modifications less responsive in some situations, as the task may be transitioning before the modifier accesses it. This cost outweighs the requirement to synchronize on the scheduler; previously any conflict would be first-come-first-serve, with the main thread backing out arbitrarily.
-rw-r--r--src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java98
-rw-r--r--src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java128
-rw-r--r--src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java640
-rw-r--r--src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java112
-rw-r--r--src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java68
-rw-r--r--src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java90
-rw-r--r--src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java15
7 files changed, 533 insertions, 618 deletions
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java
new file mode 100644
index 00000000..6901e4ee
--- /dev/null
+++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncTask.java
@@ -0,0 +1,98 @@
+package org.bukkit.craftbukkit.scheduler;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+import org.apache.commons.lang.UnhandledException;
+import org.bukkit.plugin.Plugin;
+import org.bukkit.scheduler.BukkitWorker;
+
+
+class CraftAsyncTask extends CraftTask {
+
+ private final LinkedList<BukkitWorker> workers = new LinkedList<BukkitWorker>();
+ private final Map<Integer, CraftTask> runners;
+
+ CraftAsyncTask(final Map<Integer, CraftTask> runners, final Plugin plugin, final Runnable task, final int id, final long delay) {
+ super(plugin, task, id, delay);
+ this.runners = runners;
+ }
+
+ @Override
+ public boolean isSync() {
+ return false;
+ }
+
+ @Override
+ public void run() {
+ final Thread thread = Thread.currentThread();
+ synchronized(workers) {
+ if (getPeriod() == -2) {
+ // Never continue running after cancelled.
+ // Checking this with the lock is important!
+ return;
+ }
+ workers.add(
+ new BukkitWorker() {
+ public Thread getThread() {
+ return thread;
+ }
+
+ public int getTaskId() {
+ return CraftAsyncTask.this.getTaskId();
+ }
+
+ public Plugin getOwner() {
+ return CraftAsyncTask.this.getOwner();
+ }
+ });
+ }
+ Throwable thrown = null;
+ try {
+ super.run();
+ } catch (final Throwable t) {
+ thrown = t;
+ throw new UnhandledException(
+ String.format(
+ "Plugin %s generated an exception while executing task %s",
+ getOwner().getDescription().getFullName(),
+ getTaskId()),
+ thrown);
+ } finally {
+ // Cleanup is important for any async task, otherwise ghost tasks are everywhere
+ synchronized(workers) {
+ try {
+ final Iterator<BukkitWorker> workers = this.workers.iterator();
+ boolean removed = false;
+ while (workers.hasNext()) {
+ if (workers.next().getThread() == thread) {
+ workers.remove();
+ removed = true; // Don't throw exception
+ break;
+ }
+ }
+ if (!removed) {
+ throw new IllegalStateException(
+ String.format(
+ "Unable to remove worker %s on task %s for %s",
+ thread.getName(),
+ getTaskId(),
+ getOwner().getDescription().getFullName()),
+ thrown); // We don't want to lose the original exception, if any
+ }
+ } finally {
+ if (getPeriod() < 0 && workers.isEmpty()) {
+ // At this spot, we know we are the final async task being executed!
+ // Because we have the lock, nothing else is running or will run because delay < 0
+ runners.remove(getTaskId());
+ }
+ }
+ }
+ }
+ }
+
+ LinkedList<BukkitWorker> getWorkers() {
+ return workers;
+ }
+}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java
index c30d00a7..de96ec9b 100644
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java
+++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftFuture.java
@@ -7,100 +7,88 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-public class CraftFuture<T> implements Runnable, Future<T> {
+import org.bukkit.plugin.Plugin;
+
+class CraftFuture<T> extends CraftTask implements Future<T> {
- private final CraftScheduler craftScheduler;
private final Callable<T> callable;
- private final ObjectContainer<T> returnStore = new ObjectContainer<T>();
- private boolean done = false;
- private boolean running = false;
- private boolean cancelled = false;
- private Exception e = null;
- private int taskId = -1;
+ private T value;
+ private Exception exception = null;
- CraftFuture(CraftScheduler craftScheduler, Callable<T> callable) {
+ CraftFuture(final Callable<T> callable, final Plugin plugin, final int id) {
+ super(plugin, null, id, -1l);
this.callable = callable;
- this.craftScheduler = craftScheduler;
- }
-
- public void run() {
- synchronized (this) {
- if (cancelled) {
- return;
- }
- running = true;
- }
- try {
- returnStore.setObject(callable.call());
- } catch (Exception e) {
- this.e = e;
- }
- synchronized (this) {
- running = false;
- done = true;
- this.notify();
- }
}
- public T get() throws InterruptedException, ExecutionException {
- try {
- return get(0L, TimeUnit.MILLISECONDS);
- } catch (TimeoutException te) {}
- return null;
- }
-
- public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- synchronized (this) {
- if (isDone()) {
- return getResult();
- }
- this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
- return getResult();
+ public synchronized boolean cancel(final boolean mayInterruptIfRunning) {
+ if (getPeriod() != -1l) {
+ return false;
}
+ setPeriod(-2l);
+ return true;
}
- public T getResult() throws ExecutionException {
- if (cancelled) {
- throw new CancellationException();
- }
- if (e != null) {
- throw new ExecutionException(e);
- }
- return returnStore.getObject();
+ public boolean isCancelled() {
+ return getPeriod() == -2l;
}
public boolean isDone() {
- synchronized (this) {
- return done;
- }
+ final long period = this.getPeriod();
+ return period != -1l && period != -3l;
}
- public boolean isCancelled() {
- synchronized (this) {
- return cancelled;
+ public T get() throws CancellationException, InterruptedException, ExecutionException {
+ try {
+ return get(0, TimeUnit.MILLISECONDS);
+ } catch (final TimeoutException e) {
+ throw new Error(e);
}
}
- public boolean cancel(boolean mayInterruptIfRunning) {
- synchronized (this) {
- if (cancelled) {
- return false;
+ public synchronized T get(long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ timeout = unit.toMillis(timeout);
+ long period = this.getPeriod();
+ while (true) {
+ if (period == -1l || period == -3l) {
+ this.wait(unit.toMillis(timeout));
+ period = this.getPeriod();
+ if (period == -1l || period == -3l) {
+ if (timeout == 0l) {
+ continue;
+ }
+ throw new TimeoutException();
+ }
}
- cancelled = true;
- if (taskId != -1) {
- craftScheduler.cancelTask(taskId);
+ if (period == -2l) {
+ throw new CancellationException();
}
- if (!running && !done) {
- return true;
- } else {
- return false;
+ if (period == -4l) {
+ if (exception == null) {
+ return value;
+ }
+ throw new ExecutionException(exception);
}
+ throw new IllegalStateException("Expected " + -1l + " to " + -4l + ", got " + period);
}
}
- public void setTaskId(int taskId) {
+ @Override
+ public void run() {
synchronized (this) {
- this.taskId = taskId;
+ if (getPeriod() == -2l) {
+ return;
+ }
+ setPeriod(-3l);
+ }
+ try {
+ value = callable.call();
+ } catch (final Exception e) {
+ exception = e;
+ } finally {
+ synchronized (this) {
+ setPeriod(-4l);
+ this.notifyAll();
+ }
}
}
}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
index e2ecd6cf..6b70a78b 100644
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
+++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java
@@ -1,385 +1,421 @@
package org.bukkit.craftbukkit.scheduler;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.TreeMap;
+import java.util.PriorityQueue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
-import java.util.logging.Logger;
+import org.apache.commons.lang.Validate;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitScheduler;
import org.bukkit.scheduler.BukkitTask;
import org.bukkit.scheduler.BukkitWorker;
-public class CraftScheduler implements BukkitScheduler, Runnable {
-
- private static final Logger logger = Logger.getLogger("Minecraft");
-
- private final CraftThreadManager craftThreadManager = new CraftThreadManager();
-
- private final LinkedList<CraftTask> mainThreadQueue = new LinkedList<CraftTask>();
- private final LinkedList<CraftTask> syncedTasks = new LinkedList<CraftTask>();
-
- private final TreeMap<CraftTask, Boolean> schedulerQueue = new TreeMap<CraftTask, Boolean>();
+/**
+ * The fundamental concepts for this implementation:
+ * <li>Main thread owns {@link #head} and {@link #currentTick}, but it may be read from any thread</li>
+ * <li>Main thread exclusively controls {@link #temp} and {@link #pending}.
+ * They are never to be accessed outside of the main thread; alternatives exist to prevent locking.</li>
+ * <li>{@link #head} to {@link #tail} act as a linked list/queue, with 1 consumer and infinite producers.
+ * Adding to the tail is atomic and very efficient; utility method is {@link #handle(CraftTask, long)} or {@link #addTask(CraftTask)}. </li>
+ * <li>Changing the period on a task is delicate.
+ * Any future task needs to notify waiting threads.
+ * Async tasks must be synchronized to make sure that any thread that's finishing will remove itself from {@link #runners}.
+ * Another utility method is provided for this, {@link #cancelTask(CraftTask)}</li>
+ * <li>{@link #runners} provides a moderately up-to-date view of active tasks.
+ * If the linked head to tail set is read, all remaining tasks that were active at the time execution started will be located in runners.</li>
+ * <li>Async tasks are responsible for removing themselves from runners</li>
+ * <li>Sync tasks are only to be removed from runners on the main thread when coupled with a removal from pending and temp.</li>
+ * <li>Most of the design in this scheduler relies on queuing special tasks to perform any data changes on the main thread.
+ * When executed from inside a synchronous method, the scheduler will be updated before next execution by virtue of the frequent {@link #parsePending()} calls.</li>
+ */
+public class CraftScheduler implements BukkitScheduler {
+
+ /**
+ * Counter for IDs. Order doesn't matter, only uniqueness.
+ */
+ private final AtomicInteger ids = new AtomicInteger(1);
+ /**
+ * Current head of linked-list. This reference is always stale, {@link CraftTask#next} is the live reference.
+ */
+ private volatile CraftTask head = new CraftTask();
+ /**
+ * Tail of a linked-list. AtomicReference only matters when adding to queue
+ */
+ private final AtomicReference<CraftTask> tail = new AtomicReference<CraftTask>(head);
+ /**
+ * Main thread logic only
+ */
+ private final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10,
+ new Comparator<CraftTask>() {
+ public int compare(final CraftTask o1, final CraftTask o2) {
+ return (int) (o1.getNextRun() - o2.getNextRun());
+ }
+ });
+ /**
+ * Main thread logic only
+ */
+ private final List<CraftTask> temp = new ArrayList<CraftTask>();
+ /**
+ * These are tasks that are currently active. It's provided for 'viewing' the current state.
+ */
+ private final ConcurrentHashMap<Integer, CraftTask> runners = new ConcurrentHashMap<Integer, CraftTask>();
+ private volatile int currentTick = -1;
+ private final Executor executor = Executors.newCachedThreadPool();
+
+ public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task) {
+ return this.scheduleSyncDelayedTask(plugin, task, 0l);
+ }
- private Long currentTick = 0L;
+ public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task) {
+ return this.scheduleAsyncDelayedTask(plugin, task, 0l);
+ }
- // This lock locks the mainThreadQueue and the currentTick value
- private final Lock mainThreadLock = new ReentrantLock();
- private final Lock syncedTasksLock = new ReentrantLock();
+ public int scheduleSyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) {
+ return this.scheduleSyncRepeatingTask(plugin, task, delay, -1l);
+ }
- public CraftScheduler() {
- Thread t = new Thread(this);
- t.start();
+ public int scheduleAsyncDelayedTask(final Plugin plugin, final Runnable task, final long delay) {
+ return this.scheduleAsyncRepeatingTask(plugin, task, delay, -1l);
}
- public void run() {
+ public int scheduleSyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) {
+ validate(plugin, runnable);
+ if (delay < 0l) {
+ delay = 0;
+ }
+ if (period == 0l) {
+ period = 1l;
+ } else if (period < -1l) {
+ period = -1l;
+ }
+ return handle(new CraftTask(plugin, runnable, nextId(), period), delay);
+ }
- while (true) {
- boolean stop = false;
- long firstTick = -1;
- long currentTick = -1;
- CraftTask first = null;
- do {
- synchronized (schedulerQueue) {
- first = null;
- if (!schedulerQueue.isEmpty()) {
- first = schedulerQueue.firstKey();
- if (first != null) {
- currentTick = getCurrentTick();
+ public int scheduleAsyncRepeatingTask(final Plugin plugin, final Runnable runnable, long delay, long period) {
+ validate(plugin, runnable);
+ if (delay < 0l) {
+ delay = 0;
+ }
+ if (period == 0l) {
+ period = 1l;
+ } else if (period < -1l) {
+ period = -1l;
+ }
+ return handle(new CraftAsyncTask(runners, plugin, runnable, nextId(), period), delay);
+ }
- firstTick = first.getExecutionTick();
+ public <T> Future<T> callSyncMethod(final Plugin plugin, final Callable<T> task) {
+ validate(plugin, task);
+ final CraftFuture<T> future = new CraftFuture<T>(task, plugin, nextId());
+ handle(future, 0l);
+ return future;
+ }
- if (currentTick >= firstTick) {
- schedulerQueue.remove(first);
- processTask(first);
- if (first.getPeriod() >= 0) {
- first.updateExecution();
- schedulerQueue.put(first, first.isSync());
+ public void cancelTask(final int taskId) {
+ if (taskId <= 0) {
+ return;
+ }
+ CraftTask task = runners.get(taskId);
+ if (task != null) {
+ cancelTask(task);
+ }
+ task = new CraftTask(
+ new Runnable() {
+ public void run() {
+ if (!check(CraftScheduler.this.temp)) {
+ check(CraftScheduler.this.pending);
+ }
+ }
+ private boolean check(final Iterable<CraftTask> collection) {
+ final Iterator<CraftTask> tasks = collection.iterator();
+ while (tasks.hasNext()) {
+ final CraftTask task = tasks.next();
+ if (task.getTaskId() == taskId) {
+ cancelTask(task);
+ tasks.remove();
+ if (task.isSync()) {
+ runners.remove(taskId);
}
- } else {
- stop = true;
+ return true;
}
- } else {
- stop = true;
}
- } else {
- stop = true;
- }
- }
- } while (!stop);
-
- long sleepTime = 0;
- if (first == null) {
- sleepTime = 60000L;
- } else {
- currentTick = getCurrentTick();
- sleepTime = (firstTick - currentTick) * 50 + 25;
+ return false;
+ }});
+ handle(task, 0l);
+ for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) {
+ if (taskPending == task) {
+ return;
}
-
- if (sleepTime < 50L) {
- sleepTime = 50L;
- } else if (sleepTime > 60000L) {
- sleepTime = 60000L;
+ if (taskPending.getTaskId() == taskId) {
+ cancelTask(taskPending);
}
-
- synchronized (schedulerQueue) {
- try {
- schedulerQueue.wait(sleepTime);
- } catch (InterruptedException ie) {}
- }
- }
- }
-
- void processTask(CraftTask task) {
- if (task.isSync()) {
- addToMainThreadQueue(task);
- } else {
- craftThreadManager.executeTask(task.getTask(), task.getOwner(), task.getIdNumber());
}
}
- // If the main thread cannot obtain the lock, it doesn't wait
- public void mainThreadHeartbeat(long currentTick) {
- if (syncedTasksLock.tryLock()) {
- try {
- if (mainThreadLock.tryLock()) {
- try {
- this.currentTick = currentTick;
- while (!mainThreadQueue.isEmpty()) {
- syncedTasks.addLast(mainThreadQueue.removeFirst());
- }
- } finally {
- mainThreadLock.unlock();
+ public void cancelTasks(final Plugin plugin) {
+ Validate.notNull(plugin, "Cannot cancel tasks of null plugin");
+ final CraftTask task = new CraftTask(
+ new Runnable() {
+ public void run() {
+ check(CraftScheduler.this.pending);
+ check(CraftScheduler.this.temp);
}
- }
- long breakTime = System.currentTimeMillis() + 35; // max time spent in loop = 35ms
- while (!syncedTasks.isEmpty() && System.currentTimeMillis() <= breakTime) {
- CraftTask task = syncedTasks.removeFirst();
- try {
- task.getTask().run();
- } catch (Throwable t) {
- // Bad plugin!
- logger.log(Level.WARNING, "Task of '" + task.getOwner().getDescription().getName() + "' generated an exception", t);
- synchronized (schedulerQueue) {
- schedulerQueue.remove(task);
+ void check(final Iterable<CraftTask> collection) {
+ final Iterator<CraftTask> tasks = collection.iterator();
+ while (tasks.hasNext()) {
+ final CraftTask task = tasks.next();
+ if (task.getOwner().equals(plugin)) {
+ cancelTask(task);
+ tasks.remove();
+ if (task.isSync()) {
+ runners.remove(task.getTaskId());
+ }
+ break;
+ }
}
}
- }
- } finally {
- syncedTasksLock.unlock();
+ });
+ handle(task, 0l);
+ for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) {
+ if (taskPending == task) {
+ return;
+ }
+ if (taskPending.getTaskId() != -1 && taskPending.getOwner().equals(plugin)) {
+ cancelTask(taskPending);
}
}
- }
-
- long getCurrentTick() {
- mainThreadLock.lock();
- long tempTick = 0;
- try {
- tempTick = currentTick;
- } finally {
- mainThreadLock.unlock();
+ for (CraftTask runner : runners.values()) {
+ if (runner.getOwner().equals(plugin)) {
+ cancelTask(runner);
+ }
}
- return tempTick;
}
- void addToMainThreadQueue(CraftTask task) {
- mainThreadLock.lock();
- try {
- mainThreadQueue.addLast(task);
- } finally {
- mainThreadLock.unlock();
+ public void cancelAllTasks() {
+ final CraftTask task = new CraftTask(
+ new Runnable() {
+ public void run() {
+ Iterator<CraftTask> it = CraftScheduler.this.runners.values().iterator();
+ while (it.hasNext()) {
+ CraftTask task = it.next();
+ cancelTask(task);
+ if (task.isSync()) {
+ it.remove();
+ }
+ }
+ CraftScheduler.this.pending.clear();
+ CraftScheduler.this.temp.clear();
+ }
+ });
+ handle(task, 0l);
+ for (CraftTask taskPending = head.getNext(); taskPending != null; taskPending = taskPending.getNext()) {
+ if (taskPending == task) {
+ break;
+ }
+ cancelTask(taskPending);
}
- }
-
- void wipeSyncedTasks() {
- syncedTasksLock.lock();
- try {
- syncedTasks.clear();
- } finally {
- syncedTasksLock.unlock();
+ for (CraftTask runner : runners.values()) {
+ cancelTask(runner);
}
}
- void wipeMainThreadQueue() {
- mainThreadLock.lock();
- try {
- mainThreadQueue.clear();
- } finally {
- mainThreadLock.unlock();
+ public boolean isCurrentlyRunning(final int taskId) {
+ final CraftTask task = runners.get(taskId);
+ if (task == null || task.isSync()) {
+ return false;
+ }
+ final CraftAsyncTask asyncTask = (CraftAsyncTask) task;
+ synchronized (asyncTask.getWorkers()) {
+ return asyncTask.getWorkers().isEmpty();
}
}
- public int scheduleSyncDelayedTask(Plugin plugin, Runnable task, long delay) {
- return scheduleSyncRepeatingTask(plugin, task, delay, -1);
- }
-
- public int scheduleSyncDelayedTask(Plugin plugin, Runnable task) {
- return scheduleSyncDelayedTask(plugin, task, 0L);
- }
-
- public int scheduleSyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) {
- if (plugin == null) {
- throw new IllegalArgumentException("Plugin cannot be null");
- }
- if (task == null) {
- throw new IllegalArgumentException("Task cannot be null");
- }
- if (delay < 0) {
- throw new IllegalArgumentException("Delay cannot be less than 0");
+ public boolean isQueued(final int taskId) {
+ if (taskId <= 0) {
+ return false;
}
-
- CraftTask newTask = new CraftTask(plugin, task, true, getCurrentTick() + delay, period);
-
- synchronized (schedulerQueue) {
- schedulerQueue.put(newTask, true);
- schedulerQueue.notify();
+ for (CraftTask task = head.getNext(); task != null; task = task.getNext()) {
+ if (task.getTaskId() == taskId) {
+ return task.getPeriod() >= -1l; // The task will run
+ }
}
- return newTask.getIdNumber();
- }
-
- public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task, long delay) {
- return scheduleAsyncRepeatingTask(plugin, task, delay, -1);
+ CraftTask task = runners.get(taskId);
+ return task != null && task.getPeriod() >= -1l;
}
- public int scheduleAsyncDelayedTask(Plugin plugin, Runnable task) {
- return scheduleAsyncDelayedTask(plugin, task, 0L);
+ public List<BukkitWorker> getActiveWorkers() {
+ final ArrayList<BukkitWorker> workers = new ArrayList<BukkitWorker>();
+ for (final CraftTask taskObj : runners.values()) {
+ // Iterator will be a best-effort (may fail to grab very new values) if called from an async thread
+ if (taskObj.isSync()) {
+ continue;
+ }
+ final CraftAsyncTask task = (CraftAsyncTask) taskObj;
+ synchronized (task.getWorkers()) {
+ // This will never have an issue with stale threads; it's state-safe
+ workers.addAll(task.getWorkers());
+ }
+ }
+ return workers;
}
- public int scheduleAsyncRepeatingTask(Plugin plugin, Runnable task, long delay, long period) {
- if (plugin == null) {
- throw new IllegalArgumentException("Plugin cannot be null");
- }
- if (task == null) {
- throw new IllegalArgumentException("Task cannot be null");
- }
- if (delay < 0) {
- throw new IllegalArgumentException("Delay cannot be less than 0");
+ public List<BukkitTask> getPendingTasks() {
+ final ArrayList<CraftTask> truePending = new ArrayList<CraftTask>();
+ for (CraftTask task = head.getNext(); task != null; task = task.getNext()) {
+ if (task.getTaskId() != -1) {
+ // -1 is special code
+ truePending.add(task);
+ }
}
- CraftTask newTask = new CraftTask(plugin, task, false, getCurrentTick() + delay, period);
-
- synchronized (schedulerQueue) {
- schedulerQueue.put(newTask, false);
- schedulerQueue.notify();
+ final ArrayList<BukkitTask> pending = new ArrayList<BukkitTask>();
+ final Iterator<CraftTask> it = runners.values().iterator();
+ while (it.hasNext()) {
+ final CraftTask task = it.next();
+ if (task.getPeriod() >= -1l) {
+ pending.add(task);
+ }
}
- return newTask.getIdNumber();
- }
- public <T> Future<T> callSyncMethod(Plugin plugin, Callable<T> task) {
- CraftFuture<T> craftFuture = new CraftFuture<T>(this, task);
- synchronized (craftFuture) {
- int taskId = scheduleSyncDelayedTask(plugin, craftFuture);
- craftFuture.setTaskId(taskId);
+ for (final CraftTask task : truePending) {
+ if (task.getPeriod() >= -1l && !pending.contains(task)) {
+ pending.add(task);
+ }
}
- return craftFuture;
+ return pending;
}
- public void cancelTask(int taskId) {
- syncedTasksLock.lock();
- try {
- synchronized (schedulerQueue) {
- mainThreadLock.lock();
- try {
- Iterator<CraftTask> itr = schedulerQueue.keySet().iterator();
- while (itr.hasNext()) {
- CraftTask current = itr.next();
- if (current.getIdNumber() == taskId) {
- itr.remove();
- }
- }
- itr = mainThreadQueue.iterator();
- while (itr.hasNext()) {
- CraftTask current = itr.next();
- if (current.getIdNumber() == taskId) {
- itr.remove();
- }
- }
- itr = syncedTasks.iterator();
- while (itr.hasNext()) {
- CraftTask current = itr.next();
- if (current.getIdNumber() == taskId) {
- itr.remove();
- }
- }
- } finally {
- mainThreadLock.unlock();
+ /**
+ * This method is designed to never block or wait for locks; an immediate execution of all current tasks.
+ */
+ public void mainThreadHeartbeat(final int currentTick) {
+ this.currentTick = currentTick;
+ final List<CraftTask> temp = this.temp;
+ parsePending();
+ while (isReady(currentTick)) {
+ final CraftTask task = pending.remove();
+ if (task.getPeriod() < -1l) {
+ if (task.isSync()) {
+ runners.remove(task.getTaskId(), task);
}
+ parsePending();
+ continue;
}
- } finally {
- syncedTasksLock.unlock();
- }
-
- craftThreadManager.interruptTask(taskId);
- }
-
- public void cancelTasks(Plugin plugin) {
- syncedTasksLock.lock();
- try {
- synchronized (schedulerQueue) {
- mainThreadLock.lock();
+ if (task.isSync()) {
try {
- Iterator<CraftTask> itr = schedulerQueue.keySet().iterator();
- while (itr.hasNext()) {
- CraftTask current = itr.next();
- if (current.getOwner().equals(plugin)) {
- itr.remove();
- }
- }
- itr = mainThreadQueue.iterator();
- while (itr.hasNext()) {
- CraftTask current = itr.next();
- if (current.getOwner().equals(plugin)) {
- itr.remove();
- }
- }
- itr = syncedTasks.iterator();
- while (itr.hasNext()) {
- CraftTask current = itr.next();
- if (current.getOwner().equals(plugin)) {
- itr.remove();
- }
- }
- } finally {
- mainThreadLock.unlock();
+ task.run();
+ } catch (final Throwable throwable) {
+ task.getOwner().getLogger().log(
+ Level.WARNING,
+ String.format(
+ "Task #%s for %s generated an exception",
+ task.getTaskId(),
+ task.getOwner().getDescription().getFullName()),
+ throwable);
}
+ parsePending();
+ } else {
+ executor.execute(task);
+ // We don't need to parse pending
+ // (async tasks must live with race-conditions if they attempt to cancel between these few lines of code)
+ }
+ final long period = task.getPeriod(); // State consistency
+ if (period > 0) {
+ task.setNextRun(currentTick + period);
+ temp.add(task);
+ } else if (task.isSync()) {
+ runners.remove(task.getTaskId());
}
- } finally {
- syncedTasksLock.unlock();
}
-
- craftThreadManager.interruptTasks(plugin);
+ pending.addAll(temp);
+ temp.clear();
}
- public void cancelAllTasks() {
- synchronized (schedulerQueue) {
- schedulerQueue.clear();
+ private void addTask(final CraftTask task) {
+ final AtomicReference<CraftTask> tail = this.tail;
+ CraftTask tailTask = tail.get();
+ while (!tail.compareAndSet(tailTask, task)) {
+ tailTask = tail.get();
}
- wipeMainThreadQueue();
- wipeSyncedTasks();
-
- craftThreadManager.interruptAllTasks();
+ tailTask.setNext(task);
}
- public boolean isCurrentlyRunning(int taskId) {
- return craftThreadManager.isAlive(taskId);
+ private int handle(final CraftTask task, final long delay) {
+ task.setNextRun(currentTick + delay);
+ addTask(task);
+ return task.getTaskId();
}
- public boolean isQueued(int taskId) {
- synchronized (schedulerQueue) {
- Iterator<CraftTask> itr = schedulerQueue.keySet().iterator();
- while (itr.hasNext()) {
- CraftTask current = itr.next();
- if (current.getIdNumber() == taskId) {
- return true;
- }
- }
- return false;
- }
+ private static void validate(final Plugin plugin, final Object task) {
+ Validate.notNull(plugin, "Plugin cannot be null");
+ Validate.notNull(task, "Task cannot be null");
}
- public List<BukkitWorker> getActiveWorkers() {
- synchronized (craftThreadManager.workers) {
- List<BukkitWorker> workerList = new ArrayList<BukkitWorker>(craftThreadManager.workers.size());
- Iterator<CraftWorker> itr = craftThreadManager.workers.iterator();
+ private int nextId() {
+ return ids.incrementAndGet();
+ }
- while (itr.hasNext()) {
- workerList.add((BukkitWorker) itr.next());
+ private void parsePending() {
+ CraftTask head = this.head;
+ CraftTask task = head.getNext();
+ CraftTask lastTask = head;
+ for (; task != null; task = (lastTask = task).getNext()) {
+ if (task.getTaskId() == -1) {
+ task.run();
+ } else if (task.getPeriod() >= -1l) {
+ pending.add(task);
+ runners.put(task.getTaskId(), task);
}
- return workerList;
}
+ // We split this because of the way things are ordered for all of the async calls in CraftScheduler
+ // (it prevents race-conditions)
+ for (task = head; task != lastTask; task = head) {
+ head = task.getNext();
+ task.setNext(null);
+ }
+ this.head = lastTask;
}
- public List<BukkitTask> getPendingTasks() {
- List<CraftTask> taskList = null;
- syncedTasksLock.lock();
- try {
- synchronized (schedulerQueue) {
- mainThreadLock.lock();
- try {
- taskList = new ArrayList<CraftTask>(mainThreadQueue.size() + syncedTasks.size() + schedulerQueue.size());
- taskList.addAll(mainThreadQueue);
- taskList.addAll(syncedTasks);
- taskList.addAll(schedulerQueue.keySet());
- } finally {
- mainThreadLock.unlock();
+ private boolean isReady(final int currentTick) {
+ return !pending.isEmpty() && pending.peek().getNextRun() <= currentTick;
+ }
+
+ /**
+ * This method is important to make sure the code is consistent everywhere.
+ * Synchronizing is needed for future and async to prevent race conditions,
+ * main thread or otherwise.
+ * @return True if cancelled
+ */
+ private boolean cancelTask(final CraftTask task) {
+ if (task.isSync()) {
+ if (task instanceof CraftFuture) {
+ synchronized (task) {
+ if (task.getPeriod() != -1l) {
+ return false;
+ }
+ // This needs to be set INSIDE of the synchronized block
+ task.setPeriod(-2l);
+ task.notifyAll();
}
+ } else {
+ task.setPeriod(-2l);
+ }
+ } else {
+ synchronized (((CraftAsyncTask) task).getWorkers()) {
+ // Synchronizing here prevents race condition for a completing task
+ task.setPeriod(-2l);
}
- } finally {
- syncedTasksLock.unlock();
- }
- List<BukkitTask> newTaskList = new ArrayList<BukkitTask>(taskList.size());
-
- for (CraftTask craftTask : taskList) {
- newTaskList.add((BukkitTask) craftTask);
}
- return newTaskList;
+ return true;
}
-
}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java
index 2304719e..8e56766d 100644
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java
+++ b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java
@@ -1,112 +1,78 @@
package org.bukkit.craftbukkit.scheduler;
-import java.lang.Comparable;
-
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitTask;
-public class CraftTask implements Comparable<Object>, BukkitTask {
- private final Runnable task;
- private final boolean syncTask;
- private long executionTick;
- private final long period;
- private final Plugin owner;
- private final int idNumber;
+class CraftTask implements BukkitTask, Runnable {
- private static Integer idCounter = 1;
- private static Object idCounterSync = new Object();
+ private volatile CraftTask next = null;
+ /**
+ * -1 means no repeating <br>
+ * -2 means cancel <br>
+ * -3 means processing for Future <br>
+ * -4 means done for Future <br>
+ * Never 0 <br>
+ * >0 means number of ticks to wait between each execution
+ */
+ private volatile long period;
+ private long nextRun;
+ private final Runnable task;
+ private final Plugin plugin;
+ private final int id;
- CraftTask(Plugin owner, Runnable task, boolean syncTask) {
- this(owner, task, syncTask, -1, -1);
+ CraftTask() {
+ this(null, null, -1, -1);
}
- CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick) {
- this(owner, task, syncTask, executionTick, -1);
+ CraftTask(final Runnable task) {
+ this(null, task, -1, -1);
}
- CraftTask(Plugin owner, Runnable task, boolean syncTask, long executionTick, long period) {
+ CraftTask(final Plugin plugin, final Runnable task, final int id, final long period) {
+ this.plugin = plugin;
this.task = task;
- this.syncTask = syncTask;
- this.executionTick = executionTick;
+ this.id = id;
this.period = period;
- this.owner = owner;
- this.idNumber = CraftTask.getNextId();
}
- static int getNextId() {
- synchronized (idCounterSync) {
- idCounter++;
- return idCounter;
- }
+ public final int getTaskId() {
+ return id;
}
- Runnable getTask() {
- return task;
+ public final Plugin getOwner() {
+ return plugin;
}
public boolean isSync() {
- return syncTask;
+ return true;
}
- long getExecutionTick() {
- return executionTick;
+ public void run() {
+ task.run();
}
long getPeriod() {
return period;
}
- public Plugin getOwner() {
- return owner;
- }
-
- void updateExecution() {
- executionTick += period;
- }
-
- public int getTaskId() {
- return getIdNumber();
+ void setPeriod(long period) {
+ this.period = period;
}
- int getIdNumber() {
- return idNumber;
+ long getNextRun() {
+ return nextRun;
}
- public int compareTo(Object other) {
- if (!(other instanceof CraftTask)) {
- return 0;
- } else {
- CraftTask o = (CraftTask) other;
- long timeDiff = executionTick - o.getExecutionTick();
- if (timeDiff > 0) {
- return 1;
- } else if (timeDiff < 0) {
- return -1;
- } else {
- CraftTask otherCraftTask = (CraftTask) other;
- return getIdNumber() - otherCraftTask.getIdNumber();
- }
- }
+ void setNextRun(long nextRun) {
+ this.nextRun = nextRun;
}
- @Override
- public boolean equals(Object other) {
-
- if (other == null) {
- return false;
- }
-
- if (!(other instanceof CraftTask)) {
- return false;
- }
-
- CraftTask otherCraftTask = (CraftTask) other;
- return otherCraftTask.getIdNumber() == getIdNumber();
+ CraftTask getNext() {
+ return next;
}
- @Override
- public int hashCode() {
- return getIdNumber();
+ void setNext(CraftTask next) {
+ this.next = next;
}
}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java
deleted file mode 100644
index 29defcda..00000000
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftThreadManager.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.bukkit.craftbukkit.scheduler;
-
-import java.util.HashSet;
-import java.util.Iterator;
-
-import org.bukkit.plugin.Plugin;
-
-public class CraftThreadManager {
-
- final HashSet<CraftWorker> workers = new HashSet<CraftWorker>();
-
- void executeTask(Runnable task, Plugin owner, int taskId) {
-
- CraftWorker craftWorker = new CraftWorker(this, task, owner, taskId);
- synchronized (workers) {
- workers.add(craftWorker);
- }
-
- }
-
- void interruptTask(int taskId) {
- synchronized (workers) {
- Iterator<CraftWorker> itr = workers.iterator();
- while (itr.hasNext()) {
- CraftWorker craftWorker = itr.next();
- if (craftWorker.getTaskId() == taskId) {
- craftWorker.interrupt();
- }
- }
- }
- }
-
- void interruptTasks(Plugin owner) {
- synchronized (workers) {
- Iterator<CraftWorker> itr = workers.iterator();
- while (itr.hasNext()) {
- CraftWorker craftWorker = itr.next();
- if (craftWorker.getOwner().equals(owner)) {
- craftWorker.interrupt();
- }
- }
- }
- }
-
- void interruptAllTasks() {
- synchronized (workers) {
- Iterator<CraftWorker> itr = workers.iterator();
- while (itr.hasNext()) {
- CraftWorker craftWorker = itr.next();
- craftWorker.interrupt();
- }
- }
- }
-
- boolean isAlive(int taskId) {
- synchronized (workers) {
- Iterator<CraftWorker> itr = workers.iterator();
- while (itr.hasNext()) {
- CraftWorker craftWorker = itr.next();
- if (craftWorker.getTaskId() == taskId) {
- return craftWorker.isAlive();
- }
- }
- // didn't find it, so it must have been removed
- return false;
- }
- }
-}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java b/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java
deleted file mode 100644
index 94aa411c..00000000
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/CraftWorker.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.bukkit.craftbukkit.scheduler;
-
-import org.bukkit.plugin.Plugin;
-import org.bukkit.scheduler.BukkitWorker;
-
-public class CraftWorker implements Runnable, BukkitWorker {
-
- private static int hashIdCounter = 1;
- private static Object hashIdCounterSync = new Object();
-
- private final int hashId;
-
- private final Plugin owner;
- private final int taskId;
-
- private final Thread t;
- private final CraftThreadManager parent;
-
- private final Runnable task;
-
- CraftWorker(CraftThreadManager parent, Runnable task, Plugin owner, int taskId) {
- this.parent = parent;
- this.taskId = taskId;
- this.task = task;
- this.owner = owner;
- this.hashId = CraftWorker.getNextHashId();
- t = new Thread(this);
- t.start();
- }
-
- public void run() {
-
- try {
- task.run();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- synchronized (parent.workers) {
- parent.workers.remove(this);
- }
-
- }
-
- public int getTaskId() {
- return taskId;
- }
-
- public Plugin getOwner() {
- return owner;
- }
-
- public Thread getThread() {
- return t;
- }
-
- public void interrupt() {
- t.interrupt();
- }
-
- public boolean isAlive() {
- return t.isAlive();
- }
-
- private static int getNextHashId() {
- synchronized (hashIdCounterSync) {
- return hashIdCounter++;
- }
- }
-
- @Override
- public int hashCode() {
- return hashId;
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null) {
- return false;
- }
-
- if (!(other instanceof CraftWorker)) {
- return false;
- }
-
- CraftWorker otherCraftWorker = (CraftWorker) other;
- return otherCraftWorker.hashCode() == hashId;
- }
-
-}
diff --git a/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java b/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java
deleted file mode 100644
index c0fa37cb..00000000
--- a/src/main/java/org/bukkit/craftbukkit/scheduler/ObjectContainer.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.bukkit.craftbukkit.scheduler;
-
-public class ObjectContainer<T> {
-
- T object;
-
- public void setObject(T object) {
- this.object = object;
- }
-
- public T getObject() {
- return object;
- }
-
-}