1 module mergearray.impl.header;
2 
version (mergearray_use_numeric_tid) {
    /++
    Counter shared by all threads that holds the next numeric identifier
    to hand out.

    Only declared when version(mergearray_use_numeric_tid) is set.
    +/
    shared size_t tidCount = 0;

    /++
    Numeric identifier of the current thread, of type size_t.

    Declared without `shared`, so every thread has its own copy. It is meant
    for error messages and debugging output.

    Only declared when version(mergearray_use_numeric_tid) is set.
    +/
    size_t tid = 0;

    /++
    Thread-local module constructor: claims the current value of tidCount as
    this thread's tid and advances the counter by one.
    +/
    static this() {
        // The synchronized statement keeps the read and the increment of
        // tidCount together as one critical section.
        synchronized {
            immutable size_t claimed = tidCount;
            tidCount = claimed + 1;
            tid = claimed;
        }
    }
}
else {
    import std.concurrency : thisTid;
    /++
    Identifier of the current thread, of type std.concurrency.Tid.

    This is a plain alias of std.concurrency.thisTid and is meant for error
    messages and debugging output.

    Only declared when version(mergearray_use_numeric_tid) is not set.
    +/
    alias tid = thisTid;
    //Tid tid() { return thisTid(); }
}
32 
33 // =============================================================================
34 
version (mergearray_do_cas_statistics) {
    /++
    Number of my_cas calls that performed the swap.

    Declared without `shared`, so each thread keeps its own count.
    Only declared when version(mergearray_do_cas_statistics) is set.
    +/
    size_t numSuccessfulCAS = 0;

    /++
    Number of my_cas calls that did not perform the swap.

    Declared without `shared`, so each thread keeps its own count.
    Only declared when version(mergearray_do_cas_statistics) is set.
    +/
    size_t numFailedCAS = 0;

    /++
    Instrumented compare-and-swap: forwards to core.atomic.cas and records
    the outcome in numSuccessfulCAS or numFailedCAS, for debugging or
    performance measurements.

    Only declared when version(mergearray_do_cas_statistics) is set.

    Params:
    loc = Location to update.
    expected = Value that must currently be stored at loc for the swap to happen.
    value = Value to store on success.
    Returns: The result of core.atomic.cas(loc, expected, value).
    +/
    bool my_cas(L,E,V)(L* loc, E expected, V value) {
        import core.atomic : cas;

        immutable bool swapped = cas(loc, expected, value);
        if (swapped)
            ++numSuccessfulCAS;
        else
            ++numFailedCAS;
        return swapped;
    }

    /++
    Sets numSuccessfulCAS and numFailedCAS back to zero.

    Only declared when version(mergearray_do_cas_statistics) is set.
    +/
    void resetCasStats() {
        numFailedCAS = 0;
        numSuccessfulCAS = 0;
    }
}
else {
    import core.atomic : cas;
    /++
    Uninstrumented compare-and-swap: a plain alias of core.atomic.cas.

    Only declared when version(mergearray_do_cas_statistics) is not set.
    +/
    alias my_cas = cas;
}
79 
80 //==============================================================================
81 
version (mergearray_do_lock_statistics) {
    /++
    Number of tryLocked attempts on SMutex or TasMutex that did not obtain
    the lock.

    Declared without `shared`, so each thread keeps its own count.
    Only declared when version(mergearray_do_lock_statistics) is set.
    +/
    size_t tryLockFails = 0;

    /++
    Number of tryLocked attempts on SMutex or TasMutex that obtained the lock.

    Declared without `shared`, so each thread keeps its own count.
    Only declared when version(mergearray_do_lock_statistics) is set.
    +/
    size_t tryLockSuccess = 0;

    /++
    Number of times a lock was obtained, through either tryLocked or locked.

    Declared without `shared`, so each thread keeps its own count.
    Only declared when version(mergearray_do_lock_statistics) is set.
    +/
    size_t lockTotal = 0;

    /++
    Sets tryLockFails, tryLockSuccess and lockTotal back to zero.

    Only declared when version(mergearray_do_lock_statistics) is set.
    +/
    void resetLockStats() {
        lockTotal = tryLockSuccess = tryLockFails = 0;
    }
}
102 
103 //==============================================================================
104 
/++
Wrapper around a core.sync.mutex.Mutex that is used through `shared` methods
and only exposes the lock as "run this callback while holding it".
+/
struct SMutex {
private:
    import core.sync.mutex : Mutex;

    Mutex mutex;

public:
    /++
    Strips the `shared` qualifier from the wrapped Mutex.
    Returns: An unshared reference to the internal Mutex object.
    +/
    @property
    Mutex unshared() shared {
        return cast(Mutex) mutex;
    }

    /++
    Static factory method: allocates a new Mutex and wraps it.
    Params:
    args = Arguments forwarded to the Mutex constructor.
    Returns: A shared SMutex, by value.
    +/
    static shared(SMutex) make(Args...)(Args args) {
        auto fresh = cast(shared) new Mutex(args);
        return shared SMutex(fresh);
    }

    /++
    Try to take the lock without blocking; when that succeeds, run dg() and
    release the lock afterwards.
    Params:
    dg = A scoped callback that is called only while this is locked.
    Returns: true if the lock was taken (and dg was called), false otherwise.
    +/
    bool tryLocked(scope void delegate() dg) shared {
        auto raw = unshared;

        if (!raw.tryLock()) {
            version (mergearray_do_lock_statistics) {
                tryLockFails++;
            }
            return false;
        }

        // Release on every exit path, including an exception thrown by dg.
        scope(exit) raw.unlock();

        version (mergearray_do_lock_statistics) {
            tryLockSuccess++;
            lockTotal++;
        }

        dg();
        return true;
    }

    /++
    Take the lock, run dg(), then release the lock.

    The lock is taken by entering a synchronized statement on the wrapped
    Mutex object.
    Params:
    dg = A scoped callback that is called only while this is locked.
    +/
    void locked(scope void delegate() dg) shared {
        auto raw = unshared;
        synchronized (raw) {
            version (mergearray_do_lock_statistics) {
                lockTotal++;
            }
            dg();
        }
    }
}
171 
/++
Lock built on a single bool flag that is only touched through core.atomic
operations. The lock is exposed as "run this callback while holding it".
+/
struct TasMutex {
    import core.atomic;

    // true while some caller holds the lock.
    private bool lock;

    /++
    Static factory method.
    Returns: A shared TasMutex, by value, in the unlocked state.
    +/
    static shared(TasMutex) make() {
        return shared(TasMutex)(false);
    }

    /++
    Try to take the lock with one compare-and-swap; when that succeeds, run
    dg() and release the lock afterwards.
    Params:
    dg = A scoped callback that is called only while this is locked.
    Returns: true if the lock was taken (and dg was called), false otherwise.
    +/
    bool tryLocked(scope void delegate() dg) shared {
        if (!cas(&lock, false, true)) {
            version (mergearray_do_lock_statistics) {
                tryLockFails++;
            }
            return false;
        }

        // Release on every exit path, including an exception thrown by dg.
        scope(exit) atomicStore(lock, false);

        version (mergearray_do_lock_statistics) {
            tryLockSuccess++;
            lockTotal++;
        }

        dg();
        return true;
    }

    /++
    Take the lock, run dg(), then release the lock.

    Busy-waits until the flag reads false and then attempts the lock via
    tryLocked, repeating until an attempt succeeds. Because acquisition goes
    through tryLocked, the instrumented build also updates the try-lock
    counters from here.
    Params:
    dg = A scoped callback that is called only while this is locked.
    +/
    void locked(scope void delegate() dg) shared {
        bool done = false;
        do {
            // Spin on loads only; the compare-and-swap is attempted once the
            // flag has been observed as free.
            while (atomicLoad(lock)) { }
            done = tryLocked(dg);
        } while (!done);
    }
}
225 // -----------------------------------------------------------------------------
226 
/++
Shared wrapper that stores an unshared T next to a single lock and hands out
access to the value through callbacks that run while the lock is held.
+/
struct CoarseLocked(T) if (!is(T == shared)) {
    alias M = TasMutex; // SMutex

    private T t;
    private M mutex;

    // Address of the wrapped value with the `shared` qualifier cast away.
    private T* unsharedPtr() shared {
        return cast(T*) &t;
    }

    /++
    Construct from an initial value with a new lock.
    Params:
    init = The initial value for the internal T.
    +/
    this(T init) shared {
        t = cast(shared) init;
        mutex = M.make();
    }

    /++
    Try to take the lock; when that succeeds, call dg with a pointer to the
    internal value, then release the lock.
    Params:
    dg = A scoped callback that is called only while this is locked.
    Returns: Whether or not the lock was taken.
    +/
    bool tryLocked(scope void delegate(T*) dg) shared {
        return mutex.tryLocked({ dg(unsharedPtr()); });
    }

    /++
    Try to take the lock; when that succeeds, call dg(), then release the lock.
    Params:
    dg = A scoped callback that is called only while this is locked.
    Returns: Whether or not the lock was taken.
    +/
    bool tryLocked(scope void delegate() dg) shared {
        return mutex.tryLocked(dg);
    }

    /++
    Take the lock, call dg with a pointer to the internal value, then release
    the lock.
    Params:
    dg = A scoped callback that is called only while this is locked.
    +/
    void locked(scope void delegate(T*) dg) shared {
        mutex.locked({ dg(unsharedPtr()); });
    }

    /++
    Take the lock, call dg(), then release the lock.
    Params:
    dg = A scoped callback that is called only while this is locked.
    +/
    void locked(scope void delegate() dg) shared {
        mutex.locked(dg);
    }

    /++
    Unshared accessor to the internal T.
    WARNING: This method does not take the lock and so provides no mutual
    exclusion.
    Returns: An unshared pointer to the internal T.
    +/
    T* peek() {
        return &t;
    }
}