PriorityQueue

A Relaxed Concurrent Mergeable Priority Queue.

PriorityQueue is a relaxed concurrent priority queue implementation which, under the appropriate assumptions, supports lock-free insert and merge operations and a deadlock-free remove-min. Any operation such that given p threads and width p, a removed element is expected to have average rank p and worst-case rank O(p log p).

Sequential Semantics: A PriorityQueue is an array of InnerPQ instances. An insert and remove will randomly choose an index to atomically operate on, and a PriorityQueue merge is an atomic, index-by-index merge of corresponding InnerPQ instances.

Constructors

this
this(size_t width, size_t handleId)

Construct a new PriorityQueue, which is a handle to width InnerPQs allocated using Alloc.

Public Imports

std.typecons
public import std.typecons : Nullable;

Members

Aliases

ElementType
alias ElementType = T

The element type of the priority queue

Functions

empty
bool empty()

Tests whether or not there are elements in any InnerPQ.

insert
void insert(T t)

Inserts an element into one of the InnerPQs.

merge
MergeResult merge(PriorityQueue other)

Attempts to merge this PriorityQueue with another of the same width.

removeAny
T removeAny()

Removes an element from one of the InnerPQs.

swapEmptyWith
void swapEmptyWith(InnerPQ src)

Searches for an empty InnerPQ and swaps it with src. Useful for efficient bulk initialization. Warning: This method is blocking: it will run forever if no InnerPQ is empty.

tryRemoveAny
Nullable!T tryRemoveAny(size_t maxRetries)

Attempts to remove an element from one of the InnerPQs up to maxRetries times.

Properties

width
size_t width [@property getter]

The width of the PriorityQueue.

Static functions

fixedAllocSize
size_t fixedAllocSize(size_t width)

Variables

PerElemAllocSize
enum size_t PerElemAllocSize;

The total number of bytes that will be requested to the allocator in order to insert a single element (element size + overhead).

Examples

import std.range : iota;
import std.parallelism : TaskPool;
import core.sync.barrier : Barrier;
enum numThreads = 4;
auto b = new Barrier(numThreads);
auto tp = new TaskPool(numThreads-1);
// pq0 and pq1 are shared handles to priority queue data structures
alias A = TLRegionAllocator;
alias PQ = shared PriorityQueue!(int, A, SkewHeap!(int,A), Yes.EnableEmpty);
PQ pq0, pq1;
// for each thread in taskpool:
foreach(id ; tp.parallel(iota(numThreads), 1)) {
    b.wait();
    // initialize allocator and construct priority queues
    A.enter(2 ^^ 12);
    if (id == 0) {
        pq0 = PQ(numThreads, 0);
        pq1 = PQ(numThreads, 1);
    }
    b.wait();
    // insert test values
    pq0.insert(id);
    pq1.insert(id + numThreads);
    // attempt to merge pq0 and pq1
    if (id % 2 > 0) {
        pq0.merge(pq1);
    }
    else {
        pq1.merge(pq0);
    }
    // remove all items
    while (! pq0.empty) {
        auto maybe_int = pq0.tryRemoveAny(10);
    }
    b.wait();
    // cleanup allocator
    A.exit();
}
// cleanup taskpool
tp.finish(true);

Meta