Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement async notify queue that submits to a KeyQueuedExecutorService #2334

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
Expand All @@ -52,6 +53,7 @@ public abstract class QueueHandler implements Trimable, Runnable {
null,
false
);

/**
* Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the
* primary queue. They may be IO-bound tasks.
Expand Down Expand Up @@ -508,4 +510,28 @@ public boolean trim(boolean aggressive) {
return result;
}

/**
* Primary queue should be used for tasks that are unlikely to wait on other tasks, IO, etc. (i.e. spend most of their
* time utilising CPU.
* <p>
* Internal API usage only.
*
* @since TODO
*/
public ExecutorService getForkJoinPoolPrimary() {
return forkJoinPoolPrimary;
}

/**
* Secondary queue should be used for "cleanup" tasks that are likely to be shorter in life than those submitted to the
* primary queue. They may be IO-bound tasks.
* <p>
* Internal API usage only.
*
* @since TODO
*/
public ExecutorService getForkJoinPoolSecondary() {
return forkJoinPoolSecondary;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.fastasyncworldedit.core.util.task;

import com.fastasyncworldedit.core.configuration.Settings;

import java.io.Closeable;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/**
* async queue that accepts a {@link Thread.UncaughtExceptionHandler} for exception handling per instance, delegating to a
* parent {@link KeyQueuedExecutorService}.
*
* @since TODO
*/
public class AsyncNotifyKeyedQueue implements Closeable {

private static final KeyQueuedExecutorService<UUID> QUEUE_SUBMISSIONS = new KeyQueuedExecutorService<>(new ForkJoinPool(
Settings.settings().QUEUE.PARALLEL_THREADS,
new FaweForkJoinWorkerThreadFactory("AsyncNotifyKeyedQueue - %s"),
null,
false
));

private final Thread.UncaughtExceptionHandler handler;
private final Supplier<UUID> key;
private volatile boolean closed;

/**
* New instance
*
* @param handler exception handler
* @param key supplier of UUID key
*/
public AsyncNotifyKeyedQueue(Thread.UncaughtExceptionHandler handler, Supplier<UUID> key) {
this.handler = handler;
this.key = key;
}

public Thread.UncaughtExceptionHandler getHandler() {
return handler;
}

public <T> Future<T> run(Runnable task) {
return call(() -> {
task.run();
return null;
});
}

public <T> Future<T> call(Callable<T> task) {
Future[] self = new Future[1];
Callable<T> wrapped = () -> {
if (!closed) {
try {
return task.call();
} catch (Throwable e) {
handler.uncaughtException(Thread.currentThread(), e);
}
}
if (self[0] != null) {
self[0].cancel(true);
}
return null;
};
self[0] = QUEUE_SUBMISSIONS.submit(key.get(), wrapped);
return self[0];
}

@Override
public void close() {
closed = true;
}

public boolean isClosed() {
return closed;
}

}
Original file line number Diff line number Diff line change
@@ -1,27 +1,16 @@
package com.fastasyncworldedit.core.util.task;

import com.fastasyncworldedit.core.Fawe;
import com.fastasyncworldedit.core.configuration.Settings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.Closeable;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class AsyncNotifyQueue implements Closeable {

private static final ForkJoinPool QUEUE_SUBMISSIONS = new ForkJoinPool(
Settings.settings().QUEUE.PARALLEL_THREADS,
new FaweForkJoinWorkerThreadFactory("AsyncNotifyQueue - %s"),
null,
false
);

private final Lock lock = new ReentrantLock(true);
private final Thread.UncaughtExceptionHandler handler;
private boolean closed;
Expand Down Expand Up @@ -56,9 +45,6 @@ public <T> Future<T> call(Callable<T> task) {
return task.call();
} catch (Throwable e) {
handler.uncaughtException(Thread.currentThread(), e);
if (self[0] != null) {
self[0].cancel(true);
}
}
}
} finally {
Expand All @@ -70,7 +56,7 @@ public <T> Future<T> call(Callable<T> task) {
}
return null;
};
self[0] = QUEUE_SUBMISSIONS.submit(wrapped);
self[0] = Fawe.instance().getQueueHandler().async(wrapped);
return self[0];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@

package com.sk89q.worldedit.extension.platform;

import com.fastasyncworldedit.core.configuration.Caption;
import com.fastasyncworldedit.core.internal.exception.FaweException;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue;
import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue;
import com.sk89q.worldedit.WorldEditException;
import com.sk89q.worldedit.internal.cui.CUIEvent;
import com.sk89q.worldedit.util.formatting.text.TextComponent;
Expand Down Expand Up @@ -68,7 +67,7 @@ public Map<String, Object> getRawMeta() {

// Queue for async tasks
private final AtomicInteger runningCount = new AtomicInteger();
private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue((thread, throwable) -> {
private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue((thread, throwable) -> {
while (throwable.getCause() != null) {
throwable = throwable.getCause();
}
Expand All @@ -82,7 +81,7 @@ public Map<String, Object> getRawMeta() {
throwable.printStackTrace();
}
}
});
}, this::getUniqueId);

/**
* Run a task either async, or on the current thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.fastasyncworldedit.core.regions.FaweMaskManager;
import com.fastasyncworldedit.core.util.TaskManager;
import com.fastasyncworldedit.core.util.WEManager;
import com.fastasyncworldedit.core.util.task.AsyncNotifyQueue;
import com.fastasyncworldedit.core.util.task.AsyncNotifyKeyedQueue;
import com.sk89q.worldedit.EditSession;
import com.sk89q.worldedit.MaxChangedBlocksException;
import com.sk89q.worldedit.WorldEdit;
Expand Down Expand Up @@ -81,7 +81,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable {

// Queue for async tasks
private final AtomicInteger runningCount = new AtomicInteger();
private final AsyncNotifyQueue asyncNotifyQueue = new AsyncNotifyQueue(
private final AsyncNotifyKeyedQueue asyncNotifyQueue = new AsyncNotifyKeyedQueue(
(thread, throwable) -> {
while (throwable.getCause() != null) {
throwable = throwable.getCause();
Expand All @@ -96,7 +96,7 @@ public abstract class AbstractPlayerActor implements Actor, Player, Cloneable {
throwable.printStackTrace();
}
}
});
}, this::getUniqueId);

public AbstractPlayerActor(Map<String, Object> meta) {
this.meta = meta;
Expand Down
Loading