From 58c4f7deede7a92bbfbde84c0b5ba17006cbe7b2 Mon Sep 17 00:00:00 2001 From: Andrei Alexandrescu Date: Sun, 11 Jun 2017 14:20:53 -0400 Subject: [PATCH 1/3] Eliminate static shared this from std/parallelism.d --- std/parallelism.d | 170 ++++++++++++++++++++++++++-------------------- 1 file changed, 95 insertions(+), 75 deletions(-) diff --git a/std/parallelism.d b/std/parallelism.d index b2775f760e2..6470b406e1c 100644 --- a/std/parallelism.d +++ b/std/parallelism.d @@ -99,87 +99,47 @@ else version(NetBSD) version = useSysctlbyname; } - -version(Windows) -{ - // BUGS: Only works on Windows 2000 and above. - shared static this() - { - import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo; - import std.algorithm.comparison : max; - - SYSTEM_INFO si; - GetSystemInfo(&si); - totalCPUs = max(1, cast(uint) si.dwNumberOfProcessors); - } - -} -else version(linux) -{ - shared static this() - { - import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; - totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); - } -} -else version(Solaris) +/* +A lazily initialized global constant. The underlying value is a shared global +statically initialized to `outOfBandValue` which must not be a legit value of +the constant. Upon the first call the situation is detected and the global is +initialized by calling `initializer`. The initializer is assumed to be pure +(even if not marked as such), i.e. return the same value upon repeated calls. +For that reason, no special precautions are taken so `initializer` may be called +more than one time leading to benign races on the cached value. + +In the quiescent state the cost of the function is an atomic load from a global. +*/ +package @property @nogc nothrow pure @trusted +T lazilyInitializedConstant(T, T outOfBandValue, alias initializer)() { - shared static this() - { - import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; - totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN); - } + shared T result = outOfBandValue; + // Short path + auto local = atomicLoad(result); + if (local != outOfBandValue) return local; + // Long path + local = (cast(size_t function() @nogc nothrow pure @trusted) &initializer)(); + atomicStore(result, local); + return local; } -else version(useSysctlbyname) -{ - extern(C) int sysctlbyname( - const char *, void *, size_t *, void *, size_t - ); - - shared static this() - { - version(OSX) - { - auto nameStr = "machdep.cpu.core_count\0".ptr; - } - else version(FreeBSD) - { - auto nameStr = "hw.ncpu\0".ptr; - } - else version(NetBSD) - { - auto nameStr = "hw.ncpu\0".ptr; - } - uint ans; - size_t len = uint.sizeof; - sysctlbyname(nameStr, &ans, &len, null, 0); - totalCPUs = ans; - } - -} -else -{ - static assert(0, "Don't know how to get N CPUs on this OS."); -} +// Returns the size of a cache line. +alias cacheLineSize = lazilyInitializedConstant!(size_t, size_t.max, cacheLineSizeImpl); -immutable size_t cacheLineSize; -shared static this() +private size_t cacheLineSizeImpl() @nogc nothrow { + size_t result = 0; import core.cpuid : datacache; - size_t lineSize = 0; foreach (cachelevel; datacache) { - if (cachelevel.lineSize > lineSize && cachelevel.lineSize < uint.max) + if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max) { - lineSize = cachelevel.lineSize; + result = cachelevel.lineSize; } } - - cacheLineSize = lineSize; + return result; } - /* Atomics code. These forward to core.atomic, but are written like this for two reasons: @@ -945,11 +905,74 @@ if (is(typeof(fun(args))) && isSafeTask!F) return ret; } +version(useSysctlbyname) + private extern(C) int sysctlbyname( + const char *, void *, size_t *, void *, size_t + ) @nogc nothrow; + /** The total number of CPU cores available on the current machine, as reported by the operating system. */ -immutable uint totalCPUs; +@property @nogc nothrow pure @trusted uint totalCPUs() +{ + return (cast(uint function() @nogc nothrow pure) + &totalCPUsImpl)(); +} + +uint totalCPUsImpl() @nogc nothrow +{ + static shared uint result; + auto localResult = atomicLoad(result); + if (localResult > 0) return localResult; + + // There might be harmless multiple initialization here + version(Windows) + { + // BUGS: Only works on Windows 2000 and above. + import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo; + import std.algorithm.comparison : max; + + SYSTEM_INFO si; + GetSystemInfo(&si); + atomicStore(result, max(1, cast(uint) si.dwNumberOfProcessors)); + } + else version(linux) + { + import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; + atomicStore(result, cast(uint) sysconf(_SC_NPROCESSORS_ONLN)); + } + else version(Solaris) + { + import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; + atomicStore(result, cast(uint) sysconf(_SC_NPROCESSORS_ONLN)); + } + else version(useSysctlbyname) + { + version(OSX) + { + auto nameStr = "machdep.cpu.core_count\0".ptr; + } + else version(FreeBSD) + { + auto nameStr = "hw.ncpu\0".ptr; + } + else version(NetBSD) + { + auto nameStr = "hw.ncpu\0".ptr; + } + + localResult = 0; + size_t len = uint.sizeof; + sysctlbyname(nameStr, &localResult, &len, null, 0); + atomicStore(result, localResult); + } + else + { + static assert(0, "Don't know how to get N CPUs on this OS."); + } + return atomicLoad(result); +} /* This class serves two purposes: @@ -3294,11 +3317,7 @@ terminating the main thread. }()); } -private shared uint _defaultPoolThreads; -shared static this() -{ - atomicStore(_defaultPoolThreads, totalCPUs - 1); -} +private shared uint _defaultPoolThreads = uint.max; /** These properties get and set the number of worker threads in the $(D TaskPool) @@ -3308,7 +3327,8 @@ number of worker threads in the instance returned by $(D taskPool). */ @property uint defaultPoolThreads() @trusted { - return atomicLoad(_defaultPoolThreads); + auto local = atomicLoad(_defaultPoolThreads); + return local < uint.max ? local : totalCPUs - 1; } /// Ditto From 286930a99c79a6dd4ca9de9b32c533bd6eaf0969 Mon Sep 17 00:00:00 2001 From: Andrei Alexandrescu Date: Wed, 25 Oct 2017 17:55:53 -0400 Subject: [PATCH 2/3] Review --- std/parallelism.d | 91 +++++++++++++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/std/parallelism.d b/std/parallelism.d index 6470b406e1c..d5d39226281 100644 --- a/std/parallelism.d +++ b/std/parallelism.d @@ -109,28 +109,56 @@ For that reason, no special precautions are taken so `initializer` may be called more than one time leading to benign races on the cached value. In the quiescent state the cost of the function is an atomic load from a global. + +Params: + T = The type of the pseudo-constant (may be qualified) + outOfBandValue = A value that cannot be valid, it is used for initialization + initializer = The function performing initialization; must be `nothrow` + +Returns: + The lazily initialized value */ -package @property @nogc nothrow pure @trusted -T lazilyInitializedConstant(T, T outOfBandValue, alias initializer)() +package @property pure +T lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)() +if (is(Unqual!T : T) + && is(typeof(initializer()) : T) + && is(typeof(outOfBandValue) : T)) { - shared T result = outOfBandValue; - // Short path - auto local = atomicLoad(result); - if (local != outOfBandValue) return local; - // Long path - local = (cast(size_t function() @nogc nothrow pure @trusted) &initializer)(); - atomicStore(result, local); - return local; + static T impl() nothrow + { + // Thread-local cache + static Unqual!T tls = outOfBandValue; + auto local = tls; + // Shortest path, no atomic operations + if (local != outOfBandValue) return local; + // Process-level cache + static shared Unqual!T result = outOfBandValue; + // Initialize both process-level cache and tls + local = atomicLoad(result); + if (local == outOfBandValue) + { + local = initializer(); + atomicStore(result, local); + } + tls = local; + return local; + } + + import std.traits; + alias Fun = SetFunctionAttributes!(typeof(&impl), "D", + functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_); + return (cast(Fun) &impl)(); } // Returns the size of a cache line. -alias cacheLineSize = lazilyInitializedConstant!(size_t, size_t.max, cacheLineSizeImpl); +alias cacheLineSize = + lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl); -private size_t cacheLineSizeImpl() @nogc nothrow +private size_t cacheLineSizeImpl() @nogc nothrow @trusted { size_t result = 0; import core.cpuid : datacache; - foreach (cachelevel; datacache) + foreach (ref const cachelevel; datacache) { if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max) { @@ -140,6 +168,11 @@ private size_t cacheLineSizeImpl() @nogc nothrow return result; } +unittest +{ + assert(cacheLineSize == cacheLineSizeImpl); +} + /* Atomics code. These forward to core.atomic, but are written like this for two reasons: @@ -914,38 +947,29 @@ version(useSysctlbyname) The total number of CPU cores available on the current machine, as reported by the operating system. */ -@property @nogc nothrow pure @trusted uint totalCPUs() -{ - return (cast(uint function() @nogc nothrow pure) - &totalCPUsImpl)(); -} +alias totalCPUs = + lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl); -uint totalCPUsImpl() @nogc nothrow +uint totalCPUsImpl() @nogc nothrow @trusted { - static shared uint result; - auto localResult = atomicLoad(result); - if (localResult > 0) return localResult; - - // There might be harmless multiple initialization here version(Windows) { // BUGS: Only works on Windows 2000 and above. import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo; import std.algorithm.comparison : max; - SYSTEM_INFO si; GetSystemInfo(&si); - atomicStore(result, max(1, cast(uint) si.dwNumberOfProcessors)); + return max(1, cast(uint) si.dwNumberOfProcessors); } else version(linux) { import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; - atomicStore(result, cast(uint) sysconf(_SC_NPROCESSORS_ONLN)); + return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); } else version(Solaris) { import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf; - atomicStore(result, cast(uint) sysconf(_SC_NPROCESSORS_ONLN)); + return cast(uint) sysconf(_SC_NPROCESSORS_ONLN); } else version(useSysctlbyname) { @@ -962,16 +986,15 @@ uint totalCPUsImpl() @nogc nothrow auto nameStr = "hw.ncpu\0".ptr; } - localResult = 0; - size_t len = uint.sizeof; - sysctlbyname(nameStr, &localResult, &len, null, 0); - atomicStore(result, localResult); + uint result; + size_t len = result.sizeof; + sysctlbyname(nameStr, &result, &len, null, 0); + return result; } else { static assert(0, "Don't know how to get N CPUs on this OS."); } - return atomicLoad(result); } /* @@ -3327,7 +3350,7 @@ number of worker threads in the instance returned by $(D taskPool). */ @property uint defaultPoolThreads() @trusted { - auto local = atomicLoad(_defaultPoolThreads); + const local = atomicLoad(_defaultPoolThreads); return local < uint.max ? local : totalCPUs - 1; } From 4f9b7efbf40edb7aaa68ba74fa8a4dc5885d38b2 Mon Sep 17 00:00:00 2001 From: Andrei Alexandrescu Date: Thu, 26 Oct 2017 16:38:41 -0400 Subject: [PATCH 3/3] Propagate safety properly --- std/parallelism.d | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/std/parallelism.d b/std/parallelism.d index d5d39226281..d4bd5fa1580 100644 --- a/std/parallelism.d +++ b/std/parallelism.d @@ -144,10 +144,11 @@ if (is(Unqual!T : T) return local; } - import std.traits; + import std.traits : SetFunctionAttributes; alias Fun = SetFunctionAttributes!(typeof(&impl), "D", functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_); - return (cast(Fun) &impl)(); + auto purified = (() @trusted => cast(Fun) &impl)(); + return purified(); } // Returns the size of a cache line. @@ -168,7 +169,7 @@ private size_t cacheLineSizeImpl() @nogc nothrow @trusted return result; } -unittest +@nogc @safe nothrow unittest { assert(cacheLineSize == cacheLineSizeImpl); }