1 module mergearray.priorityqueue;
2 
3 import mergearray.impl.allocator;
4 import mergearray.seqpq.skewheap, mergearray.seqpq.pairingheap;
5 import std.typecons : Flag, Yes, No;
6 
/++
Bundles a sequential priority queue of type C with a version counter.

Member lookups that Versioned itself does not define (e.g. insert, deleteMin,
empty as used by PriorityQueue) are forwarded to the wrapped queue through
`alias this`.
+/
private struct Versioned(C) {
    // The wrapped sequential priority queue.
    C c;
    alias c this;

    // Mutation counter; size_t.init gives it a starting value of 0.
    size_t versionNumber;
}
13 
/++
A Relaxed Concurrent Mergeable Priority Queue.

PriorityQueue is a relaxed concurrent priority queue implementation which, under
the appropriate assumptions, supports lock-free insert and merge operations and
a deadlock-free remove-min. Given p threads and width p, a removed element is
expected to have average rank p and worst-case rank O(p log p).

Sequential Semantics: A PriorityQueue is an array of InnerPQ instances.
An insert or remove randomly chooses an index to operate on atomically, and
a PriorityQueue merge is an atomic, index-by-index merge of corresponding
InnerPQ instances.

Params:
    T = The element type.
    Alloc = The allocator passed to the handle constructor (and, by default,
        to InnerPQ).
    InnerPQ = The sequential priority queue stored at each index.
    EnableEmpty = Whether each InnerPQ carries a version counter, which is
        required for the `empty` method.
+/
struct PriorityQueue(T,
                     Alloc = GCAllocator,
                     InnerPQ = SkewHeap!(T, Alloc) /+ PairingHeap!(T, Alloc) +/,
                     Flag!"EnableEmpty" EnableEmpty = No.EnableEmpty) {
private:
    
    static if (EnableEmpty) {
        // Each InnerPQ is paired with a counter that every mutation bumps,
        // which is what lets `empty` validate a two-pass snapshot.
        alias SeqPQ = Versioned!InnerPQ;
    }
    else {
        alias SeqPQ = InnerPQ;
    }
    
    private import mergearray.impl.handle_array, mergearray.impl.bag_array;
    alias Bag = shared(MergeArray!SeqPQ);
    alias H = Handle!Bag;
    
    import std.traits : hasMember;
    // Two-queue balancing heuristics. Both are currently switched off by the
    // leading `false`, and could only be enabled if H provides applyUntil_2.
    enum TwoQueueBalanceInsert = false && hasMember!(H, "applyUntil_2");
    enum TwoQueueBalanceRemove = false && hasMember!(H, "applyUntil_2");
    
    H handle;
    
    /+
    Runs a remove callback against InnerPQs until it returns true. Shared by
    tryRemoveAny and removeAny so both use the same selection strategy.
    Params:
        dg = Callback that receives an InnerPQ and returns true to stop.
    +/
    void applyRemove(DG)(DG dg) shared {
        static if (TwoQueueBalanceRemove) {
            if (handle.width > 1) {
                // heuristic: delete from heap with higher priority root
                handle.applyUntil_2((SeqPQ* first, SeqPQ* second) {
                    auto peekFirst = first.peekMin();
                    auto peekSecond = second.peekMin();
                    
                    if (peekSecond.isNull) {
                        return dg(first);
                    }
                    else if (peekFirst.isNull) {
                        return dg(second);
                    }
                    else if (peekFirst.get <= peekSecond.get) {
                        return dg(first);
                    }
                    else {
                        return dg(second);
                    }
                });
                return;
            }
        }
        
        handle.applyUntil(dg);
    }
    
public:
    /++
    The element type of the priority queue
    +/
    alias ElementType = T;
    
    /++
    The total number of bytes that will be requested from the allocator in
    order to insert a single element (element size + overhead).
    +/
    enum size_t PerElemAllocSize = InnerPQ.NodeSize; // the version counter is per InnerPQ, not per node
    
    /++
    Returns: The total number of bytes that will be requested from the
    allocator when creating a new, empty instance of this type with the given
    width.
    Params:
        width = The number of InnerPQs.
    +/
    static size_t fixedAllocSize(size_t width) {
        return H.fixedAllocSize(width);
    }
    
    /++
    Construct a new PriorityQueue, which is a handle to width InnerPQs allocated
    using Alloc.
    Params:
        width = The number of InnerPQs.
        handleId = A unique identifier among PriorityQueues with the same width.
    +/
    this(size_t width, size_t handleId) shared {
        // GCAllocator uses H.make; any other allocator is passed explicitly.
        static if (is(Alloc == GCAllocator)) {
            handle = H.make(handleId, width);
        }
        else {
            handle = H.makeWith!Alloc(handleId, width);
        }
    }
    
    /++
    The width of the PriorityQueue.
    
    Returns: The number of InnerPQs held inside.
    +/
    @property
    size_t width() shared {
        return handle.width;
    }
    
    /++
    Inserts an element into one of the InnerPQs.
    
    The destination is chosen randomly among the InnerPQs that are not being
    operated on by other threads. Elements should be well-distributed under low
    contention, and also under high contention if the distribution of inserted
    elements is similar between threads.
    Params:
        t = The element to be inserted.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    void insert(T t) shared {
        void dg(SeqPQ* sh) {
            sh.insert(t);
            
            static if (EnableEmpty) {
                // record the mutation so `empty` can detect it
                sh.versionNumber++;
            }
        }
        
        static if (TwoQueueBalanceInsert) {
            if (handle.width > 1) {
                // heuristic: insert into heap with fewer elements
                handle.applyUntil_2((SeqPQ* first, SeqPQ* second) {
                    if (first.length <= second.length) {
                        dg(first);
                    }
                    else {
                        dg(second);
                    }
                    
                    return true;
                });
                return;
            }
        }
        
        handle.apply(&dg);
    }
    
    import std.typecons : Nullable;
    /++
    Attempts to remove an element from one of the InnerPQs, giving up after
    maxRetries + 1 attempts that find nothing to remove.
    
    The source is chosen randomly among the InnerPQs that are not being
    operated on by other threads.
    With low contention, the expected rank of the removed element is width. With
    high contention, the expected average rank of concurrently-removed elements
    is width. The expected maximum rank is O(width log(width)).
    Params:
        maxRetries = The number of extra attempts allowed after the first one
            fails; 0 means a single attempt.
    Returns: A removed element, or null if each attempt failed.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    Nullable!T tryRemoveAny(const size_t maxRetries) shared {
        size_t retries = 0;
        Nullable!T finalResult;
        
        bool dg(SeqPQ* sh) {
            auto result = sh.deleteMin();
            
            if (! result.isNull) {
                static if (EnableEmpty) {
                    sh.versionNumber++;
                }
                
                finalResult = result;
                return true;
            }
            
            // Nothing removed from this InnerPQ: stop once the budget is spent.
            retries++;
            return retries > maxRetries;
        }
        
        applyRemove(&dg);
        
        return finalResult;
    }
    
    /++
    Removes an element from one of the InnerPQs.
    
    Slightly more efficient than looping tryRemoveAny, but there is no way to
    bail out.
    Warning: This method is blocking: it will run forever if empty.
    Returns: A removed element.
    Progress: Deadlock-free if there are no more than width total concurrent threads.
    +/
    T removeAny() shared {
        T t;
        
        bool dg(SeqPQ* sh) {
            auto result = sh.deleteMin();
            
            if (result.isNull) {
                return false; // keep looking
            }
            
            static if (EnableEmpty) {
                sh.versionNumber++;
            }
            
            t = result.get;
            return true;
        }
        
        applyRemove(&dg);
        
        return t;
    }
    
    /++
    Searches for an empty InnerPQ and swaps it with src. Useful for efficient
    bulk initialization. Afterwards src holds the previously-empty InnerPQ.
    Params:
        src = The InnerPQ whose contents are moved into this PriorityQueue.
    Warning: This method is blocking: it will run forever if no InnerPQ is empty.
    +/
    void swapEmptyWith(ref InnerPQ src) shared {
        bool dg(SeqPQ* sh) {
            if (! sh.empty) {
                return false;
            }
            
            import std.algorithm : swap;
            static if (EnableEmpty) {
                // swap only the wrapped queue, then record the mutation
                swap(sh.c, src);
                sh.versionNumber++;
            }
            else {
                swap(*sh, src);
            }
            return true;
        }
        
        handle.applyUntil(&dg);
    }
    
    static if (EnableEmpty) {
        /++
        Tests whether or not there are elements in any InnerPQ.
        
        Requires the EnableEmpty flag to be set to use this method. The flag
        incurs a width * size_t.sizeof overhead, and this method allocates
        width * size_t.sizeof internally to perform a snapshot.
        Returns: false if it finds any InnerPQ that is not empty, or true if it
        can determine that all InnerPQs are simultaneously empty.
        This value is relative to a linearization point during the call.
        
        Progress: Deadlock-free.
        +/
        bool empty() shared {
            import std.container : Array;
            
            bool isEmpty = false;
            size_t i;
            auto versions = Array!size_t();
            versions.length = handle.width;
            
            // NOTE(review): the two-pass check assumes tryApplyEachUntil keeps
            // visiting the InnerPQs in the same index order until dg returns
            // true — confirm against handle_array.
            bool dg(SeqPQ* sh) {
                if (! sh.empty) {
                    return true; // found a non-empty InnerPQ
                }
                else if (i < versions.length) {
                    // first pass: record each InnerPQ's version
                    versions[i] = sh.versionNumber;
                    i++;
                    return false;
                }
                else if (i < 2 * versions.length) {
                    // second pass: every version must be unchanged
                    const orig = i - versions.length;
                    
                    if (versions[orig] != sh.versionNumber) {
                        return true; // not a snapshot - wasn't empty at some point
                    }
                    i++;
                    return false;
                }
                else {
                    // 2-pass snapshot of all empty heaps!
                    isEmpty = true;
                    return true;
                }
            }
            
            while (true) {
                // Restart both passes and clear any result left by an aborted
                // attempt, so a stale `true` cannot leak into a later attempt.
                i = 0;
                isEmpty = false;
                if (handle.tryApplyEachUntil(&dg)) {
                    break;
                }
            }
            
            return isEmpty;
        }
    }
    
    /++
    Attempts to merge this PriorityQueue with another of the same width.
    
    After a successful merge, this and other will both be handles referring to
    the same width InnerPQs, where the two original InnerPQs with the same
    [0,width-1] index will be merged to create the InnerPQ of that index in the
    result.
    Params:
        other = The PriorityQueue to merge with.
    Returns:
        MergeResult.Success if merge was successful.
        
        MergeResult.WereAlreadyEqual if merge was already performed.
        
        MergeResult.IdClash if two identical ids were found (should never allow to happen).
        
        MergeResult.Incompatible if widths of this and other are not equal.
    
    Progress: Lock-free.
    +/
    MergeResult merge(shared PriorityQueue other) shared {
        auto a = handle.unshared;
        auto b = other.handle.unshared;
        
        // Once mergeInto has returned normally, call update() on both handles.
        // NOTE(review): `other` is received by value, so other.handle.update()
        // acts on this copy rather than the caller's variable; presumably the
        // caller's handle catches up on its own — confirm in handle_array.
        scope(success) {
            handle.update();
            other.handle.update();
        }
        
        return a.mergeInto(b);
    }
}
///
unittest {
    import core.sync.barrier : Barrier;
    import std.parallelism : TaskPool;
    import std.range : iota;
    
    enum numThreads = 4;
    
    // The barrier waits for numThreads participants; the pool gets one
    // fewer worker because the calling thread also runs loop bodies.
    auto sync = new Barrier(numThreads);
    auto pool = new TaskPool(numThreads - 1);
    
    alias A = TLRegionAllocator;
    alias PQ = shared PriorityQueue!(int, A, SkewHeap!(int, A), Yes.EnableEmpty);
    // shared handles to two priority queue data structures
    PQ pq0, pq1;
    
    // one loop body per participating thread
    foreach (tid; pool.parallel(iota(numThreads), 1)) {
        sync.wait();
        
        // each thread sets up its allocator; thread 0 builds both queues
        A.enter(1 << 12);
        if (tid == 0) {
            pq0 = PQ(numThreads, 0);
            pq1 = PQ(numThreads, 1);
        }
        sync.wait();
        
        // every thread adds one value to each queue
        pq0.insert(tid);
        pq1.insert(tid + numThreads);
        
        // even threads call pq1.merge(pq0), odd threads pq0.merge(pq1)
        if (tid % 2 == 0) {
            pq1.merge(pq0);
        }
        else {
            pq0.merge(pq1);
        }
        
        // drain: the Nullable!int result of each attempt is not needed here
        while (! pq0.empty) {
            pq0.tryRemoveAny(10);
        }
        sync.wait();
        
        // release this thread's allocator
        A.exit();
    }
    
    // wait for the pool to shut down
    pool.finish(true);
}