module mergearray.impl.header;

version (mergearray_use_numeric_tid) {
    /++
    Numeric per-thread identifier (a size_t), intended for error messages and
    debugging output.

    Only compiled when version(mergearray_use_numeric_tid) is set. `tidCount`
    is the shared counter from which identifiers are handed out; `tid` holds
    the value assigned to the current thread.
    +/
    shared size_t tidCount = 0;
    /// ditto
    size_t tid = 0;

    // Non-shared module constructor: takes the current counter value as this
    // thread's id and bumps the counter, all inside one synchronized block.
    static this() {
        synchronized {
            tid = tidCount;
            tidCount = tid + 1;
        }
    }
}
else {
    import std.concurrency : thisTid;
    /++
    Per-thread identifier of type std.concurrency.Tid, intended for error
    messages and debugging output.

    Only compiled when version(mergearray_use_numeric_tid) is not set.
    +/
    alias tid = thisTid;
    //Tid tid() { return thisTid(); }
}

// =============================================================================

version (mergearray_do_cas_statistics) {
    /++
    Counters maintained by the instrumented compare-and-swap below, for
    debugging or performance measurements.

    Only compiled when version(mergearray_do_cas_statistics) is set.
    +/
    size_t numSuccessfulCAS = 0;
    /// ditto
    size_t numFailedCAS = 0;

    /++
    Compare-and-swap that forwards to core.atomic.cas and counts the outcome.
    Params:
        loc = Location to update.
        expected = Value the location is expected to hold.
        value = Value to store if the comparison succeeds.
    Returns: The result of core.atomic.cas(loc, expected, value).
    +/
    bool my_cas(L,E,V)(L* loc, E expected, V value) {
        import core.atomic : cas;

        const bool swapped = cas(loc, expected, value);
        if (swapped)
            numSuccessfulCAS++;
        else
            numFailedCAS++;
        return swapped;
    }

    /++
    Sets both compare-and-swap counters back to zero.

    Only compiled when version(mergearray_do_cas_statistics) is set.
    +/
    void resetCasStats() {
        numSuccessfulCAS = numFailedCAS = 0;
    }
}
else {
    import core.atomic : cas;
    /++
    Plain compare-and-swap: a direct alias of core.atomic.cas.

    Only compiled when version(mergearray_do_cas_statistics) is not set.
    +/
    alias my_cas = cas;
}

//==============================================================================

version (mergearray_do_lock_statistics) {
    /++
    Counters updated by SMutex and TasMutex, for debugging or performance
    measurements.

    Only compiled when version(mergearray_do_lock_statistics) is set.
    +/
    size_t tryLockFails = 0;
    /// ditto
    size_t tryLockSuccess = 0;
    /// ditto
    size_t lockTotal = 0;

    /// Sets all three lock counters back to zero.
    void resetLockStats() {
        tryLockFails = tryLockSuccess = lockTotal = 0;
    }
}

//==============================================================================

/++
Callback-style wrapper around a shared(core.sync.mutex.Mutex): the lock is
held only for the duration of the delegate passed in.
+/
struct SMutex {
private:
    import core.sync.mutex : Mutex;

    Mutex mutex;

public:
    /++
    Returns: The wrapped Mutex with its `shared` qualifier cast away.
    +/
    @property
    Mutex unshared() shared {
        return cast(Mutex) mutex;
    }
    /++
    Static factory method.
    Params:
        args = Forwarded to the Mutex constructor.
    Returns: A shared SMutex, by value, wrapping a newly allocated Mutex.
    +/
    static shared(SMutex) make(Args...)(Args args) {
        return shared SMutex(cast(shared) new Mutex(args));
    }
    /++
    Try to take the lock without blocking; on success run dg() and release.
    Params:
        dg = A scoped callback, invoked only while the lock is held.
    Returns: true if the lock was acquired (and dg was called), else false.
    +/
    bool tryLocked(scope void delegate() dg) shared {
        if (!unshared.tryLock()) {
            version (mergearray_do_lock_statistics) {
                tryLockFails++;
            }
            return false;
        }

        // From here on the lock is ours; release it however we leave.
        scope(exit) unshared.unlock();

        version (mergearray_do_lock_statistics) {
            tryLockSuccess++;
            lockTotal++;
        }

        dg();

        return true;
    }
    /++
    Take the lock, run dg(), then release.
    Params:
        dg = A scoped callback, invoked only while the lock is held.
    +/
    void locked(scope void delegate() dg) shared {
        synchronized (unshared) {
            version (mergearray_do_lock_statistics) {
                lockTotal++;
            }
            dg();
        }
    }
}

/++
Callback-style lock built on a single bool manipulated through core.atomic.
+/
struct TasMutex {
    import core.atomic;

    private bool lock;

    /++
    Static factory method.
    Returns: A shared TasMutex, by value, with its flag set to false.
    +/
    static shared(TasMutex) make() {
        return shared(TasMutex)(false);
    }
    /++
    Try once to flip the flag from false to true; on success run dg() and
    store false back.
    Params:
        dg = A scoped callback, invoked only while the flag is held.
    Returns: true if the flag was acquired (and dg was called), else false.
    +/
    bool tryLocked(scope void delegate() dg) shared {
        if (!cas(&lock, false, true)) {
            version (mergearray_do_lock_statistics) {
                tryLockFails++;
            }
            return false;
        }

        // The flag is now true; clear it again however we leave.
        scope(exit) lock.atomicStore(false);

        version (mergearray_do_lock_statistics) {
            tryLockSuccess++;
            lockTotal++;
        }

        dg();

        return true;
    }
    /++
    Acquire the flag (spinning as long as necessary), run dg(), then release.
    Params:
        dg = A scoped callback, invoked only while the flag is held.
    +/
    void locked(scope void delegate() dg) shared {
        do {
            // Busy-wait on plain loads until the flag reads false, and only
            // then attempt the compare-and-swap inside tryLocked.
            while (lock.atomicLoad) { }
        } while (!tryLocked(dg));
    }
}
// -----------------------------------------------------------------------------

/++
Shared wrapper that pairs an unshared T with a single lock guarding it.
+/
struct CoarseLocked(T) if (!is(T == shared)) {
    alias M = TasMutex; // SMutex

    private T t;
    private M mutex;

    /++
    Construct from an initial value, together with a freshly made lock.
    Params:
        init = The initial value for the internal T.
    +/
    this(T init) shared {
        t = cast(shared) init;
        mutex = M.make();
    }
    /++
    Try to take the lock; on success call dg with an unshared pointer to the
    internal value, then release.
    Params:
        dg = A scoped callback, invoked only while the lock is held.
    Returns: true if the lock was acquired (and dg was called), else false.
    +/
    bool tryLocked(scope void delegate(T*) dg) shared {
        return mutex.tryLocked({
            dg(cast(T*) &t);
        });
    }
    /++
    Try to take the lock; on success call dg(), then release.
    Params:
        dg = A scoped callback, invoked only while the lock is held.
    Returns: true if the lock was acquired (and dg was called), else false.
    +/
    bool tryLocked(scope void delegate() dg) shared {
        return mutex.tryLocked(dg);
    }
    /++
    Take the lock, call dg with an unshared pointer to the internal value,
    then release.
    Params:
        dg = A scoped callback, invoked only while the lock is held.
    +/
    void locked(scope void delegate(T*) dg) shared {
        mutex.locked({
            dg(cast(T*) &t);
        });
    }
    /++
    Take the lock, call dg(), then release.
    Params:
        dg = A scoped callback, invoked only while the lock is held.
    +/
    void locked(scope void delegate() dg) shared {
        mutex.locked(dg);
    }
    /++
    Direct accessor for the internal T, callable on an unshared instance.
    WARNING: This method does not take the lock and so provides no mutual
    exclusion.
    Returns: A pointer to the internal T.
    +/
    T* peek() {
        return &t;
    }
}