#if UNITY_WEBGL && !UNITY_EDITOR
#define SINGLE_THREAD
#endif
using System.Collections.Generic;
using System.Threading;
namespace Pathfinding.Util {
/// <summary>
/// Helper for parallelizing tasks.
/// More specifically this class is useful if the tasks need some large and slow to initialize 'scratch pad'.
/// Using this class you can initialize a scratch pad per thread and then use the appropriate one in the task
/// callback (which includes a thread index).
///
/// Any exception that is thrown in the worker threads will be propagated out to the caller of <see cref="Run"/>.
/// </summary>
/// <typeparam name="T">Type of the work items stored in the queue.</typeparam>
public class ParallelWorkQueue<T> {
	/// <summary>
	/// Callback to run for each item in the queue.
	/// The callback takes the item as the first parameter and the thread index as the second parameter.
	/// The thread index is in the range [0, <see cref="threadCount"/>).
	/// </summary>
	public System.Action<T, int> action;

	/// <summary>Number of threads to use</summary>
	public readonly int threadCount;

	/// <summary>Queue of items. Also used as the lock object that guards access from the worker threads.</summary>
	readonly Queue<T> queue;

	/// <summary>Number of items that were in the queue when this object was constructed</summary>
	readonly int initialCount;

#if !SINGLE_THREAD
	/// <summary>
	/// Upper bound on the number of worker threads.
	/// WaitHandle.WaitAll is documented to support at most 64 handles, and one handle is used per worker.
	/// </summary>
	const int MaxWaitHandles = 64;

	/// <summary>One event per worker thread. Each event is signalled when its worker has finished.</summary>
	ManualResetEvent[] waitEvents;

	/// <summary>Exception thrown by a worker thread, if any. Rethrown from <see cref="Run"/>.</summary>
	System.Exception innerException;
#endif

	/// <summary>
	/// Creates a work queue which will process all items in the given queue.
	/// The queue must not be modified between this call and the call to <see cref="Run"/>.
	/// </summary>
	/// <param name="queue">Items to process. The items are dequeued as they are processed.</param>
	/// <exception cref="System.ArgumentNullException">If queue is null.</exception>
	public ParallelWorkQueue (Queue<T> queue) {
		if (queue == null) throw new System.ArgumentNullException("queue");

		this.queue = queue;
		initialCount = queue.Count;
#if SINGLE_THREAD
		threadCount = 1;
#else
		// At least one thread, at most one per item, and never more than WaitAll can wait for
		int desiredThreads = System.Math.Max(1, AstarPath.CalculateThreadCount(ThreadCount.AutomaticHighLoad));
		threadCount = System.Math.Min(initialCount, System.Math.Min(desiredThreads, MaxWaitHandles));
#endif
	}

	/// <summary>
	/// Execute the tasks.
	/// This iterator will yield approximately every progressTimeoutMillis milliseconds.
	/// This can be used to e.g show a progress bar.
	///
	/// Each yielded value is the number of items that have been taken from the queue so far.
	/// Since this is an iterator, no work is started (and no exception is thrown) until it is enumerated.
	/// </summary>
	/// <param name="progressTimeoutMillis">Maximum time in milliseconds to wait for the workers before yielding a progress value.</param>
	/// <exception cref="System.InvalidOperationException">If the number of items in the queue has changed since the constructor was called.</exception>
	public IEnumerable<int> Run (int progressTimeoutMillis) {
		if (initialCount != queue.Count) throw new System.InvalidOperationException("Queue has been modified since the constructor");

		// Return early if there are no items in the queue.
		// This is important because WaitHandle.WaitAll with an array of length zero
		// results in weird behaviour (Microsoft's .Net returns false, Mono returns true
		// and the documentation says it should throw an exception).
		if (initialCount == 0) yield break;

#if SINGLE_THREAD
		// WebGL does not support multithreading so we will do everything on the main thread instead
		for (int i = 0; i < initialCount; i++) {
			action(queue.Dequeue(), 0);
			yield return i + 1;
		}
#else
		// Fire up a bunch of workers to process the queue in parallel
		waitEvents = new ManualResetEvent[threadCount];
		for (int i = 0; i < waitEvents.Length; i++) {
			waitEvents[i] = new ManualResetEvent(false);
#if NETFX_CORE
			// Need to make a copy here, otherwise it may refer to some other index when the task actually runs.
			int threadIndex = i;
			System.Threading.Tasks.Task.Run(() => RunTask(threadIndex));
#else
			ThreadPool.QueueUserWorkItem(threadIndex => RunTask((int)threadIndex), i);
#endif
		}

		// Wake up regularly to report progress until every worker has signalled its event
		while (!WaitHandle.WaitAll(waitEvents, progressTimeoutMillis)) {
			int count;
			lock (queue) count = queue.Count;
			yield return initialCount - count;
		}

		// Every worker has signalled its event, so none of them will touch the handles again
		// and they can be released. This is intentionally not done if the enumeration is
		// abandoned early, because a worker that is still running would then signal a disposed handle.
		// The cast makes this compile regardless of whether the target profile exposes Dispose() publicly.
		for (int i = 0; i < waitEvents.Length; i++) ((System.IDisposable)waitEvents[i]).Dispose();
		waitEvents = null;

		if (innerException != null) throw innerException;
#endif
	}

#if !SINGLE_THREAD
	/// <summary>
	/// Worker loop.
	/// Dequeues items and runs <see cref="action"/> on them until the queue is empty.
	/// If the action throws, the exception is stored, the queue is cleared so that the other workers
	/// stop as soon as they have finished their current item, and the exception is later rethrown by <see cref="Run"/>.
	/// The worker's wait event is always signalled when the worker exits.
	/// </summary>
	/// <param name="threadIndex">Index of this worker. Passed on to the action and used to pick the wait event to signal.</param>
	void RunTask (int threadIndex) {
		try {
			while (true) {
				T item;
				lock (queue) {
					if (queue.Count == 0) return;
					item = queue.Dequeue();
				}
				// Run the callback outside the lock so that the workers can actually run in parallel
				action(item, threadIndex);
			}
		} catch (System.Exception e) {
			innerException = e;
			// Stop the remaining threads
			lock (queue) queue.Clear();
		} finally {
			waitEvents[threadIndex].Set();
		}
	}
#endif
}
}