Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 72 additions & 116 deletions std/parallelism.d
Original file line number Diff line number Diff line change
Expand Up @@ -99,81 +99,87 @@ else version(NetBSD)
version = useSysctlbyname;
}

/*
A lazily initialized global constant. The underlying value is a shared global
statically initialized to `outOfBandValue` which must not be a legit value of
the constant. Upon the first call the situation is detected and the global is
initialized by calling `initializer`. The initializer is assumed to be pure
(even if not marked as such), i.e. return the same value upon repeated calls.
For that reason, no special precautions are taken so `initializer` may be called
more than one time leading to benign races on the cached value.

In the quiescent state the cost of the function is an atomic load from a global.

Params:
T = The type of the pseudo-constant (may be qualified)
outOfBandValue = A value that cannot be valid, it is used for initialization
initializer = The function performing initialization; must be `nothrow`

Returns:
The lazily initialized value
*/
package @property pure
T lazilyInitializedConstant(T, alias outOfBandValue, alias initializer)()
if (is(Unqual!T : T)
&& is(typeof(initializer()) : T)
&& is(typeof(outOfBandValue) : T))

version(Windows)
{
static T impl() nothrow
{
// Thread-local cache
static Unqual!T tls = outOfBandValue;
auto local = tls;
// Shortest path, no atomic operations
if (local != outOfBandValue) return local;
// Process-level cache
static shared Unqual!T result = outOfBandValue;
// Initialize both process-level cache and tls
local = atomicLoad(result);
if (local == outOfBandValue)
// BUGS: Only works on Windows 2000 and above.
shared static this()
{
import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
import std.algorithm.comparison : max;

SYSTEM_INFO si;
GetSystemInfo(&si);
totalCPUs = max(1, cast(uint) si.dwNumberOfProcessors);
}

}
else version(linux)
{
shared static this()
{
import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
}
}
else version(Solaris)
{
shared static this()
{
import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
totalCPUs = cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
}
}
else version(useSysctlbyname)
{
extern(C) int sysctlbyname(
const char *, void *, size_t *, void *, size_t
);

shared static this()
{
version(OSX)
{
auto nameStr = "machdep.cpu.core_count\0".ptr;
}
else version(FreeBSD)
{
auto nameStr = "hw.ncpu\0".ptr;
}
else version(NetBSD)
{
local = initializer();
atomicStore(result, local);
auto nameStr = "hw.ncpu\0".ptr;
}
tls = local;
return local;

uint ans;
size_t len = uint.sizeof;
sysctlbyname(nameStr, &ans, &len, null, 0);
totalCPUs = ans;
}

import std.traits : SetFunctionAttributes;
alias Fun = SetFunctionAttributes!(typeof(&impl), "D",
functionAttributes!(typeof(&impl)) | FunctionAttribute.pure_);
auto purified = (() @trusted => cast(Fun) &impl)();
return purified();
}
else
{
static assert(0, "Don't know how to get N CPUs on this OS.");
}

// Returns the size of a cache line.
alias cacheLineSize =
lazilyInitializedConstant!(immutable(size_t), size_t.max, cacheLineSizeImpl);

private size_t cacheLineSizeImpl() @nogc nothrow @trusted
immutable size_t cacheLineSize;
shared static this()
{
size_t result = 0;
import core.cpuid : datacache;
foreach (ref const cachelevel; datacache)
size_t lineSize = 0;
foreach (cachelevel; datacache)
{
if (cachelevel.lineSize > result && cachelevel.lineSize < uint.max)
if (cachelevel.lineSize > lineSize && cachelevel.lineSize < uint.max)
{
result = cachelevel.lineSize;
lineSize = cachelevel.lineSize;
}
}
return result;
}

@nogc @safe nothrow unittest
{
assert(cacheLineSize == cacheLineSizeImpl);
cacheLineSize = lineSize;
}


/* Atomics code. These forward to core.atomic, but are written like this
for two reasons:

Expand Down Expand Up @@ -939,64 +945,11 @@ if (is(typeof(fun(args))) && isSafeTask!F)
return ret;
}

version(useSysctlbyname)
private extern(C) int sysctlbyname(
const char *, void *, size_t *, void *, size_t
) @nogc nothrow;

/**
The total number of CPU cores available on the current machine, as reported by
the operating system.
*/
alias totalCPUs =
lazilyInitializedConstant!(immutable(uint), uint.max, totalCPUsImpl);

uint totalCPUsImpl() @nogc nothrow @trusted
{
version(Windows)
{
// BUGS: Only works on Windows 2000 and above.
import core.sys.windows.windows : SYSTEM_INFO, GetSystemInfo;
import std.algorithm.comparison : max;
SYSTEM_INFO si;
GetSystemInfo(&si);
return max(1, cast(uint) si.dwNumberOfProcessors);
}
else version(linux)
{
import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
}
else version(Solaris)
{
import core.sys.posix.unistd : _SC_NPROCESSORS_ONLN, sysconf;
return cast(uint) sysconf(_SC_NPROCESSORS_ONLN);
}
else version(useSysctlbyname)
{
version(OSX)
{
auto nameStr = "machdep.cpu.core_count\0".ptr;
}
else version(FreeBSD)
{
auto nameStr = "hw.ncpu\0".ptr;
}
else version(NetBSD)
{
auto nameStr = "hw.ncpu\0".ptr;
}

uint result;
size_t len = result.sizeof;
sysctlbyname(nameStr, &result, &len, null, 0);
return result;
}
else
{
static assert(0, "Don't know how to get N CPUs on this OS.");
}
}
immutable uint totalCPUs;

/*
This class serves two purposes:
Expand Down Expand Up @@ -3341,7 +3294,11 @@ terminating the main thread.
}());
}

private shared uint _defaultPoolThreads = uint.max;
private shared uint _defaultPoolThreads;
shared static this()
{
atomicStore(_defaultPoolThreads, totalCPUs - 1);
}

/**
These properties get and set the number of worker threads in the $(D TaskPool)
Expand All @@ -3351,8 +3308,7 @@ number of worker threads in the instance returned by $(D taskPool).
*/
@property uint defaultPoolThreads() @trusted
{
const local = atomicLoad(_defaultPoolThreads);
return local < uint.max ? local : totalCPUs - 1;
return atomicLoad(_defaultPoolThreads);
}

/// Ditto
Expand Down