module mergearray.impl.handle_array;

import core.atomic, core.sync.mutex, std.typecons, std.traits, std.exception;

import mergearray.impl.header;

/++
Possibilities for the result of a merge operation.
+/
enum MergeResult {
    /// The merge was performed successfully.
    Success,
    /// Both objects are already equivalent, so nothing was done.
    WereAlreadyEqual,
    /// ERROR: Both objects were erroneously given the same id, so nothing was done.
    IdClash,
    /// ERROR: The objects are not able to be merged (e.g. not the same width), so nothing was done.
    Incompatible
}

/++
A handle to a Union-Find node, referencing a mergeable collection of type Bag.

A Handle is the outer interface of the MergeArray framework for building
relaxed concurrent mergeable data structures. Such a data structure should wrap
a Handle to a Bag type which manages access and merging of sequential data
structures of type Bag.ElementType.

Bag must be a shared struct type (enforced by the template constraint).
+/
struct Handle(Bag) if (is(Bag == shared) && is(Bag == struct)) {
    /++
    The type of the sequential data structures which applyUntil operates on.
    +/
    alias Obj = Bag.ElementType;

private:
    /+
    A Union-Find node. Nodes form chains through next; a node whose bag is null
    has had its contents merged into a node further down its chain.
    +/
    align(32)
    static shared struct Node {
        Bag* bag = null; // if not null then, as long as next is null, this won't change
        Node* next = null; // monotonic: null -> non-null high id -> non-null lower id -> ...
        immutable size_t id; // if next is not null then next.id <= id
    }

    // The node this handle currently refers to; null for an empty handle.
    shared(Node)* ptr = null;

    /+
    Moves this to point to the end of its chain of handles.
    Any incomplete merges along the way are linearized before this returns.

    Each iteration first advances ptr to the closest node that still owns a bag
    (findClosestBag), then inspects that node's next link: a null link means the
    bottom of the chain has been reached, a non-null link means a merge into
    that successor may still be in progress and is completed here.
    +/
    void descend_merging()
    in {
        assert(ptr !is null);
    }
    out {
        assert(ptr !is null);
    }
    body {
        while(true) {
            findClosestBag();
            auto next = ptr.next.atomicLoad;

            if (next is null) {
                // found the bottom (for now)
                return;
            }
            else {
                // found (potentially) unfinished work
                ensureMergedInto(Handle(next));
            }
        }
        assert(false);
    }

    /+
    Tries to point higher to lower.

    Attempts to change higher.next from null to lower via my_cas and returns
    my_cas's result.
    NOTE(review): my_cas is declared outside this file (presumably in
    mergearray.impl.header) and is assumed to be a compare-and-swap that returns
    true on success -- confirm.
    +/
    static bool tryLink(shared(Node)* higher, shared(Node)* lower)
    in {
        assert(higher !is null);
        assert(lower !is null);
        assert(lower.id < higher.id);
    }
    body {
        return my_cas(& higher.next, cast(shared(Node)*) null, lower);
    }

    /+
    Tries to merge the node chains of the input handles.
    If the source has pre-existing links, then it is resolved before a new link is made.

    Returns a Result with:
      merged == true,  dest.ptr is null -- a and b already refer to the same node;
      merged == false, dest.ptr is null -- a and b are distinct nodes with equal ids;
      merged == true,  dest == a or b   -- the other handle's node was linked to dest
                                           (dest is the handle with the lower id).
    +/
    static auto tryUnion(ref Handle a, ref Handle b)
    in {
        assert(a.ptr !is null);
        assert(b.ptr !is null);
    }
    body {
        struct Result {
            bool merged;
            Handle dest;
        }
        // a and b are ref parameters: descend_merging advances the caller's handles
        while(true) {
            a.descend_merging();
            b.descend_merging();

            auto an = a.id;
            auto bn = b.id;

            if (a.ptr is b.ptr) {
                // a and b are the same already
                return Result(true, Handle(null));
            }
            else if (an == bn && a.ptr.bag.atomicLoad !is null) {
                // a and b alias to bags of the same id (linearized in 2nd descend_merging)
                return Result(false, Handle(null));
            }
            else if (an < bn && tryLink(b.ptr, a.ptr)) {
                // succeeded at linking b --> a
                return Result(true, a);
            }
            else if (an > bn && tryLink(a.ptr, b.ptr)) {
                // succeeded at linking a --> b
                return Result(true, b);
            }
            // otherwise a link attempt failed (or the ids matched but a's bag was
            // already merged away): descend again and retry
        }

        assert(false);
    }

    /+
    Lazily merge all of this bag's elements into next's then linearize them all at once.

    Does nothing if this node's bag pointer is already null (another thread
    completed the merge). Otherwise calls mergePerElementInto(next) on the bag
    and then stores null into this node's bag pointer.
    +/
    void ensureMergedInto(Handle next)
    in {
        assert(ptr !is null);
        assert(next.ptr !is null);
        assert(this.id > next.id);
    }
    body {
        shared(Bag)* bagSrc = ptr.bag.atomicLoad;

        // check that no one else finished for us
        if (bagSrc is null) {
            return;
        }

        bagSrc.mergePerElementInto(next);

        // everything is merged!
        // Linearize movement of heaps by nulling reference
        ptr.bag.atomicStore(null);
    }

    /+
    Attempts to change *loc to dest while *loc has a higher id than dest.

    Retries until *loc is observed to be dest or a node with an id lower than
    dest's, or until this thread's my_cas from the observed value to dest succeeds.
    +/
    static void ensureMovedDownTo(shared(Node*)* loc, shared(Node)* dest)
    in {
        assert(loc !is null);
        assert(dest !is null);
    }
    out {
        assert(loc !is null);
    }
    body {
        immutable id = dest.id;

        while(true) {
            shared(Node)* cur = atomicLoad(*loc);

            assert(cur !is null);

            if (cur is dest || cur.id < id) {
                // already done
                break;
            }
            else if (my_cas(loc, cur, dest)) {
                // succeeded in moving downwards
                break;
            }
            else {
                // another thread moved loc down, don't know yet if far enough
            }
        }
    }

    /+
    Moves ptr to point to the closest node with a non-null bag and returns the bag found there.

    Only this handle's own ptr is changed; the next links of the nodes that are
    passed over are left untouched (no path compression).
    +/
    auto findClosestBag_noCompress()
    in { assert(ptr !is null); }
    out(bag) { assert(ptr !is null); assert(bag !is null); }
    body {
        while(true) {
            assert(ptr !is null);

            shared(Bag)* curBag = ptr.bag.atomicLoad; // last exec of this line == linearization point ???

            if (curBag !is null) {
                // found a bag!
                return curBag;
            }
            else {
                // cur's bag was merged into a descendant
                shared(Node)* next = ptr.next.atomicLoad; // ok because bag's atomicLoad

                // thus there must exist a descendant
                assert(next !is null);
                assert(ptr.id > next.id);

                // go to next node
                ptr = next;
            }
        }
        assert(false);
    }

package:
    /+
    If this handle is non-null and its node already has a successor, ensures
    that this node's bag has been merged into that successor (see
    ensureMergedInto). Does nothing for a null handle or a node without a
    successor.
    +/
    void ensureMerged() {
        if (ptr is null) {
            return;
        }
        auto next = ptr.next.atomicLoad;

        if (next !is null) {
            ensureMergedInto(Handle(next));
        }
    }

    /+
    Finds the closest bag AND moves down ptr.

    After locating the bag with findClosestBag_noCompress, walks the chain again
    from the node this handle started at and moves each visited node's next link
    down to the node finally reached (path compression via ensureMovedDownTo).
    +/
    auto findClosestBag()
    in { assert(ptr !is null); }
    out(bag) { assert(ptr !is null); assert(bag !is null); }
    body {
        shared(Node)* start = ptr;

        auto bag = findClosestBag_noCompress();

        assert(ptr !is null);

        while(start.id > ptr.id) {
            // read the old successor before its link is overwritten below
            shared(Node)* next = start.next.atomicLoad;

            ensureMovedDownTo(&start.next, ptr);

            start = next;
        }

        return bag;
    }

    /+
    Returns false if handle points directly to an unmerged bag.
    Returns true if handle is null or pointed to a merged bag (thus set to null).

    NOTE(review): the store of null is unconditional, not a compare-and-swap on
    the value loaded above -- confirm that callers never update this shared
    handle concurrently with this call.
    +/
    @property
    bool isFullyMerged() shared {
        auto cur = ptr.atomicLoad;
        if (cur is null) {
            return true;
        }
        else if (null is cur.bag.atomicLoad) {
            ptr.atomicStore(null);
            return true;
        }
        else {
            return false;
        }
    }

    /+
    Checks whether the handles' bags are mergeable.
    Assumes that if any two bags are mergeable, then other merges will preserve that.

    Both this and other are advanced to their closest bags (findClosestBag) as a
    side effect; the decision itself is delegated to Bag.canMergeWith.
    +/
    bool canMergeWith(ref Handle other) {
        return findClosestBag().canMergeWith( other.findClosestBag() );
    }

public:
    /++
    GC allocates a new Node and returns a Handle to it.
    Params:
        id = Unique identifier for the given Bag
        bagArgs = Arguments to the Bag constructor.
    Returns: A Handle to the created Node referring to the new Bag.
    +/
    static Handle make(BagArgs...)(size_t id, BagArgs bagArgs) {
        // the node is created first because the Bag constructor receives a Handle to it
        auto n = new shared Node(null, null, id);
        n.bag = new Bag(Handle(n), bagArgs);
        return Handle(n);
    }

    /++
    Returns: The total number of bytes that will be requested from the allocator
    when creating a new, empty instance of this type. The Args a are forwarded
    to Bag.fixedAllocSize to determine the result.
    +/
    static size_t fixedAllocSize(Args...)(Args a) {
        //pragma(msg, "Node.sizeof = " ~ Node.sizeof.stringof);

        return Node.sizeof + Bag.fixedAllocSize(a);
    }
    /++
    Static factory method which uses Alloc to allocate a new Node and returns a
    Handle to it.
    Params:
        id = Unique identifier for the given Bag
        bagArgs = Arguments to the Bag constructor.
    Returns: A Handle to the created Node referring to the new Bag.
    +/
    static Handle makeWith(Alloc, BagArgs...)(size_t id, BagArgs bagArgs) {
        // NOTE(review): assumes Alloc.alloc(n) returns a slice-like value exposing
        // .ptr to at least n bytes -- confirm against the allocator interface.
        void* nMem = Alloc.alloc(Node.sizeof).ptr;

        // the allocator must hand back memory aligned for Node
        assert((cast(size_t)nMem) % Node.alignof == 0);

        // id is immutable thus must assign before casting to Node
        * cast(size_t*)(nMem + Node.id.offsetof) = id;

        auto n = cast(shared(Node)*) nMem;
        assert(n.id == id);
        n.next = null;
        n.bag = null;

        // the bag is created last because its factory receives a Handle to the node
        n.bag = Bag.makeWith!Alloc(Handle(n), bagArgs);

        return Handle(n);
    }

    /++
    Atomically loads a shared Handle.
    Returns: An unshared Handle referring to the same Bag that the shared Handle
    referred to at some point during the call.
    Progress: Wait-free
    +/
    @property
    Handle unshared() shared {
        return Handle(ptr.atomicLoad);
    }

    /++
    Attempt to move this shared handle down the chain to skip over merged nodes.

    This is for optimization only and has no semantic effects.

    Returns: true if this handle is null or this thread reaches an unmerged node,
    else false if another thread modified the handle concurrently.

    Progress: Lock-free, but wait-free if number of total merges is bounded.
    +/
    bool update() shared {
        auto cur = ptr.atomicLoad;

        if (cur is null) {
            return true;
        }

        while(true) {
            if (null !is cur.bag.atomicLoad) {
                // cur still owns its bag: nothing (more) to skip
                return true;
            }

            auto next = cur.next.atomicLoad;

            // a node whose bag is null must have a successor with a lower id
            assert(next !is null);
            assert(cur.id > next.id);

            if (!my_cas(&ptr, cur, next)) {
                // ptr no longer holds cur: give up
                return false;
            }

            cur = next;
        }
        assert(false);
    }

    /++
    Returns: The id of the currently-referred Node.
    Progress: Wait-free.
    +/
    @property
    size_t id() in { assert(ptr !is null); } body {
        return ptr.id;
    }
    /++
    Returns: The id of a Node which this referred to at some point during the
    call.
    Progress: Wait-free.
    +/
    @property
    size_t id() shared {
        return unshared.id;
    }

    /++
    Returns: The width of the currently-referred Node's Bag.
    Progress: Lock-free.
    +/
    @property
    size_t width() in { assert(ptr !is null); } body {
        return findClosestBag().width;
    }
    /++
    Returns: The width of the currently-referred Node's Bag.
    Progress: Lock-free.
    +/
    @property
    size_t width() shared {
        return unshared.width;
    }

    /++
    Atomically applies dg to some element of the Bag.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    void apply(scope void delegate(Obj*) dg) shared {
        // wrap dg so that the first application always counts as a success
        applyUntil((Obj* t) {
            dg(t);
            return true;
        });
    }

    /++
    Applies dg to elements in Bags which this Handle refers to during this call
    until dg returns true for the first time.

    If dg does not mutate when it returns false, then applyUntil is linearizable
    to a single application of dg which returns true.

    Warning: The applications of dg are NOT collectively atomic; merges and
    other calls to applyUntil may take effect on elements between calls to dg.
    To build a linearizable data structure using Handle, each time dg returns
    false, dg should NOT modify the element through the pointer or store
    information about the state of the element during that application, as if
    dg is a transaction which failed and must roll-back. However, dg may modify
    external state to keep track of the number of applications, for example.

    Progress: Lock-free if

    1. there are no more than width total concurrent threads,

    2. dg does not semantically modify through its argument if it returns false,
    and

    3. dg returns true after a finite number of applications.

    If only 2 and 3 hold, then this method is deadlock-free. Otherwise, no
    progress is guaranteed.
    +/
    void applyUntil(scope bool delegate(Obj*) dg) shared {
        // on normal (non-throwing) exit, try to skip this shared handle past merged nodes
        scope(success) update();

        auto cur = unshared;
        // retry on the (re-located) closest bag until tryApplyUntil reports success
        while(true) {
            if(cur.findClosestBag().tryApplyUntil(dg) ) {
                break;
            }
        }
    }
    static if (hasMember!(Bag, "tryApplyUntil_2")) {
        /++
        Variant of applyUntil for delegates taking two Obj pointers. Only
        available when Bag has a member named tryApplyUntil_2.

        Repeatedly calls tryApplyUntil_2(dg) on the closest bag until it
        returns true.
        +/
        void applyUntil_2(scope bool delegate(Obj*,Obj*) dg) shared {
            // on normal (non-throwing) exit, try to skip this shared handle past merged nodes
            scope(success) update();

            auto cur = unshared;
            while(true) {
                if( cur.findClosestBag().tryApplyUntil_2(dg) ) {
                    break;
                }
            }
        }
    }
    /++
    Calls tryApplyEachUntil(dg) on the Bag this Handle references and
    returns the result.

    Progress: minimum of Lock-free and the progress of Bag.tryApplyEachUntil.
    +/
    bool tryApplyEachUntil(scope bool delegate(Obj*) dg) shared {
        // on normal (non-throwing) exit, try to skip this shared handle past merged nodes
        scope(success) update();

        return unshared.findClosestBag().tryApplyEachUntil(dg);
    }

    /++
    Attempts to merge the elements in this and other's Bags together.

    Both this and other may be advanced along their node chains as a side
    effect of the call.

    Returns: MergeResult describing what occurred.
    Progress: Lock-free if there are no more than width total concurrent threads,
    otherwise deadlock-free.
    +/
    MergeResult mergeInto(ref Handle other) {
        // a and b are aliases of this and other (not copies), so the calls below
        // update the caller's handles in place
        alias a = this; //Handle(ptr.atomicLoad);
        alias b = other; //Handle(other.ptr.atomicLoad);

        assert(a.ptr !is null);
        assert(b.ptr !is null);

        if (! a.canMergeWith(b)) {
            return MergeResult.Incompatible;
        }

        // try to link other and this
        auto result = tryUnion(a, b);

        if (! result.merged ) {
            return MergeResult.IdClash;
        }
        if (result.dest.ptr is null) {
            return MergeResult.WereAlreadyEqual;
        }
        // result.dest is whichever of a / b the other one was linked to;
        // move the linked (higher-id) handle's bag contents into it
        if (a != result.dest) {
            a.ensureMergedInto(b);
        }
        else {
            b.ensureMergedInto(a);
        }

        return MergeResult.Success;
    }
}