module mergearray.priorityqueue;

import mergearray.impl.allocator;
import mergearray.seqpq.skewheap, mergearray.seqpq.pairingheap;
import std.typecons : Flag, Yes, No;

/++
Wraps a sequential priority queue C together with a version counter.

The PriorityQueue operations in this module increment versionNumber whenever
they modify the wrapped queue (insert, a successful remove, swapEmptyWith);
PriorityQueue.empty compares these counters to detect concurrent changes.
All members of C remain reachable through `alias c this`.
+/
private struct Versioned(C) {
    C c;
    alias c this;

    size_t versionNumber = 0;
}

/++
A Relaxed Concurrent Mergeable Priority Queue.

PriorityQueue is a relaxed concurrent priority queue implementation which, under
the appropriate assumptions, supports lock-free insert and merge operations and
a deadlock-free remove-min. Given p threads operating on a queue of width p, a
removed element is expected to have average rank p and worst-case rank
O(p log p).

Sequential Semantics: A PriorityQueue is an array of InnerPQ instances.
An insert and remove will randomly choose an index to atomically operate on, and
a PriorityQueue merge is an atomic, index-by-index merge of corresponding
InnerPQ instances.

Params:
    T = The element type.
    Alloc = The allocator used to allocate the InnerPQs (also passed to the
        default InnerPQ).
    InnerPQ = The sequential priority queue stored at each index (SkewHeap by
        default; PairingHeap is an alternative).
    EnableEmpty = When set, every InnerPQ carries a version counter and the
        empty() method becomes available.
+/
struct PriorityQueue(T,
                     Alloc = GCAllocator,
                     InnerPQ = SkewHeap!(T, Alloc) /+ PairingHeap!(T, Alloc) +/,
                     Flag!"EnableEmpty" EnableEmpty = No.EnableEmpty) {
private:

    // With EnableEmpty, every InnerPQ is wrapped with a version counter used
    // by empty(); otherwise InnerPQ is stored as-is.
    static if (EnableEmpty) {
        alias SeqPQ = Versioned!InnerPQ;
    }
    else {
        alias SeqPQ = InnerPQ;
    }

    private import mergearray.impl.handle_array, mergearray.impl.bag_array;
    alias Bag = shared(MergeArray!SeqPQ);
    alias H = Handle!Bag;

    // Two-choice balancing heuristics. Both are hard-wired off by the leading
    // `false &&`; they also require H to provide applyUntil_2.
    import std.traits : hasMember;
    enum TwoQueueBalanceInsert = false && hasMember!(H, "applyUntil_2");
    enum TwoQueueBalanceRemove = false && hasMember!(H, "applyUntil_2");

    H handle;

public:
    /++
    The element type of the priority queue.
    +/
    alias ElementType = T;

    /++
    The total number of bytes that will be requested from the allocator in
    order to insert a single element (element size + overhead).
    +/
    enum size_t PerElemAllocSize = SeqPQ.NodeSize;

    /++
    Returns: The total number of bytes that will be requested from the
    allocator when creating a new, empty instance of this type with the given
    width.
    +/
    static size_t fixedAllocSize(size_t width) {
        return H.fixedAllocSize(width);
    }

    /++
    Construct a new PriorityQueue, which is a handle to width InnerPQs allocated
    using Alloc.
    Params:
        width = The number of InnerPQs.
        handleId = A unique identifier among PriorityQueues with the same width.
    +/
    this(size_t width, size_t handleId) shared {
        static if (is(Alloc == GCAllocator)) {
            handle = H.make(handleId, width);
        }
        else {
            handle = H.makeWith!Alloc(handleId, width);
        }
    }

    /++
    The width of the PriorityQueue.

    Returns: The number of InnerPQs held inside.
    +/
    @property
    size_t width() shared {
        return handle.width;
    }

    /++
    Inserts an element into one of the InnerPQs.

    The destination is chosen randomly among the InnerPQs that are not being
    operated on by other threads. Elements should be well-distributed under low
    contention, and also under high contention if the distribution of inserted
    elements is similar between threads.
    Params:
        t = The element to be inserted.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    void insert(T t) shared {
        void dg(SeqPQ* sh) {
            sh.insert(t);

            // Every modification bumps the version so empty() can detect it.
            static if (EnableEmpty) {
                sh.versionNumber++;
            }
        }

        static if (TwoQueueBalanceInsert) {
            if (handle.width > 1) {
                // heuristic: insert into heap with fewer elements
                handle.applyUntil_2((SeqPQ* first, SeqPQ* second) {
                    if (first.length <= second.length) {
                        dg(first);
                    }
                    else {
                        dg(second);
                    }

                    return true;
                });
            }
            else {
                handle.apply(& dg);
            }
        }
        else {
            handle.apply(& dg);
        }
    }

    import std.typecons : Nullable;
    /++
    Attempts to remove an element from one of the InnerPQs, giving up after
    maxRetries + 1 failed attempts (one initial attempt plus up to maxRetries
    retries). An attempt fails when the chosen InnerPQ yields no element.

    The source is chosen randomly among the InnerPQs that are not being
    operated on by other threads.
    With low contention, the expected rank of the removed element is width. With
    high contention, the expected average rank of concurrently-removed elements
    is width. The expected maximum rank is O(width log(width)).
    Params:
        maxRetries = The number of additional attempts allowed after the first
            one fails; 0 means a single attempt.
    Returns: A removed element, or null if every attempt failed.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    Nullable!T tryRemoveAny(const size_t maxRetries) shared {
        size_t retries = 0;

        Nullable!T finalResult;

        // Returns true to stop: either an element was removed, or the
        // retry budget is exhausted.
        bool dg(SeqPQ* sh) {
            auto result = sh.deleteMin();

            if (! result.isNull) {
                static if (EnableEmpty) {
                    sh.versionNumber++;
                }

                finalResult = result;
                return true;
            }
            else {
                retries++;
                return retries > maxRetries;
            }
        }

        static if (TwoQueueBalanceRemove) {
            if (handle.width > 1) {
                // heuristic: delete from heap with higher priority root
                handle.applyUntil_2((SeqPQ* first, SeqPQ* second) {
                    auto peekFirst = first.peekMin();
                    auto peekSecond = second.peekMin();

                    if (peekSecond.isNull) {
                        return dg(first);
                    }
                    else if (peekFirst.isNull) {
                        return dg(second);
                    }
                    else if (peekFirst.get <= peekSecond.get) {
                        return dg(first);
                    }
                    else {
                        return dg(second);
                    }
                });
            }
            else {
                handle.applyUntil(& dg);
            }
        }
        else {
            handle.applyUntil(& dg);
        }

        return finalResult;
    }

    /++
    Removes an element from one of the InnerPQs.

    Slightly more efficient than looping tryRemoveAny, but there is no way to
    bail out.
    Warning: This method is blocking: it will run forever if empty.
    Returns: A removed element.
    Progress: Deadlock-free if there are no more than width total concurrent threads.
    +/
    T removeAny() shared {
        T t;

        // Returns true (stop) only once an element has been removed.
        bool dg(SeqPQ* sh) {
            auto result = sh.deleteMin();

            if (! result.isNull) {
                static if (EnableEmpty) {
                    sh.versionNumber++;
                }

                t = result.get;
                return true;
            }
            else {
                return false;
            }
        }

        static if (TwoQueueBalanceRemove) {
            if (handle.width > 1) {
                // heuristic: delete from heap with higher priority root
                handle.applyUntil_2((SeqPQ* first, SeqPQ* second) {
                    auto peekFirst = first.peekMin();
                    auto peekSecond = second.peekMin();

                    if (peekSecond.isNull) {
                        return dg(first);
                    }
                    else if (peekFirst.isNull) {
                        return dg(second);
                    }
                    else if (peekFirst.get <= peekSecond.get) {
                        return dg(first);
                    }
                    else {
                        return dg(second);
                    }
                });
            }
            else {
                handle.applyUntil(& dg);
            }
        }
        else {
            handle.applyUntil(& dg);
        }

        return t;
    }

    /++
    Searches for an empty InnerPQ and swaps it with src. Useful for efficient
    bulk initialization.

    On return, the queue holds the former contents of src, and src holds the
    empty InnerPQ that was found.
    Params:
        src = The InnerPQ to move into the queue.
    Warning: This method is blocking: it will run forever if no InnerPQ is empty.
    +/
    void swapEmptyWith(ref InnerPQ src) shared {
        bool dg(SeqPQ* sh) {
            if (! sh.empty) {
                return false;
            }
            //*sh = src;
            //src = InnerPQ.init;
            import std.algorithm : swap;
            static if (EnableEmpty) {
                // Swap only the wrapped queue; the version counter stays in
                // place and is bumped to record the modification.
                swap(sh.c, src);

                sh.versionNumber++;
            }
            else {
                swap(*sh, src);
            }
            return true;
        }

        handle.applyUntil(& dg);
    }

    static if (EnableEmpty) {
        /++
        Tests whether or not there are elements in any InnerPQ.

        Requires the EnableEmpty flag to be set to use this method. The flag
        incurs a width * size_t.sizeof overhead, and this method allocates
        width * size_t.sizeof internally to perform a snapshot.
        Returns: false if it finds any InnerPQ that is not empty, or true if it
        can determine that all InnerPQs are simultaneously empty.
        This value is relative to a linearization point during the call.

        Progress: Deadlock-free.
        +/
        bool empty() shared {
            bool isEmpty = false;
            // Number of InnerPQs visited so far in the current attempt.
            size_t i;
            import std.container : Array;
            auto versions = Array!size_t();
            versions.length = handle.width;

            // Double-collect snapshot: the first pass records the version of
            // every (empty) InnerPQ, and the second pass checks that each one is
            // still empty with an unchanged version.
            // NOTE(review): comparing versions[orig] against the same InnerPQ
            // assumes tryApplyEachUntil visits the InnerPQs in the same order on
            // each pass, and that every modification bumps versionNumber — confirm.
            bool dg(SeqPQ* sh) {
                if (! sh.empty) {
                    // Found a non-empty InnerPQ: stop, isEmpty stays false.
                    return true;
                }
                else if (i < versions.length) {
                    // First pass: remember this InnerPQ's version.
                    versions[i] = sh.versionNumber;
                    i++;
                    return false;
                }
                else if (i < 2 * versions.length) {
                    // Second pass: compare against the first-pass version.
                    const orig = i - versions.length;

                    if (versions[orig] != sh.versionNumber) {
                        return true; // not a snapshot - wasn't empty at some point
                    }
                    else {
                        i++;
                        return false;
                    }
                }
                else {
                    // 2-pass snapshot of all empty heaps!
                    isEmpty = true;
                    return true;
                }
            }

            // Restart from scratch whenever tryApplyEachUntil reports failure
            // (presumably an interrupted traversal — confirm in Handle).
            while(true) {
                i = 0;
                if (handle.tryApplyEachUntil(&dg)) {
                    break;
                }
            }

            return isEmpty;
        }
    }

    /++
    Attempts to merge this PriorityQueue with another of the same width.

    After a successful merge, this and other will both be handles referring to
    the same width InnerPQs. For each index in [0, width-1], the two original
    InnerPQs at that index are merged to create the InnerPQ at that index in
    the result.
    Params:
        other = The PriorityQueue to merge with.
    Returns:
        MergeResult.Success if the merge was successful.

        MergeResult.WereAlreadyEqual if the merge had already been performed.

        MergeResult.IdClash if two identical ids were found (this should never
        be allowed to happen).

        MergeResult.Incompatible if the widths of this and other are not equal.

    Progress: Lock-free.
    +/
    MergeResult merge(shared PriorityQueue other) shared {
        auto a = handle.unshared;
        auto b = other.handle.unshared;

        // Refresh both handles once the merge has returned without throwing.
        // NOTE(review): `other` is received by value, so other.handle.update()
        // acts on this local copy; whether the caller's handle observes it
        // depends on how Handle stores its state — confirm.
        scope(success) {
            handle.update();
            other.handle.update();
        }

        return a.mergeInto(b);
    }
}
///
unittest {
    import std.range : iota;
    import std.parallelism : TaskPool;
    import core.sync.barrier : Barrier;
    enum numThreads = 4;
    auto b = new Barrier(numThreads);
    auto tp = new TaskPool(numThreads-1);
    // pq0 and pq1 are shared handles to priority queue data structures
    alias A = TLRegionAllocator;
    alias PQ = shared PriorityQueue!(int, A, SkewHeap!(int,A), Yes.EnableEmpty);
    PQ pq0, pq1;
    // for each thread in taskpool:
    foreach(id ; tp.parallel(iota(numThreads), 1)) {
        b.wait();
        // initialize allocator and construct priority queues
        A.enter(2 ^^ 12);
        if (id == 0) {
            pq0 = PQ(numThreads, 0);
            pq1 = PQ(numThreads, 1);
        }
        b.wait();
        // insert test values
        pq0.insert(id);
        pq1.insert(id + numThreads);
        // attempt to merge pq0 and pq1
        if (id % 2 > 0) {
            pq0.merge(pq1);
        }
        else {
            pq1.merge(pq0);
        }
        // remove all items
        while (! pq0.empty) {
            auto maybe_int = pq0.tryRemoveAny(10);
        }
        b.wait();
        // cleanup allocator
        A.exit();
    }
    // cleanup taskpool
    tp.finish(true);
}