module mergearray.impl.bag_array;

// lockfree merge implementation

import core.atomic, core.sync.mutex, std.typecons, std.traits;

import mergearray.impl.header;
import mergearray.impl.handle_array;

////////////////////////////////////////////////////////////////////////////////

debug(mergearray_printactions) {
    import std.stdio : writefln;
}

/++
Mixes the fields of an intrusive singly-linked list into the enclosing type.

Adds two static sentinel instances (exposed as the pointers `nil` and
`dummyNodeIndicator`) and two link fields: `iSListMyHead` (head of the list
hanging off this node) and `iSListUpperNext` (next link within the list this
node is a member of).
+/
private mixin template IntrusiveSList() {
    private static shared(typeof(this)) nil_value, dummyNodeIndicator_value;
    private enum nil = &nil_value;
    private enum dummyNodeIndicator = &dummyNodeIndicator_value;

    private shared(typeof(this))* iSListMyHead;
    private shared(typeof(this))* iSListUpperNext;
}

/++
An array-based implementation of a Bag for use by a Handle.
+/
struct MergeArray(Obj) if (is(Obj == struct) && !is(Obj == shared)) {
    /++
    The type of the contained sequential data structure.
    +/
    alias ElementType = Obj;

    /++
    The type of Handle which MergeArray expects to be underneath.
    +/
    alias H = Handle!(shared MergeArray);

private:
    // when true, evalMerges_locked itself unlinks a node it has just marked
    // as a dummy; when false that cleanup is left to another thread
    enum RemoveDummyNodes = true;

    /++
    One slot of the array: a locked Obj plus the intrusive list of nodes whose
    merge into this slot is still pending.
    +/
    align(64)
    static struct Node {
        CoarseLocked!Obj elem = void;
        alias elem this;

        mixin IntrusiveSList mergeList; // list of pending merged Nodes

        // the node whose merge list owns this node; null until claimed by ownedBy
        Node* parent = null;

        enum UseTailSkip = true;

        static if (UseTailSkip) {
            // heuristic shortcut towards the end of the list (see listFindEnd)
            Node* iSListUpperSkip;
        }

        H myhandle; // the union-find node which this was created for

        /++
        Builds the node for handle h, constructing the contained Obj from args
        and initialising every list link to the nil sentinel.
        +/
        this(ObjArgs...)(H h, ObjArgs args) shared {
            myhandle = h;
            elem = shared CoarseLocked!Obj(Obj(args));

            iSListMyHead = nil;
            iSListUpperNext = nil;

            static if (UseTailSkip) {
                iSListUpperSkip = nil;
            }
        }

        // returns true if this node's bag has been
        // merged as the source into another bag (linearized):
        //   - false while the node is not owned by any other node,
        //   - true once its own list head has been set to null,
        //   - otherwise whatever myhandle.isFullyMerged reports
        @property
        bool wasMerged() shared {
            if (! isOwned) {
                return false;
            }
            else if (iSListMyHead.atomicLoad is null) {
                return true;
            }
            else {
                return myhandle.isFullyMerged;
            }
        }

        // true once some node has been recorded as this node's parent
        @property
        bool isOwned() shared {
            return parent.atomicLoad !is null;
        }

        // Returns true if n is this node's parent. If no parent has been
        // recorded yet, first tries to claim this node for n with a CAS.
        bool ownedBy(shared(Node)* n) shared {
            auto p = parent.atomicLoad;

            if (p is null) {
                // cur was inserted into this list (and maybe others)
                // but not marked thus try to claim it
                if (my_cas(& parent, cast(shared(Node)*) null, n)) {
                    // this thread is the one which made n the owner
                    debug(mergearray_printactions) writefln("%s: Made %x own %x", tid, n, &this);

                    return true;
                }
                else {
                    // lost the race; re-read whichever parent won
                    p = parent.atomicLoad;
                }
            }

            return p is n;
        }

        //----------------------------------------------------------------------

        // true for every real node, false only for the nil sentinel;
        // must not be called on dummyNodeIndicator or null
        @property
        bool isNormal() shared in {
            assert(&this !is dummyNodeIndicator);
            assert(&this !is null);
        } body {
            return &this !is nil;
        }

        // true if cur marks the end of this node's list: either the nil
        // sentinel or a node which is not owned by this node.
        // Note: ownedBy may claim cur for this node as a side effect.
        bool isAppendableTail(shared(Node)* cur) shared {
            return !cur.isNormal || !cur.ownedBy(&this);
        }

        // Returns the node following cur in this node's list, or null when
        // cur is the end of the list or cur has been marked as a dummy.
        auto listContinuesAfter(shared(Node)* cur) shared {
            if (!isAppendableTail(cur)) {
                auto next = cur.iSListUpperNext.atomicLoad;

                assert(next !is null);

                if (next !is dummyNodeIndicator) {
                    return next;
                }
            }
            return null;
        }

        // Advances (prev, cur) along this node's list until
        // listContinuesAfter(cur) reports that the list does not continue.
        // Both arguments are updated in place.
        void listFindEnd(ref shared(Node)* prev, ref shared(Node)* cur) shared {
            static if (UseTailSkip) {
                // heuristic shortcut to end of list
                auto skip = prev.iSListUpperSkip.atomicLoad;

                while (skip !is nil) {
                    // prev is at least two nodes away from the end of the list
                    assert(! isAppendableTail(skip));

                    auto further = skip.iSListUpperSkip.atomicLoad;

                    if (further is nil) {
                        // skip is near the end of the list, go to slow search
                        cur = prev.iSListUpperNext.atomicLoad;
                        break;
                    }
                    else {
                        // skip is in the middle of the skip list, compress and keep going
                        my_cas(& prev.iSListUpperSkip, skip, further);
                        prev = skip;
                        skip = further;
                    }
                }
                // got to the end of the skip list
            }

        slowSearch:
            {
                // O(n) search for the end of the list
                while(true) {
                    if (auto next = listContinuesAfter(cur)) {
                        static if (UseTailSkip) {
                            // record a skip pointer for later searches if none is set yet
                            if (prev.iSListUpperSkip.atomicLoad is nil) {
                                my_cas(& prev.iSListUpperSkip, nil, cur);
                            }
                        }
                        prev = cur;
                        cur = next;
                    }
                    else break;
                }
            }
        }

        //----------------------------------------------------------------------

        enum EvalMergeStatus { NoMergesLeft, SomeMergesLeft, LockFreeBailout }
        static struct EvalMergeResult {
            Flag!"ShouldBailout" bailout;
        }

        // loop over each node in my merge list and merge elems
        // at the point of return, the merge list is empty
        //
        // rootElem is this node's (already locked) element; newHead is the
        // value stored into iSListMyHead once the list has been drained and
        // must be nil or null. conditionOffset is accepted but not used here.
        // Returns Yes.ShouldBailout if a pending node could not be locked or
        // its own evaluation bailed out.
        EvalMergeResult evalMerges_locked(Obj* rootElem, shared(Node)* newHead = nil, ptrdiff_t conditionOffset = 0) shared
        in { assert(newHead is nil || newHead is null); }
        body {
            bool stop = false;

            with(EvalMergeStatus)
            while(! stop) {
                shared(Node)* cur = mergeList.iSListMyHead.atomicLoad;

                stop = cur is newHead;

                if (stop) break;

                assert(cur !is null);
                assert(cur !is dummyNodeIndicator);

                if (null is listContinuesAfter(cur)) {
                    // remove cur from the list, which is thus empty
                    stop = my_cas(& iSListMyHead, cur, newHead);

                    debug(mergearray_printactions) writefln("%s: Cleared head %x of %x", tid, cur, &this);

                    continue;
                }

                if (cur.iSListMyHead.atomicLoad is null) {
                    // cur has been inserted then deleted before
                    stop = my_cas(& iSListMyHead, cur, newHead);

                    debug(mergearray_printactions) writefln("%s: Cleared head %x of reinserted %x", tid, cur, &this);

                    continue;
                }

                while(cur !is nil) {
                    assert(cur !is null);
                    assert(cur !is dummyNodeIndicator);
                    assert(cur.isOwned && cur.ownedBy(&this));

                    assert(cur is iSListMyHead.atomicLoad);
                    assert(dummyNodeIndicator !is cur.iSListUpperNext.atomicLoad);

                    if (cur.iSListMyHead.atomicLoad is null) {
                        // "duplicate" node due to multiple threads succeeding at inserting cur into lists
                        // cur was previously deleted, so just need to remove from list

                        assert(false);
                    }
                    else {
                        // cur must be marked deleted and have its elem merged into this's elem

                        // ensure that cur is linearized to be inserted into this bag
                        cur.myhandle.unshared.ensureMerged();

                        // at this point:
                        //   cur.isFullyMerged = true (linearized)
                        //   if any thread starts trying to insert to cur.mergeList, it will fail and try to insert into my list

                        bool bailout = false;

                        bool gotLock = cur.tryLocked((Obj* subElem) {
                            /+
                            since cur is locked and cur's merge is linearized, its list
                            can only be appended to at most once by each other thread
                            +/
                            EvalMergeResult subResult = cur.evalMerges_locked(subElem, null);

                            if (subResult.bailout) {
                                // bailed out because of lock contention
                                // subElem is not guaranteed to be in a linearized state,
                                // thus follow suit and propagate bailout
                                bailout = true;
                                return;
                            }

                            rootElem.mergeSteal(*subElem);
                        });

                        if (bailout || !gotLock) {
                            // abort the operation
                            return EvalMergeResult(Yes.ShouldBailout);
                        }
                    }

                    // the recursive call above drained cur's list with newHead == null
                    assert(cur.iSListMyHead.atomicLoad is null);

                removeCur:

                    assert(cur is iSListMyHead.atomicLoad);

                    // cur is the head of mergeList, thus to remove, only need to redirect head to next
                    auto next = cur.iSListUpperNext.atomicLoad;

                    assert(next !is null);

                    assert(next !is dummyNodeIndicator);

                    if (! isAppendableTail(next)) {
                        // list has more elements

                        assert(iSListMyHead.atomicLoad is cur);

                        assert(next !is dummyNodeIndicator);

                        //iSListMyHead.atomicStore(next);
                        if (my_cas(& iSListMyHead, cur, next)) {
                            // successfully removed cur
                            debug(mergearray_printactions) writefln("%s: Deleted %x under %x, was head of nonempty list", tid, cur, &this);

                            cur = next;
                        }
                        else {
                            // should be impossible to get here ???
                            debug(mergearray_printactions) writefln("%s: Failed to delete %x under %x, was head of nonempty list!!!!!!!!", tid, cur, &this);

                            assert(false, "MergeArray Error: Failed to delete head of non-empty list.");
                        }
                    }
                    else {
                        // list has no other elements, thus another thread may insert a new node at next

                        // set next to dummyNodeIndicator so other threads will try to replace cur
                        // rather than cur's next slot

                        assert(iSListMyHead.atomicLoad is cur);

                        assert(next !is dummyNodeIndicator);

                        if (my_cas(& cur.iSListUpperNext, next, dummyNodeIndicator)) {
                            // succeeded in marking cur as a removable node

                            debug(mergearray_printactions) {
                                writefln("%s: Marked %x as dummy under %x", tid, cur, &this);
                            }

                            static if (RemoveDummyNodes) {
                                // now try to remove cur from myhead if another thread hasn't yet
                                stop = my_cas(& iSListMyHead, cur, newHead);

                                // at this point, iSListMyHead must not be equal to cur and never will again
                                debug(mergearray_printactions) writefln("%s: Dummy %x was removed from under %x", tid, cur, &this);
                            }
                            else {
                                // let another thread clean it up
                                stop = true;
                            }

                            break;
                        }
                        else {
                            // another thread appended to cur, so remaining list is not empty anymore

                            goto removeCur;
                        }
                    }
                }
            }
            return EvalMergeResult(No.ShouldBailout);
        }

        // outcome of one attempt to apply a delegate to a node:
        //   Finished - the delegate returned true
        //   NextElem - caller should move on to another element of this bag
        //   NextBag  - this bag was merged away; caller should abandon it
        enum EvalApplyResult { Finished, NextElem, NextBag }

        // resolves all lazy merges in myMergeHead which have been linearized
        // then applies dg to this node's elem
        //
        // Uses tryLocked, so nothing is done (and NextElem is returned) when
        // the lock is not obtained or when evaluating the merges bails out.
        auto tryEvalMergesAndApply(scope bool delegate(Obj*) dg, ptrdiff_t conditionOffset = 0) shared {
            EvalApplyResult result = EvalApplyResult.NextElem;

            bool gotLock = tryLocked((Obj* myElem) {

                if (wasMerged) {
                    // should abandon this bag
                    result = EvalApplyResult.NextBag;
                    return;
                }

                // try to evaluate lazy merges which are linearized before
                bool retry = evalMerges_locked(myElem, nil, conditionOffset).bailout;

                if (retry) {
                    // detected contention thus retry on a different elem
                    return;
                }

                if (wasMerged) {
                    // should abandon this bag
                    result = EvalApplyResult.NextBag;
                    return;
                }

                // perform operation on current state of the elem
                if ( dg(myElem) ) {
                    result = EvalApplyResult.Finished;
                }
            });

            return result;
        }
    }

    Node[] nodes;

    // randomly distributed infinite range over [0,width)
    // (cycles through 0 .. width starting at a uniformly chosen index)
    // NOTE(review): width == 0 hands uniform an empty interval and cycle an
    // empty range - confirm callers never create a zero-width MergeArray.
    @property
    auto randomIndexRange() shared {
        import std.random, std.range;
        size_t start = uniform!"[)"(cast(size_t) 0, width);
        return iota(0, width).cycle().dropExactly(start);
    }

public:
    /++
    Construct a new MergeArray, allocating on the GC heap.
    Params:
        parent = The Handle which this is being created for.
        width = The size of the internal Obj[].
        args = Arguments for the constructor of each Obj.
    +/
    this(ObjArgs...)(H parent, size_t width, ObjArgs args) shared {
        nodes = new shared Node[](width);

        foreach(ref n ; nodes) {
            n = shared Node(parent, args);
        }
    }

    /++
    Returns: The total number of bytes that will be requested to the allocator
    when creating a new instance of this type.
    +/
    static size_t fixedAllocSize(size_t width) {
        //pragma(msg, "MergeArray.sizeof + Node.sizeof * width = " ~ MergeArray.sizeof.stringof ~ " + " ~ Node.sizeof.stringof ~ " * width");

        return MergeArray.sizeof + Node.sizeof * width;
    }

    /++
    Static factory method using Alloc to allocate a new shared MergeArray.
    Params:
        parent = The Handle which this is being created for.
        width = The size of the internal Obj[].
        args = Arguments for the constructor of each Obj.
    Returns: A pointer to the new shared MergeArray.
    +/
    static shared(MergeArray)* makeWith(Alloc, ObjArgs...)(H parent, size_t width, ObjArgs args) {
        // the node storage is requested first, then the MergeArray header
        auto nodesptr = (cast(shared(Node)*) Alloc.alloc(Node.sizeof * width).ptr);

        assert((cast(size_t)nodesptr) % shared(Node).alignof == 0);

        auto self = cast(shared(MergeArray)*) Alloc.alloc(MergeArray.sizeof).ptr;

        assert((cast(size_t)self) % shared(MergeArray).alignof == 0);

        self.nodes = nodesptr[0 .. width];

        foreach(ref n ; self.nodes) {
            n = shared Node(parent, args);
        }
        return self;
    }

    /++
    Returns: The number of Obj in the internal array. This is a constant.
    +/
    @property
    size_t width() shared {
        return nodes.length;
    }

    /++
    Tests whether or not this is merge-compatible (has the same width) as other.
    Returns: width == other.width
    +/
    bool canMergeWith(shared(MergeArray)* other) shared {
        return width == other.width;
    }

    /++
    Merges all Objs in this MergeArray into the MergeArray referenced by dest.

    Each individual Obj merge is only performed once by any thread.

    Each node of this array is linked into the merge list of the node with the
    same index in dest's closest bag; indices are visited once each, starting
    from a random position. Returns early as soon as a source node reports
    wasMerged.
    +/
    void mergePerElementInto(H dest) shared {
        import std.algorithm, std.range;

        auto indices = randomIndexRange().takeExactly(width);

    getDestBag:
        auto destBag = dest.findClosestBag();

    indicesLoop:
        while(!indices.empty) {
            immutable i = indices.front;
            auto srcNode = & nodes[i];

            if (srcNode.wasMerged) {
                return;
            }
            auto destNode = & destBag.nodes[i];

            enum nil = Node.nil;
            enum dummyNodeIndicator = Node.dummyNodeIndicator;

            //------------------------------------------------------------------

            // Tries to CAS *loc from expected to srcNode.
            // Returns true when the caller should move on to the next index
            // (srcNode is already owned, or the CAS succeeded - in both cases
            // indices has been advanced), false when the CAS lost a race.
            bool tryInsert(shared(Node*)* loc, shared(Node)* expected, string reason) {
                if (srcNode.isOwned) {
                    // another thread inserted srcNode into the list already
                    // go to next srcNode
                    indices.popFront();
                    return true; // continue indicesLoop;
                }

                if (my_cas(loc, expected, srcNode)) {
                    // succeeded at inserting

                    debug(mergearray_printactions) {
                        writefln("%s: Inserted %x under %x at %x, was %x (%s)", tid, srcNode, destNode, loc, expected, reason);
                    }

                    if (srcNode.ownedBy(destNode)) {
                        // marked destNode as the "true" owner of srcNode
                        debug(mergearray_printactions) writefln("%s: %x owns newly-inserted %x", tid, destNode, srcNode);
                    }
                    else {
                        // srcNode was already claimed by another bag

                        // undo the double insert and go to next srcNode
                        my_cas(loc, srcNode, nil);

                        debug(mergearray_printactions) writefln("%s: Failed to make %x own %x", tid, destNode, srcNode);
                    }

                    // go to next srcNode
                    indices.popFront();
                    return true;
                }

                return false;
            }

            //------------------------------------------------------------------
        curLoop:

            shared(Node)* first = destNode.iSListMyHead.atomicLoad;
            assert(first !is dummyNodeIndicator);

            if (first is null) {
                // destNode was linearized merged and removed
                goto getDestBag;
            }

            if (first is srcNode) {
                // found it already in the list, so don't insert it again;
                // ownedBy is still called so that destNode is recorded as owner
                bool gotit = srcNode.ownedBy(destNode);

                debug(mergearray_printactions) writefln("%s: Found %x at head of %x list (ownedBy == %s)", tid, srcNode, destNode, gotit);

                indices.popFront();
                continue indicesLoop;
            }

            if (auto second = destNode.listContinuesAfter(first)) {
                // must insert srcNode at the tail of nonempty list

                shared(Node)* prev = first;
                shared(Node)* cur = second;

            recLoop:
                destNode.listFindEnd(prev, cur);

                if (destNode.isAppendableTail(cur)) {
                    // prevAddr can be rewritten from cur to srcNode
                    if (tryInsert(& prev.iSListUpperNext, cur, "tail")) {
                        // srcNode has been inserted somewhere
                        continue indicesLoop;
                    }
                    else {
                        // some other thread modified it first, so read it again
                        cur = prev.iSListUpperNext.atomicLoad;
                        if (cur !is dummyNodeIndicator) {
                            // prev now points to nil or some other node
                            // just try again
                            goto recLoop;
                        }
                        else {
                            // prev is now a dummy node, can't append to it
                            // must start/continue another merge list
                        }
                    }
                }
                else {
                    // cur is a dummy node, can't append to it
                    assert(cur.iSListUpperNext.atomicLoad is dummyNodeIndicator);
                }

                // list ends in a dummy node thus isn't valid for appending

                auto newHead = destNode.iSListMyHead.atomicLoad;

                if (newHead is first) {
                    // head hasn't changed since we read first and second, thus
                    // the dummy node is illegally at the end of a list,
                    // so first must have been deleted and re-inserted

                    assert(first.iSListMyHead.atomicLoad is null);

                    if (tryInsert(& destNode.iSListMyHead, first, "head")) {
                        // srcNode has been inserted somewhere
                        continue indicesLoop;
                    }
                    else {
                        // some other thread modified head
                        goto curLoop;
                    }
                }
                else {
                    // list has been updated so the dummy node may not be illegal
                    goto curLoop;
                }
            }
            else {
                // can insert srcNode directly at head of empty list
                if (tryInsert(& destNode.iSListMyHead, first, "head")) {
                    // srcNode has been inserted somewhere
                    continue indicesLoop;
                }
                else {
                    // some other thread modified head
                    goto curLoop;
                }
            }
        }
    }

    /+
    Applies dg to elements in the array until dg returns true (thus this does too)
    or this bag is fully merged before then (thus this returns false)

    Elements are visited via randomIndexRange; an element whose lock cannot be
    taken, or whose merge evaluation bails out, is skipped for this round.
    +/
    bool tryApplyUntil(scope bool delegate(Obj*) dg) shared {
        foreach(immutable i ; randomIndexRange) {
            immutable result = nodes[i].tryEvalMergesAndApply(dg);

            with(Node.EvalApplyResult)
            final switch(result) {
                case Finished: return true;
                case NextElem: continue;
                case NextBag : return false;
            }
            assert(false);
        }
        assert(false);
    }

    /++
    Applies dg to each element in cyclic fashion until it returns true or the
    MergeArray is disrupted by a merge.

    dg is applied to each element in turn, such that once dg is applied to an
    element, it is not reapplied to the same element until width-1 other
    applications of dg finished and all returned false.

    If this is merged with another MergeArray with a lower Handle id, such that
    the Objs in this are mergeSteal'd from, then the further applications of dg
    will stop.

    Warning: The applications of dg are NOT collectively atomic, only
    individually atomic.

    Progress: Deadlock-free if dg returns true after a finite number of
    applications.

    Returns: true if an application of dg returns true, or false if this was
    merged into another MergeArray before dg could return true.
    +/
    bool tryApplyEachUntil(scope bool delegate(Obj*) dg) shared {
        with (Node.EvalApplyResult)
        foreach(immutable i ; randomIndexRange) {
            auto result = NextElem;
            auto srcNode = & nodes[i];

            srcNode.locked((Obj* myElem) {
                if (srcNode.wasMerged) {
                    // should abandon this bag
                    result = NextBag;
                    return;
                }

                // evaluate lazy merges which are linearized before
                // NOTE(review): the bailout flag returned here is ignored,
                // unlike in Node.tryEvalMergesAndApply, so dg may run after an
                // incomplete merge evaluation - confirm this is intended.
                srcNode.evalMerges_locked(myElem);

                if (srcNode.wasMerged) {
                    // should abandon this bag
                    result = NextBag;
                    return;
                }

                // perform operation on current state of the elem
                if ( dg(myElem) ) {
                    result = Finished;
                }
            });

            final switch(result) {
                case Finished: return true; // dg returned true
                case NextElem: continue;    // dg returned false
                case NextBag : return false;// bag was Merged
            }
            assert(false);
        }
        assert(false);
    }
}