1 module mergearray.impl.bag_array;
2 
3 // lockfree merge implementation
4 
5 import core.atomic, core.sync.mutex, std.typecons, std.traits;
6 
7 import mergearray.impl.header;
8 import mergearray.impl.handle_array;
9 
10 ////////////////////////////////////////////////////////////////////////////////
11 
12 debug(mergearray_printactions) {
13     import std.stdio : writefln;
14 }
15 
/+
Intrusive singly-linked-list support, mixed into a node type.

Adds two sentinel pointers, each the address of its own static shared instance
of the mixing-in type, so neither can equal the address of a real node:
  nil                - the sentinel value used to mark "no node"
  dummyNodeIndicator - a second, distinct sentinel value
and the two per-node link fields iSListMyHead and iSListUpperNext.
All members are private.
+/
private mixin template IntrusiveSList() {
private:
    // static storage whose addresses serve as the sentinel values
    static shared(typeof(this)) nil_value;
    static shared(typeof(this)) dummyNodeIndicator_value;
    
    enum nil = &nil_value;                               // sentinel #1
    enum dummyNodeIndicator = &dummyNodeIndicator_value; // sentinel #2
    
    // link fields (declaration order kept: head first, then next)
    shared(typeof(this))* iSListMyHead;
    shared(typeof(this))* iSListUpperNext;
}
24 
25 /++
26 An array-based implementation of a Bag for use by a Handle.
27 +/
/++
An array-based implementation of a Bag for use by a Handle.

The bag holds `width` Obj elements, each inside its own Node with its own lock.
Merging another MergeArray into this one is lazy and per element: source Node
`i` is appended (via CAS) to the pending-merge list of destination Node `i`,
and its Obj is only `mergeSteal`'d into the destination Obj later, by a thread
that holds the destination Node's lock and evaluates that list.
+/
struct MergeArray(Obj) if (is(Obj == struct) && !is(Obj == shared)) {
    /++
    The type of the contained sequential data structure.
    +/
    alias ElementType = Obj;
    
    /++
    The type of Handle which MergeArray expects to be underneath.
    +/
    alias H = Handle!(shared MergeArray);
    
private:
    // When true, evalMerges_locked itself unlinks a Node it has just marked
    // as a dummy tail from the head of the merge list; when false it leaves
    // that to another thread.
    enum RemoveDummyNodes = true;
    
    // One slot of the array: a lock-protected Obj plus an intrusive list of
    // source Nodes (from other MergeArrays) whose Objs are pending a merge
    // into this one.
    align(64)
    static struct Node {
        CoarseLocked!Obj elem = void;
        alias elem this;
        
        mixin IntrusiveSList mergeList; // list of pending merged Nodes
        
        // destination Node that claimed this one during a merge (see
        // ownedBy); null until claimed
        Node* parent = null;
        
        // enables the iSListUpperSkip shortcut pointers used by listFindEnd
        enum UseTailSkip = true;
        
        static if (UseTailSkip) {
            // shortcut to a Node further along the list; nil when unset
            Node* iSListUpperSkip;
        }
        
        H myhandle;   // the union-find node which this was created for
        
        // Creates a Node for handle h holding Obj(args), with a nil merge-list
        // head, a nil next link, a nil skip pointer and no parent.
        this(ObjArgs...)(H h, ObjArgs args) shared {
            myhandle = h;
            elem = shared CoarseLocked!Obj(Obj(args));
            
            iSListMyHead = nil;
            iSListUpperNext = nil;
            
            static if (UseTailSkip) {
                iSListUpperSkip = nil;
            }
        }
        
        // Returns true if this Node's bag has been merged as the source into
        // another bag (linearized):
        //  - false while no destination has claimed this Node (parent null);
        //  - true once this Node's own merge-list head is null, which
        //    evalMerges_locked sets (newHead = null) when folding this Node
        //    into its owner;
        //  - otherwise whatever myhandle.isFullyMerged reports.
        @property
        bool wasMerged() shared {
            if (! isOwned) {
                return false;
            }
            else if (iSListMyHead.atomicLoad is null) {
                return true;
            }
            else {
                return myhandle.isFullyMerged;
            }
        }
        
        // true once some destination Node has been recorded as this Node's
        // parent (see ownedBy)
        @property
        bool isOwned() shared {
            return parent.atomicLoad !is null;
        }
        
        // Returns whether n is this Node's owner. If no owner is recorded yet,
        // first tries to install n as the owner with a CAS; the answer then
        // reflects whichever thread's CAS won.
        bool ownedBy(shared(Node)* n) shared {
            auto p = parent.atomicLoad;
            
            if (p is null) {
                // cur was inserted into this list (and maybe others)
                // but not marked thus try to claim it
                if (my_cas(& parent, cast(shared(Node)*) null, n)) {
                    // this thread's CAS is the one that made n the owner
                    debug(mergearray_printactions) writefln("%s: Made %x own %x", tid, n, &this);
                    
                    return true;
                }
                else {
                    // lost the race; re-read whoever claimed it
                    p = parent.atomicLoad;
                }
            }
            
            return p is n;
        }
        
        //----------------------------------------------------------------------
        
        // Returns false only for the nil sentinel. Must not be called on the
        // dummyNodeIndicator sentinel or through a null pointer (in-contract).
        @property
        bool isNormal() shared in {
            assert(&this !is dummyNodeIndicator);
            assert(&this !is null);
        } body {
            return &this !is nil;
        }
        
        // Returns true if cur ends this Node's merge list for the purpose of
        // appending: cur is the nil sentinel, or cur is not owned by this Node.
        // Side effect: if cur has no owner yet, ownedBy tries to claim it for
        // this Node.
        bool isAppendableTail(shared(Node)* cur) shared {
            return !cur.isNormal || !cur.ownedBy(&this);
        }
        
        // Returns the Node linked after cur in this Node's merge list, or null
        // when cur is an appendable tail (see isAppendableTail) or cur's next
        // link has been set to dummyNodeIndicator.
        auto listContinuesAfter(shared(Node)* cur) shared {
            if (!isAppendableTail(cur)) {
                auto next = cur.iSListUpperNext.atomicLoad;
                
                assert(next !is null);
                
                if (next !is dummyNodeIndicator) {
                    return next;
                }
            }
            return null;
        }
        
        // Advances (prev, cur) in place along this Node's merge list until
        // listContinuesAfter(cur) returns null, i.e. cur is the current end.
        // With UseTailSkip it first follows iSListUpperSkip shortcuts from
        // prev (compressing two-hop shortcuts with CAS), and while walking it
        // records cur as prev's shortcut if prev had none.
        void listFindEnd(ref shared(Node)* prev, ref shared(Node)* cur) shared {
            static if (UseTailSkip) {
                // heuristic shortcut to end of list
                auto skip = prev.iSListUpperSkip.atomicLoad;
                
                while (skip !is nil) {
                    // prev is at least two nodes away from the end of the list
                    assert(! isAppendableTail(skip));
                    
                    auto further = skip.iSListUpperSkip.atomicLoad;
                    
                    if (further is nil) {
                        // skip is near the end of the list, go to slow search
                        cur = prev.iSListUpperNext.atomicLoad;
                        break;
                    }
                    else {
                        // skip is in the middle of the skip list, compress and keep going
                        my_cas(& prev.iSListUpperSkip, skip, further);
                        prev = skip;
                        skip = further;
                    }
                }
                // got to the end of the skip list
            }
            
            // O(n) search for the end of the list
            while(true) {
                if (auto next = listContinuesAfter(cur)) {
                    static if (UseTailSkip) {
                        if (prev.iSListUpperSkip.atomicLoad is nil) {
                            my_cas(& prev.iSListUpperSkip, nil, cur);
                        }
                    }
                    prev = cur;
                    cur = next;
                }
                else break;
            }
        }
        
        //----------------------------------------------------------------------
        
        // Result of evalMerges_locked; bailout == Yes means evaluation stopped
        // early because a nested lock could not be taken.
        static struct EvalMergeResult {
            Flag!"ShouldBailout" bailout;
        }
        
        // Evaluates this Node's pending merges. Must be called while holding
        // this Node's lock; rootElem is the locked element. For each entry at
        // the head of the merge list: make sure its bag merge is linearized
        // (ensureMerged), lock it with tryLocked, recursively evaluate its own
        // list with newHead = null, mergeSteal its element into rootElem, then
        // unlink it from the head. Stops once the head equals newHead, which
        // is nil (list empty, reusable) or null (this Node itself has been
        // merged away, see wasMerged).
        //
        // Returns bailout = Yes if a nested tryLocked failed or a nested call
        // bailed out; the list is then only partially evaluated and the
        // unmerged entry is still at its head. Otherwise returns bailout = No
        // with the head set to newHead.
        // conditionOffset is accepted but currently unused.
        EvalMergeResult evalMerges_locked(Obj* rootElem, shared(Node)* newHead = nil, ptrdiff_t conditionOffset = 0) shared
        in { assert(newHead is nil || newHead is null); }
        body {
            bool stop = false;
            
            while(! stop) {
                shared(Node)* cur = mergeList.iSListMyHead.atomicLoad;
                
                auto firstHead = cur;
                
                stop = cur is newHead;
                
                if (stop) break;
                
                assert(cur !is null);
                assert(cur !is dummyNodeIndicator);
                
                if (null is listContinuesAfter(cur)) {
                    // remove cur from the list, which is thus empty
                    stop = my_cas(& iSListMyHead, cur, newHead);
                    
                    debug(mergearray_printactions) writefln("%s: Cleared head %x of %x", tid, cur, &this);
                    
                    continue;
                }
                
                if (cur.iSListMyHead.atomicLoad is null) {
                    // cur has been inserted then deleted before
                    stop = my_cas(& iSListMyHead, cur, newHead);
                    
                    debug(mergearray_printactions) writefln("%s: Cleared head %x of reinserted %x", tid, cur, &this);
                    
                    continue;
                }
                
                while(cur !is nil) {
                    assert(cur !is null);
                    assert(cur !is dummyNodeIndicator);
                    assert(cur.isOwned && cur.ownedBy(&this));
                    
                    assert(cur is iSListMyHead.atomicLoad);
                    assert(dummyNodeIndicator !is cur.iSListUpperNext.atomicLoad);
                    
                    if (cur.iSListMyHead.atomicLoad is null) {
                        // "duplicate" node due to multiple threads succeeding at inserting cur into lists
                        // cur was previously deleted, so just need to remove from list
                        
                        assert(false);
                    }
                    else {
                        // cur must be marked deleted and have its elem merged into this's elem
                        
                        // ensure that cur is linearized to be inserted into this bag
                        cur.myhandle.unshared.ensureMerged();
                        
                        // at this point:
                        // cur.isFullyMerged = true (linearized)
                        // if any thread starts trying to insert to cur.mergeList, it will fail and try to insert into my list
                        
                        bool bailout = false;
                        
                        bool gotLock = cur.tryLocked((Obj* subElem) {
                            /+
                            since cur is locked and cur's merge is linearized, its list
                            can only be appended to at most once by each other thread
                            +/
                            EvalMergeResult subResult = cur.evalMerges_locked(subElem, null);
                            
                            if (subResult.bailout) {
                                // bailed out because of lock contention
                                // subElem is not guaranteed to be in a linearized state,
                                // thus follow suit and propagate bailout
                                bailout = true;
                                return;
                            }
                            
                            rootElem.mergeSteal(*subElem);
                        });
                        
                        if (bailout || !gotLock) {
                            // abort the operation; cur stays at the head of the list
                            return EvalMergeResult(Yes.ShouldBailout);
                        }
                    }
                    
                    assert(cur.iSListMyHead.atomicLoad is null);
                    
                removeCur:
                    
                    assert(cur is iSListMyHead.atomicLoad);
                    
                    // cur is the head of mergeList, thus to remove, only need to redirect head to next
                    auto next = cur.iSListUpperNext.atomicLoad;
                    
                    assert(next !is null);
                    
                    assert(next !is dummyNodeIndicator);
                    
                    if (! isAppendableTail(next)) {
                        // list has more elements
                        
                        assert(iSListMyHead.atomicLoad is cur);
                        
                        assert(next !is dummyNodeIndicator);
                        
                        if (my_cas(& iSListMyHead, cur, next)) {
                            // successfully removed cur
                            debug(mergearray_printactions) writefln("%s: Deleted %x under %x, was head of nonempty list", tid, cur, &this);
                            
                            cur = next;
                        }
                        else {
                            // believed unreachable; the assert below enforces that belief
                            debug(mergearray_printactions) writefln("%s: Failed to delete %x under %x, was head of nonempty list!!!!!!!!", tid, cur, &this);
                            
                            assert(false, "MergeArray Error: Failed to delete head of non-empty list.");
                        }
                    }
                    else {
                        // list has no other elements, thus another thread may insert a new node at next
                        
                        // set next to dummyNodeIndicator so other threads will try to replace cur
                        // rather than cur's next slot
                        
                        assert(iSListMyHead.atomicLoad is cur);
                        
                        assert(next !is dummyNodeIndicator);
                        
                        if (my_cas(& cur.iSListUpperNext, next, dummyNodeIndicator)) {
                            // succeeded in marking cur as a removable node
                            
                            debug(mergearray_printactions) {
                                writefln("%s: Marked %x as dummy under %x", tid, cur, &this);
                            }
                            
                            static if (RemoveDummyNodes) {
                                // now try to remove cur from myhead if another thread hasn't yet
                                stop = my_cas(& iSListMyHead, cur, newHead);
                                
                                // at this point, iSListMyHead must not be equal to cur and never will again
                                debug(mergearray_printactions) writefln("%s: Dummy %x was removed from under %x", tid, cur, &this);
                            }
                            else {
                                // let another thread clean it up
                                stop = true;
                            }
                            
                            break;
                        }
                        else {
                            // another thread appended to cur, so remaining list is not empty anymore
                            goto removeCur;
                        }
                    }
                }
            }
            return EvalMergeResult(No.ShouldBailout);
        }
        
        // Outcome of tryEvalMergesAndApply:
        //  Finished - dg was applied and returned true
        //  NextElem - dg was not applied, or returned false; try another element
        //  NextBag  - this Node has been merged away; abandon this bag
        enum EvalApplyResult { Finished, NextElem, NextBag }
        
        // Tries to take this Node's lock with tryLocked. If it is taken,
        // resolves the lazy merges in this Node's merge list that have been
        // linearized, then applies dg to this Node's elem.
        // Returns NextBag if wasMerged is observed before or after evaluating
        // merges; NextElem if the lock was not taken, merge evaluation bailed
        // out, or dg returned false; Finished if dg returned true.
        // conditionOffset is forwarded to evalMerges_locked.
        auto tryEvalMergesAndApply(scope bool delegate(Obj*) dg, ptrdiff_t conditionOffset = 0) shared {
            EvalApplyResult result = EvalApplyResult.NextElem;
            
            // if the lock is not acquired the delegate never runs, so result
            // stays NextElem
            tryLocked((Obj* myElem) {
                
                if (wasMerged) {
                    // should abandon this bag
                    result = EvalApplyResult.NextBag;
                    return;
                }
                
                // try to evaluate lazy merges which are linearized before
                bool retry = evalMerges_locked(myElem, nil, conditionOffset).bailout;
                
                if (retry) {
                    // detected contention thus retry on a different elem
                    return;
                }
                
                if (wasMerged) {
                    // should abandon this bag
                    result = EvalApplyResult.NextBag;
                    return;
                }
                
                // perform operation on current state of the elem
                if ( dg(myElem) ) {
                    result = EvalApplyResult.Finished;
                }
            });
            
            return result;
        }
    }
    
    // one Node per element; width == nodes.length
    Node[] nodes;
    
    // Infinite range that visits the indices 0 .. width-1 in increasing order,
    // cyclically, starting from a uniformly chosen index.
    // NOTE(review): with width == 0 the call to uniform gets the empty
    // interval [0, 0) and presumably fails; callers appear to assume width > 0.
    @property
    auto randomIndexRange() shared {
        import std.random, std.range;
        size_t start = uniform!"[)"(cast(size_t) 0, width);
        return iota(0, width).cycle().dropExactly(start);
    }
    
public:
    /++
    Construct a new MergeArray, allocating on the GC heap.
    Each of the `width` Nodes is constructed for `parent` with its own `Obj(args)`.
    Params:
    parent = The Handle which this is being created for.
    width = The size of the internal Obj[].
    args = Arguments for the constructor of each Obj.
    +/
    this(ObjArgs...)(H parent, size_t width, ObjArgs args) shared {
        nodes = new shared Node[](width);
        
        foreach(ref n ; nodes) {
            n = shared Node(parent, args);
        }
    }
    
    /++
    Returns: The total number of bytes that will be requested to the allocator
    when creating a new instance of this type (the MergeArray itself plus
    `width` Nodes; see makeWith).
    +/
    static size_t fixedAllocSize(size_t width) {
        return MergeArray.sizeof + Node.sizeof * width;
    }
    /++
    Static factory method using Alloc to allocate a new shared MergeArray.
    The Node storage and the MergeArray are obtained from two separate
    `Alloc.alloc` calls; their alignment is only checked by assert.
    Params:
    parent = The Handle which this is being created for.
    width = The size of the internal Obj[].
    args = Arguments for the constructor of each Obj.
    Returns: A pointer to the new shared MergeArray.
    +/
    static shared(MergeArray)* makeWith(Alloc, ObjArgs...)(H parent, size_t width, ObjArgs args) {
        auto nodesptr = (cast(shared(Node)*) Alloc.alloc(Node.sizeof * width).ptr);
        
        assert((cast(size_t)nodesptr) % shared(Node).alignof == 0);
        
        auto self = cast(shared(MergeArray)*) Alloc.alloc(MergeArray.sizeof).ptr;
        
        assert((cast(size_t)self) % shared(MergeArray).alignof == 0);
        
        self.nodes = nodesptr[0 .. width];
        
        // NOTE(review): this assigns into memory straight from Alloc.alloc
        // without initializing it first. If Node (via Obj, CoarseLocked or H)
        // has an elaborate opAssign or destructor, that would run on
        // uninitialized bytes; confirm Node is plain data or construct in place.
        foreach(ref n ; self.nodes) {
            n = shared Node(parent, args);
        }
        return self;
    }
    
    /++
    Returns: The number of Obj in the internal array. This is a constant.
    +/
    @property
    size_t width() shared {
        return nodes.length;
    }
    
    /++
    Tests whether or not this is merge-compatible (has the same width) as other.
    Returns: width == other.width
    +/
    bool canMergeWith(shared(MergeArray)* other) shared {
        return width == other.width;
    }
    
    /++
    Merges all Objs in this MergeArray into the MergeArray referenced by dest.
    
    For each index i (visited once each, from a random starting index), source
    Node i is linked into the pending-merge list of Node i of the bag returned
    by `dest.findClosestBag()`. The bag is re-fetched if a destination Node is
    found to have been merged away. Returns early as soon as a source Node
    reports wasMerged.
    
    Each individual Obj merge is only performed once by any thread.
    +/
    void mergePerElementInto(H dest) shared {
        import std.algorithm, std.range;
        
        auto indices = randomIndexRange().takeExactly(width);
        
    getDestBag:
        auto destBag = dest.findClosestBag();
        
    indicesLoop:
        while(!indices.empty) {
            immutable i = indices.front;
            auto srcNode = & nodes[i];
            
            if (srcNode.wasMerged) {
                return;
            }
            auto destNode = & destBag.nodes[i];
            
            enum nil = Node.nil;
            enum dummyNodeIndicator = Node.dummyNodeIndicator;
            
            //------------------------------------------------------------------
            
            // Tries to CAS *loc from expected to srcNode. Returns true when
            // srcNode needs no further work (already owned elsewhere, or
            // inserted here; indices has been advanced), false if the CAS
            // failed and the caller must re-read and retry.
            bool tryInsert(shared(Node*)* loc, shared(Node)* expected, string reason) {
                if (srcNode.isOwned) {
                    // another thread inserted srcNode into the list already
                    // go to next srcNode
                    indices.popFront();
                    return true; // continue indicesLoop;
                }
                
                if (my_cas(loc, expected, srcNode)) {
                    // succeeded at inserting
                    
                    debug(mergearray_printactions) {
                        writefln("%s: Inserted %x under %x at %x, was %x (%s)", tid, srcNode, destNode, loc, expected, reason);
                    }
                    
                    if (srcNode.ownedBy(destNode)) {
                        // marked destNode as the "true" owner of srcNode
                        debug(mergearray_printactions) writefln("%s: %x owns newly-inserted %x", tid, destNode, srcNode);
                    }
                    else {
                        // srcNode was already claimed by another bag
                        
                        // undo the double insert and go to next srcNode
                        my_cas(loc, srcNode, nil);
                        
                        debug(mergearray_printactions) writefln("%s: Failed to make %x own %x", tid, destNode, srcNode);
                    }
                    
                    // go to next srcNode
                    indices.popFront();
                    return true;
                }
                
                return false;
            }
            
            //------------------------------------------------------------------
        curLoop:
            
            shared(Node)* first = destNode.iSListMyHead.atomicLoad;
            assert(first !is dummyNodeIndicator);
            
            if (first is null) {
                // destNode was linearized merged and removed
                goto getDestBag;
            }
            
            if (first is srcNode) {
                // found it already in the list, so don't insert it again.
                // The ownedBy call must stay: it claims srcNode for destNode
                // if nobody has yet (gotit itself is only used for debug output).
                bool gotit = srcNode.ownedBy(destNode);
                
                debug(mergearray_printactions) writefln("%s: Found %x at head of %x list (ownedBy == %s)", tid, srcNode, destNode, gotit);
                
                indices.popFront();
                continue indicesLoop;
            }
            
            if (auto second = destNode.listContinuesAfter(first)) {
                // must insert srcNode at the tail of nonempty list
                
                shared(Node)* prev = first;
                shared(Node)* cur = second;
                
            recLoop:
                destNode.listFindEnd(prev, cur);
                
                if (destNode.isAppendableTail(cur)) {
                    // prev's next link can be rewritten from cur to srcNode
                    if (tryInsert(& prev.iSListUpperNext, cur, "tail")) {
                        // srcNode has been inserted somewhere
                        continue indicesLoop;
                    }
                    else {
                        // some other thread modified it first, so read it again
                        cur = prev.iSListUpperNext.atomicLoad;
                        if (cur !is dummyNodeIndicator) {
                            // prev now points to nil or some other node
                            // just try again
                            goto recLoop;
                        }
                        else {
                            // prev is now a dummy node, can't append to it
                            // must start/continue another merge list
                        }
                    }
                }
                else {
                    // cur is a dummy node, can't append to it
                    assert(cur.iSListUpperNext.atomicLoad is dummyNodeIndicator);
                }
                
                // list ends in a dummy node thus isn't valid for appending
                
                auto newHead = destNode.iSListMyHead.atomicLoad;
                
                if (newHead is first) {
                    // head hasn't changed since we read first and second, thus
                    // the dummy node is illegally at the end of a list,
                    // so first must have been deleted and re-inserted
                    
                    assert(first.iSListMyHead.atomicLoad is null);
                    
                    if (tryInsert(& destNode.iSListMyHead, first, "head")) {
                        // srcNode has been inserted somewhere
                        continue indicesLoop;
                    }
                    else {
                        // some other thread modified head
                        goto curLoop;
                    }
                }
                else {
                    // list has been updated so the dummy node may not be illegal
                    goto curLoop;
                }
            }
            else {
                // can insert srcNode directly at head of empty list
                if (tryInsert(& destNode.iSListMyHead, first, "head")) {
                    // srcNode has been inserted somewhere
                    continue indicesLoop;
                }
                else {
                    // some other thread modified head
                    goto curLoop;
                }
            }
        }
    }
    
    /+
    Applies dg to elements in the array until dg returns true (thus this does too)
    or this bag is fully merged before then (thus this returns false).
    Elements are visited in randomIndexRange order through the lock-trying
    tryEvalMergesAndApply; an element whose lock is busy is simply skipped.
    The loop only ends through one of the returns.
    +/
    bool tryApplyUntil(scope bool delegate(Obj*) dg) shared {
        foreach(immutable i ; randomIndexRange) {
            immutable result = nodes[i].tryEvalMergesAndApply(dg);
            
            with(Node.EvalApplyResult)
            final switch(result) {
                case Finished: return true;
                case NextElem: continue;
                case NextBag : return false;
            }
            assert(false);
        }
        assert(false);
    }
    
    /++
    Applies dg to each element in cyclic fashion until it returns true or the
    MergeArray is disrupted by a merge.
    
    dg is applied to each element in turn, such that once dg is applied to an
    element, it is not reapplied to the same element until width-1 other
    applications of dg finished and all returned false.
    
    If this is merged with another MergeArray with a lower Handle id, such that
    the Objs in this are mergeSteal'd from, then the further applications of dg
    will stop.
    
    Warning: The applications of dg are NOT collectively atomic, only
    individually atomic.
    
    Progress: Deadlock-free if dg returns true after a finite number of
    applications.
    
    Returns: true if an application of dg returns true, or false if this was
    merged into another MergeArray before dg could return true.
    +/
    bool tryApplyEachUntil(scope bool delegate(Obj*) dg) shared {
        with (Node.EvalApplyResult)
        foreach(immutable i ; randomIndexRange) {
            auto result = NextElem;
            auto srcNode = & nodes[i];
            
            // unlike tryApplyUntil, this takes each element's lock with locked
            srcNode.locked((Obj* myElem) {
                if (srcNode.wasMerged) {
                    // should abandon this bag
                    result = NextBag;
                    return;
                }
                
                // evaluate lazy merges which are linearized before
                // NOTE(review): the bailout result is ignored here, so dg can
                // run on an element whose pending merges are only partially
                // applied; tryEvalMergesAndApply skips dg in that case instead.
                // Confirm this difference is intended.
                srcNode.evalMerges_locked(myElem);
                
                if (srcNode.wasMerged) {
                    // should abandon this bag
                    result = NextBag;
                    return;
                }
                
                // perform operation on current state of the elem
                if ( dg(myElem) ) {
                    result = Finished;
                }
            });
            
            final switch(result) {
                case Finished: return true; // dg returned true
                case NextElem: continue;    // dg returned false
                case NextBag : return false;// bag was Merged
            }
            assert(false);
        }
        assert(false);
    }
}
711