1 module mergearray.impl.handle_array;
2 
3 import core.atomic, core.sync.mutex, std.typecons, std.traits, std.exception;
4 
5 import mergearray.impl.header;
6 
/++
Outcome reported by a merge operation.

The first two members describe non-error outcomes; the last two describe
error conditions. In every case other than Success nothing was merged.
+/
enum MergeResult {
    /// The two objects were merged.
    Success,

    /// Nothing was done: both objects were already equivalent.
    WereAlreadyEqual,

    /// ERROR: nothing was done because both objects were erroneously given
    /// the same id.
    IdClash,

    /// ERROR: nothing was done because the objects cannot be merged
    /// (eg. they do not have the same width).
    Incompatible
}
20 
/++
A handle to a Union-Find node, referencing a mergeable collection of type Bag.

A Handle is the outer interface of the MergeArray framework for building
relaxed concurrent mergeable data structures. Such a data structure should wrap
a Handle to a Bag type which manages access and merging of sequential data
structures of type Bag.ElementType.

Nodes form chains through their next pointers. A node whose bag pointer is null
has had its contents merged into a node further down its chain; the node at
which a non-null bag is found is where the live Bag is read from.
+/
struct Handle(Bag) if (is(Bag == shared) && is(Bag == struct)) {
    /++
    The type of the sequential data structures which applyUntil operates on.
    +/
    alias Obj = Bag.ElementType;

private:
    /+
    One link of a handle chain.
    +/
    align(32)
    static shared struct Node {
        Bag* bag = null;   // if not null then, as long as next is null, this won't change
        Node* next = null; // monotonic: null -> non-null high id -> non-null lower id -> ...
        immutable size_t id;  // if next is not null then next.id <= id
    }

    // The node this handle currently refers to (null for an empty handle).
    shared(Node)* ptr = null;

    /+
    Moves this to point to the end of its chain of handles.
    Any incomplete merges along the way are linearized before this returns.

    Each iteration first skips to the closest node which still owns a bag; if
    that node already has a successor, a merge into the successor has been
    started (possibly by another thread) and is completed here before
    continuing downwards.
    +/
    void descend_merging()
    in {
        assert(ptr !is null);
    }
    out {
        assert(ptr !is null);
    }
    body {
        while(true) {
            findClosestBag();
            auto next = ptr.next.atomicLoad;

            if (next is null) {
                // found the bottom (for now)
                return;
            }
            else {
                // found (potentially) unfinished work
                ensureMergedInto(Handle(next));
            }
        }
        assert(false);
    }

    /+
    Tries to point higher to lower.

    Succeeds only if higher.next is still null at the time of the
    compare-and-swap.
    Returns: the result of my_cas (true when the link was installed).
    +/
    static bool tryLink(shared(Node)* higher, shared(Node)* lower)
    in {
        assert(higher !is null);
        assert(lower !is null);
        assert(lower.id < higher.id);
    }
    body {
        return my_cas(& higher.next, cast(shared(Node)*) null, lower);
    }

    /+
    Tries to merge the node chains of the input handles.
    If the source has pre-existing links, then it is resolved before a new link
    is made.

    Both handles are taken by reference and are moved down their chains as a
    side effect (via descend_merging).

    Returns: a Result where
      merged == true,  dest.ptr is null : a and b already refer to the same node
      merged == false, dest.ptr is null : distinct nodes share the same id
      merged == true,  dest.ptr non-null: the node with the higher id was linked
                                          to dest (the node with the lower id)
    +/
    static auto tryUnion(ref Handle a, ref Handle b)
    in {
        assert(a.ptr !is null);
        assert(b.ptr !is null);
    }
    body {
        struct Result {
            bool merged;
            Handle dest;
        }
        // operates directly on the caller's handles (they are ref parameters);
        // retry until one of the outcomes below is reached
        while(true) {
            a.descend_merging();
            b.descend_merging();

            auto an = a.id;
            auto bn = b.id;

            if (a.ptr is b.ptr) {
                // a and b are the same already
                return Result(true, Handle(null));
            }
            else if (an == bn && a.ptr.bag.atomicLoad !is null) {
                // a and b alias to bags of the same id (linearized in 2nd descend_merging)
                return Result(false, Handle(null));
            }
            else if (an < bn && tryLink(b.ptr, a.ptr)) {
                // succeeded at linking b --> a
                return Result(true, a);
            }
            else if (an > bn && tryLink(a.ptr, b.ptr)) {
                // succeeded at linking a --> b
                return Result(true, b);
            }
            // otherwise a link attempt failed or a bag was merged away
            // concurrently: descend again and retry
        }

        assert(false);
    }

    /+
    Lazily merge all of this bag's elements into next's then linearize them all
    at once.

    Does nothing if this node's bag pointer is already null, i.e. the merge was
    already completed.
    +/
    void ensureMergedInto(Handle next)
    in {
        assert(ptr !is null);
        assert(next.ptr !is null);
        assert(this.id > next.id);
    }
    body {
        shared(Bag)* bagSrc = ptr.bag.atomicLoad;

        // check that no one else finished for us
        if (bagSrc is null) {
            return;
        }

        bagSrc.mergePerElementInto(next);

        // everything is merged!
        // Linearize movement of heaps by nulling reference
        ptr.bag.atomicStore(null);
    }

    /+
    Attempts to change *loc to dest while *loc has a higher id than dest.

    Stops as soon as *loc is dest, *loc has a lower id than dest, or the
    compare-and-swap to dest succeeds.
    +/
    static void ensureMovedDownTo(shared(Node*)* loc, shared(Node)* dest)
    in {
        assert(loc !is null);
        assert(dest !is null);
    }
    out {
        assert(loc !is null);
    }
    body {
        immutable id = dest.id;

        while(true) {
            shared(Node)* cur = atomicLoad(*loc);

            assert(cur !is null);

            if (cur is dest || cur.id < id) {
                // already done
                break;
            }
            else if (my_cas(loc, cur, dest)) {
                // succeeded in moving downwards
                break;
            }
            else {
                // another thread moved loc down, don't know yet if far enough
            }
        }
    }

    /+
    Moves ptr to point to the closest node with a non-null bag and returns the
    bag found there.

    Only this handle's own ptr is changed; the next pointers of the nodes that
    were skipped are left as they are.
    +/
    auto findClosestBag_noCompress()
    in { assert(ptr !is null); }
    out(bag) { assert(ptr !is null); assert(bag !is null); }
    body {
        while(true) {
            assert(ptr !is null);

            shared(Bag)* curBag = ptr.bag.atomicLoad; // last exec of this line == linearization point ???

            if (curBag !is null) {
                // found a bag!
                return curBag;
            }
            else {
                // cur's bag was merged into a descendant
                shared(Node)* next = ptr.next.atomicLoad; // ok because bag's atomicLoad

                // thus there must exist a descendant
                assert(next !is null);
                assert(ptr.id > next.id);

                // go to next node
                ptr = next;
            }
        }
        assert(false);
    }

package:
    /+
    If the node this handle refers to already has a successor, makes sure its
    bag has been merged into that successor. Does nothing for a null handle or
    a node without a successor.
    +/
    void ensureMerged() {
        if (ptr is null) {
            return;
        }
        auto next = ptr.next.atomicLoad;

        if (next !is null) {
            ensureMergedInto(Handle(next));
        }
    }

    /+
    Finds the closest bag AND moves ptr down to the node holding it.

    Afterwards every node that was skipped on the way has its next pointer
    moved down towards the node that was found, so later walks are shorter.
    +/
    auto findClosestBag()
    in { assert(ptr !is null); }
    out(bag) { assert(ptr !is null); assert(bag !is null); }
    body {
        shared(Node)* start = ptr;

        auto bag = findClosestBag_noCompress();

        assert(ptr !is null);

        while(start.id > ptr.id) {
            // remember the old successor before start.next is redirected
            shared(Node)* next = start.next.atomicLoad;

            ensureMovedDownTo(&start.next, ptr);

            start = next;
        }

        return bag;
    }

    /+
    Returns false if handle points directly to an unmerged bag.
    Returns true if handle is null or pointed to a merged bag (in which case
    the handle is set to null).
    +/
    @property
    bool isFullyMerged() shared {
        auto cur = ptr.atomicLoad;
        if (cur is null) {
            return true;
        }
        else if (null is cur.bag.atomicLoad) {
            ptr.atomicStore(null);
            return true;
        }
        else {
            return false;
        }
    }

    /+
    Checks whether the handles' bags are mergeable.
    Assumes that if any two bags are mergeable, then other merges will preserve
    that.

    Both handles may be moved down their chains as a side effect of
    findClosestBag.
    +/
    bool canMergeWith(ref Handle other) {
        return findClosestBag().canMergeWith( other.findClosestBag() );
    }

public:
    /++
    GC allocates a new Node and returns a Handle to it.
    Params:
    id = Unique identifier for the given Bag
    bagArgs = Arguments to the Bag constructor.
    Returns: A Handle to the created Node referring to the new Bag.
    +/
    static Handle make(BagArgs...)(size_t id, BagArgs bagArgs) {
        auto n = new shared Node(null, null, id);
        n.bag = new Bag(Handle(n), bagArgs);
        return Handle(n);
    }

    /++
    Returns: The total number of bytes that will be requested from the
    allocator when creating a new, empty instance of this type. The Args a are
    forwarded to Bag.fixedAllocSize to determine the result.
    +/
    static size_t fixedAllocSize(Args...)(Args a) {
        //pragma(msg, "Node.sizeof = " ~ Node.sizeof.stringof);

        return Node.sizeof + Bag.fixedAllocSize(a);
    }
    /++
    Static factory method which uses Alloc to allocate a new Node and returns a
    Handle to it.
    Params:
    id = Unique identifier for the given Bag
    bagArgs = Arguments to the Bag constructor.
    Returns: A Handle to the created Node referring to the new Bag.
    +/
    static Handle makeWith(Alloc, BagArgs...)(size_t id, BagArgs bagArgs) {
        void* nMem = Alloc.alloc(Node.sizeof).ptr;

        // a null pointer would trivially satisfy the alignment check below and
        // then be written through, so reject it explicitly first
        assert(nMem !is null);
        assert((cast(size_t)nMem) % Node.alignof == 0);

        // id is immutable thus must assign before casting to Node
        * cast(size_t*)(nMem + Node.id.offsetof) = id;

        auto n = cast(shared(Node)*) nMem;
        assert(n.id == id);
        // the memory is raw: give the remaining fields their initial values
        n.next = null;
        n.bag = null;

        n.bag = Bag.makeWith!Alloc(Handle(n), bagArgs);

        return Handle(n);
    }

    /++
    Atomically loads a shared Handle.
    Returns: An unshared Handle referring to the same Bag that the shared Handle
    referred to at some point during the call.
    Progress: Wait-free
    +/
    @property
    Handle unshared() shared {
        return Handle(ptr.atomicLoad);
    }

    /++
    Attempt to move this shared handle down the chain to skip over merged nodes.

    This is for optimization only and has no semantic effects.

    Returns: true if the handle is null or this thread reaches an unmerged node,
    else false if another thread modified the handle concurrently.

    Progress: Lock-free, but wait-free if number of total merges is bounded.
    +/
    bool update() shared {
        auto cur = ptr.atomicLoad;

        if (cur is null) {
            return true;
        }

        while(true) {
            if (null !is cur.bag.atomicLoad) {
                return true;
            }

            // cur's bag is null, so cur has been merged into a successor
            auto next = cur.next.atomicLoad;

            assert(next !is null);
            assert(cur.id > next.id);

            if (!my_cas(&ptr, cur, next)) {
                return false;
            }

            cur = next;
        }
        assert(false);
    }

    /++
    Returns: The id of the currently-referred Node.
    Progress: Wait-free.
    +/
    @property
    size_t id() in { assert(ptr !is null); } body {
        return ptr.id;
    }
    /++
    Returns: The id of a Node which this referred to at some point during the
    call.
    Progress: Wait-free.
    +/
    @property
    size_t id() shared {
        return unshared.id;
    }

    /++
    Returns: The width of the currently-referred Node's Bag.
    Progress: Lock-free.
    +/
    @property
    size_t width() in { assert(ptr !is null); } body {
        return findClosestBag().width;
    }
    /++
    Returns: The width of the currently-referred Node's Bag.
    Progress: Lock-free.
    +/
    @property
    size_t width() shared {
        return unshared.width;
    }

    /++
    Atomically applies dg to some element of the Bag.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    void apply(scope void delegate(Obj*) dg) shared {
        applyUntil((Obj* t) {
            dg(t);
            return true;
        });
    }

    /++
    Applies dg to elements in Bags which this Handle refers to during this call
    until dg returns true for the first time.

    If dg does not mutate when it returns false, then applyUntil is linearizable
    to a single application of dg which returns true.

    Warning: The applications of dg are NOT collectively atomic; merges and
    other calls to applyUntil may take effect on elements between calls to dg.
    To build a linearizable data structure using Handle, each time dg returns
    false, dg should NOT modify the element through the pointer or store
    information about the state of the element during that application, as if
    dg is a transaction which failed and must roll-back. However, dg may modify
    external state to keep track of the number of applications, for example.

    Progress: Lock-free if

    1. there are no more than width total concurrent threads,

    2. dg does not semantically modify through its argument if it returns false,
    and

    3. dg returns true after a finite number of applications.

    If only 2 and 3 hold, then this method is deadlock-free. Otherwise, no
    progress is guaranteed.
    +/
    void applyUntil(scope bool delegate(Obj*) dg) shared {
        // on normal exit, try to move the shared handle past merged nodes
        scope(success) update();

        auto cur = unshared;
        while(true) {
            // re-find the closest bag on every attempt
            if(cur.findClosestBag().tryApplyUntil(dg) ) {
                break;
            }
        }
    }
    static if (hasMember!(Bag, "tryApplyUntil_2")) {
        /++
        Same retry loop as applyUntil, but forwards a two-element delegate to
        Bag.tryApplyUntil_2. Only available when Bag has that member.
        +/
        void applyUntil_2(scope bool delegate(Obj*,Obj*) dg) shared {
            // on normal exit, try to move the shared handle past merged nodes
            scope(success) update();

            auto cur = unshared;
            while(true) {
                if( cur.findClosestBag().tryApplyUntil_2(dg) ) {
                    break;
                }
            }
        }
    }
    /++
    Calls tryApplyEachUntil(dg) on the Bag this Handle references and
    returns the result.

    Progress: minimum of Lock-free and the progress of Bag.tryApplyEachUntil.
    +/
    bool tryApplyEachUntil(scope bool delegate(Obj*) dg) shared {
        scope(success) update();

        return unshared.findClosestBag().tryApplyEachUntil(dg);
    }

    /++
    Attempts to merge the elements in this and other's Bags together.

    Both this and other may be moved down their node chains as a side effect.

    Returns: MergeResult describing what occurred.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    MergeResult mergeInto(ref Handle other) {
        // short names for the two handles; these are aliases, not copies, so
        // the handles themselves are updated by the calls below
        alias a = this;  //Handle(ptr.atomicLoad);
        alias b = other; //Handle(other.ptr.atomicLoad);

        assert(a.ptr !is null);
        assert(b.ptr !is null);

        if (! a.canMergeWith(b)) {
            return MergeResult.Incompatible;
        }

        // try to link other and this
        auto result = tryUnion(a, b);

        if (! result.merged ) {
            return MergeResult.IdClash;
        }
        if (result.dest.ptr is null) {
            return MergeResult.WereAlreadyEqual;
        }
        // the handle that is not the destination holds the higher id and is
        // the one whose bag has to be moved
        if (a != result.dest) {
            a.ensureMergedInto(b);
        }
        else {
            b.ensureMergedInto(a);
        }

        return MergeResult.Success;
    }
}
522