Constructs a new PriorityQueue, which is a handle to width InnerPQ instances allocated using Alloc.
The element type of the priority queue.
Tests whether or not there are elements in any InnerPQ.
Inserts an element into one of the InnerPQs.
Attempts to merge this PriorityQueue with another of the same width.
Removes an element from one of the InnerPQs.
Searches for an empty InnerPQ and swaps it with src. Useful for efficient bulk initialization. Warning: this method blocks and will run forever if no InnerPQ is empty.
Attempts to remove an element from one of the InnerPQs up to maxRetries times.
The width of the PriorityQueue.
The total number of bytes that will be requested from the allocator in order to insert a single element (element size + overhead).
import std.range : iota;
import std.parallelism : TaskPool;
import core.sync.barrier : Barrier;
import std.typecons : Yes; // provides Yes.EnableEmpty

enum numThreads = 4;
auto b = new Barrier(numThreads);
auto tp = new TaskPool(numThreads-1);

// pq0 and pq1 are shared handles to priority queue data structures
alias A = TLRegionAllocator;
alias PQ = shared PriorityQueue!(int, A, SkewHeap!(int,A), Yes.EnableEmpty);
PQ pq0, pq1;

// for each thread in taskpool:
foreach(id ; tp.parallel(iota(numThreads), 1)) {
    b.wait();

    // initialize allocator and construct priority queues
    A.enter(2 ^^ 12);
    if (id == 0) {
        pq0 = PQ(numThreads, 0);
        pq1 = PQ(numThreads, 1);
    }
    b.wait();

    // insert test values
    pq0.insert(id);
    pq1.insert(id + numThreads);

    // attempt to merge pq0 and pq1
    if (id % 2 > 0) {
        pq0.merge(pq1);
    } else {
        pq1.merge(pq0);
    }

    // remove all items
    while (! pq0.empty) {
        auto maybe_int = pq0.tryRemoveAny(10);
    }
    b.wait();

    // cleanup allocator
    A.exit();
}

// cleanup taskpool
tp.finish(true);
A Relaxed Concurrent Mergeable Priority Queue.
PriorityQueue is a relaxed concurrent priority queue implementation which, under the appropriate assumptions, supports lock-free insert and merge operations and a deadlock-free remove-min. The queue is relaxed in that remove-min need not return the global minimum: given p threads and width p, a removed element is expected to have average rank p and worst-case rank O(p log p).
Sequential Semantics: A PriorityQueue is an array of InnerPQ instances. Insert and remove each choose an index at random and operate atomically on the InnerPQ at that index. A PriorityQueue merge is an atomic, index-by-index merge of corresponding InnerPQ instances.
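The sequential semantics can be illustrated with a small single-threaded model. This is only a sketch under stated assumptions, not the library's implementation: ModelPQ and tryRemove are hypothetical names, each InnerPQ is stood in for by a sorted int array, and no atomics or allocators are involved.

```d
import std.algorithm.sorting : sort;
import std.random : Random, uniform;

// Hypothetical single-threaded model of the sequential semantics.
struct ModelPQ
{
    int[][] inner; // inner[i] stands in for one InnerPQ, kept sorted ascending
    Random rng;    // source of the random index choice

    this(size_t width, uint seed)
    {
        inner = new int[][](width);
        rng = Random(seed);
    }

    // True when every slot is empty.
    bool empty() const
    {
        foreach (q; inner)
            if (q.length > 0) return false;
        return true;
    }

    // Insert into one randomly chosen slot.
    void insert(int x)
    {
        auto i = uniform(size_t(0), inner.length, rng);
        inner[i] ~= x;
        sort(inner[i]);
    }

    // Try up to maxRetries random slots; on success, write the minimum
    // of the chosen slot to result and return true.
    bool tryRemove(size_t maxRetries, out int result)
    {
        foreach (attempt; size_t(0) .. maxRetries)
        {
            auto i = uniform(size_t(0), inner.length, rng);
            if (inner[i].length == 0) continue;
            result = inner[i][0];
            inner[i] = inner[i][1 .. $];
            return true;
        }
        return false;
    }

    // Index-by-index merge: slot i of other is merged into slot i of this.
    void merge(ref ModelPQ other)
    {
        assert(other.inner.length == inner.length, "widths must match");
        foreach (i, ref q; inner)
        {
            q ~= other.inner[i];
            sort(q);
            other.inner[i] = null;
        }
    }
}

void main()
{
    auto a = ModelPQ(4, 1);
    auto b = ModelPQ(4, 2);
    foreach (x; 0 .. 8) a.insert(x);
    foreach (x; 8 .. 16) b.insert(x);

    a.merge(b);
    assert(b.empty);

    // Drain a; removal order is relaxed, so only the count is checked.
    int[] removed;
    while (!a.empty)
    {
        int x;
        if (a.tryRemove(10, x)) removed ~= x;
    }
    assert(removed.length == 16);
}
```

Because each removal takes the minimum of a single randomly chosen slot rather than the global minimum, elements generally come out in a nearly sorted but not strictly sorted order, which is the relaxation described above.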