From 92d8bc5265dcd1cb8045b95f2674624a34d34b9e Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Mon, 25 Aug 2014 17:16:38 -0500 Subject: [PATCH 01/37] Factor out a _send method in mpi_engine. --- distarray/mpi_engine.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index 1bba45c5..29a7dc44 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -115,7 +115,7 @@ def func_call(self, msg): res = new_func(*args, **kwargs) if autoproxyize and isinstance(res, LocalArray): res = module.proxyize(res) - Engine.INTERCOMM.send(res, dest=self.client_rank) + self._send(res) def execute(self, msg): main = import_module('__main__') @@ -134,7 +134,7 @@ def pull(self, msg): name = msg[1] module = import_module('__main__') res = reduce(getattr, [module] + name.split('.')) - Engine.INTERCOMM.send(res, dest=self.client_rank) + self._send(res) def free_comm(self, msg): comm = msg[1].dereference() @@ -156,4 +156,8 @@ def builtin_call(self, msg): args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs) res = func(*args, **kwargs) - Engine.INTERCOMM.send(res, dest=self.client_rank) + self._send(res) + + def _send(self, msg, dest=None): + dest = self.client_rank if dest is None else dest + Engine.INTERCOMM.send(msg, dest=dest) From e27d079488a100b09734cba8584d94cda8fff839 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Mon, 25 Aug 2014 18:27:41 -0500 Subject: [PATCH 02/37] Another small refactoring for mpi_engine. 
--- distarray/mpi_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index 29a7dc44..e59969f1 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -34,7 +34,7 @@ def __init__(self): # make engines intracomm (Context._base_comm): Engine.INTERCOMM = initial_comm_setup() - assert self.world.rank != self.client_rank + assert self.is_engine() while True: msg = Engine.INTERCOMM.recv(source=self.client_rank) val = self.parse_msg(msg) From 7c0ae22f55dc847e25c3e06058b7231362cc82af Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Mon, 25 Aug 2014 18:29:45 -0500 Subject: [PATCH 03/37] WIP: Lazy evaluation for MPIContext. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Won’t yet handle multiple expressions. --- distarray/globalapi/context.py | 83 ++++++++++++++++++++--- distarray/globalapi/tests/test_context.py | 22 +++++- distarray/localapi/proxyize.py | 13 +++- distarray/mpi_engine.py | 31 +++++++-- 4 files changed, 130 insertions(+), 19 deletions(-) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index c593fe5d..2829a899 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -28,7 +28,7 @@ from distarray.globalapi.ipython_utils import IPythonClient from distarray.utils import uid, nonce, has_exactly_one -from distarray.localapi.proxyize import Proxy +from distarray.localapi.proxyize import Proxy, lazy_proxyize # mpi context from distarray.mpionly_utils import (make_targets_comm, @@ -71,7 +71,7 @@ def make_subcomm(self, new_targets): pass @abstractmethod - def apply(self, func, args=None, kwargs=None, targets=None): + def apply(self, func, args=None, kwargs=None, targets=None, nresults=None): pass @abstractmethod @@ -743,7 +743,8 @@ def _execute(self, lines, targets): def _push(self, d, targets): return self.view.push(d, targets=targets, block=True) - def apply(self, func, 
args=None, kwargs=None, targets=None, autoproxyize=False): + def apply(self, func, args=None, kwargs=None, targets=None, + autoproxyize=False, nresults=1): """ Analogous to IPython.parallel.view.apply_sync @@ -758,6 +759,8 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): engines func is to be run on. autoproxyize: bool, default False If True, implicitly return a Proxy object from the function. + nresults: int, default 1 + Number of return values. Only implemented for MPIContext. Returns ------- @@ -856,7 +859,7 @@ def delete_key(self, key, targets=None): if MPIContext.INTERCOMM: self._send_msg(msg, targets=targets) - def __init__(self, targets=None): + def __init__(self, targets=None, lazy=False): if MPIContext.INTERCOMM is None: MPIContext.INTERCOMM = initial_comm_setup() @@ -866,6 +869,16 @@ def __init__(self, targets=None): self.all_targets = list(range(self.nengines)) self.targets = self.all_targets if targets is None else sorted(targets) + self.lazy = lazy # is the context in lazy-communication mode? 
+ + # message queues used for lazy mode + # mapping: target -> queue of messages for that target + + # _sendq: batches up messages to send upon sync() + self._sendq = dict([(t, []) for t in self.targets]) + # _recvq: stores proxy objects for expected return values + self._recvq = dict([(t, []) for t in self.targets]) + # make/get comms # this is the object we want to use with push, pull, etc' self._comm_from_targets = {} @@ -910,14 +923,28 @@ def close(self): def _send_msg(self, msg, targets=None): targets = self.targets if targets is None else targets - for t in targets: - MPIContext.INTERCOMM.send(msg, dest=t) + if self.lazy and msg[0] != 'process_message_queue': + for t in targets: + self._sendq[t].append(msg) + else: + for t in targets: + MPIContext.INTERCOMM.send(msg, dest=t) - def _recv_msg(self, targets=None): + def _recv_msg(self, targets=None, nresults=1, sync=False): res = [] targets = self.targets if targets is None else targets for t in targets: - res.append(MPIContext.INTERCOMM.recv(source=t)) + if self.lazy and not sync: + if nresults in {0, 1}: + res.append(lazy_proxyize()) + else: + target_results = [] + for i in range(nresults): + target_results.append(lazy_proxyize()) + res.append(target_results) + self._recvq[t].append(res[-1]) + else: + res.append(MPIContext.INTERCOMM.recv(source=t)) return res def make_subcomm(self, targets): @@ -948,7 +975,8 @@ def _push(self, d, targets=None): msg = ('push', d) return self._send_msg(msg, targets=targets) - def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): + def apply(self, func, args=None, kwargs=None, targets=None, + autoproxyize=False, nresults=1): """ Analogous to IPython.parallel.view.apply_sync @@ -963,6 +991,8 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): engines func is to be run on. autoproxyize: bool, default False If True, implicitly return a Proxy object from the function. + nresults: int, default 1 + Number of return values. 
Only needed for lazy evaluation. Returns ------- @@ -992,11 +1022,44 @@ def apply(self, func, args=None, kwargs=None, targets=None, autoproxyize=False): msg = ('builtin_call', func, args, kwargs, autoproxyize) self._send_msg(msg, targets=targets) - return self._recv_msg(targets=targets) + return self._recv_msg(targets=targets, nresults=nresults) def push_function(self, key, func, targets=None): push_function(self, key, func, targets=targets) + def sync(self, targets=None): + """Send queued messages, fill in expected result values.""" + targets = self.targets if targets is None else targets + for t in targets: + msg = ('process_message_queue', self._sendq[t]) + self._send_msg(msg, targets=[t]) + self._sendq[t] = [] # empty the send queue + results = self._recv_msg(targets=[t], sync=True)[0] + lresults = self._recvq[t] + for lres, res in zip(lresults, results): + # multiple return values + if isinstance(res, collections.Sequence): + if len(lres) != len(res): + msg = ("Reserved lazy result object isn't the same" + " size as the actual result object: {} != {}") + raise TypeError(msg.format(len(lres), len(res))) + for sublres, subres in zip(lres, res): + if isinstance(subres, Proxy): + sublres.__dict__ = subres.__dict__ + else: + msg = ("Only DistArray return values are " + "supported in lazy mode.") + raise TypeError(msg) + # single return value + else: + if isinstance(res, Proxy): + lres.__dict__ = res.__dict__ + else: + msg = ("Only DistArray return values are " + "currently supported in lazy mode.") + raise TypeError(msg) + self._recvq[t] = [] # empty the recv queue + class ContextCreationError(RuntimeError): pass diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index bac00754..47d3b62a 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -19,11 +19,31 @@ from numpy.testing import assert_allclose, assert_array_equal -from distarray.testing import 
DefaultContextTestCase, IPythonContextTestCase, check_targets +from distarray.testing import (DefaultContextTestCase, IPythonContextTestCase, + MPIContextTestCase, check_targets) from distarray.globalapi.context import Context from distarray.globalapi.maps import Distribution from distarray.mpionly_utils import is_solo_mpi_process from distarray.localapi import LocalArray +from distarray.localapi.proxyize import LazyPlaceholder + + +@unittest.skipIf(is_solo_mpi_process(), # not in MPI mode + "Cannot test MPIContext in IPython mode") +class TestLazyEval(MPIContextTestCase): + + ntargets = 'any' + + def test_lazy_eval(self): + a = self.context.zeros((5, 5)) + b = self.context.ones((5, 5)) + with self.context.lazy_eval(): + c = a + b + d = 2*a + b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + assert_array_equal(d.toarray(), 2*a.toarray() + b.toarray()) class TestRegister(DefaultContextTestCase): diff --git a/distarray/localapi/proxyize.py b/distarray/localapi/proxyize.py index 97bc32b5..615ec97c 100644 --- a/distarray/localapi/proxyize.py +++ b/distarray/localapi/proxyize.py @@ -6,7 +6,12 @@ from importlib import import_module -from distarray.utils import DISTARRAY_BASE_NAME +from distarray.utils import DISTARRAY_BASE_NAME, nonce + + +class LazyPlaceholder(object): + pass + class Proxy(object): @@ -28,6 +33,12 @@ def cleanup(self): self.name = self.module_name = self.type_str = None +def lazy_proxyize(): + """Return a Proxy object for a delayed ("lazy") value.""" + name = DISTARRAY_BASE_NAME + "lazy_" + nonce() + return Proxy(name, LazyPlaceholder(), '__main__') + + class Proxyize(object): """Callable that, given an object, returns a Proxy object. 
diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index e59969f1..9bf513e5 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -35,6 +35,8 @@ def __init__(self): # make engines intracomm (Context._base_comm): Engine.INTERCOMM = initial_comm_setup() assert self.is_engine() + self.lazy = False + self._client_sendq = [] while True: msg = Engine.INTERCOMM.recv(source=self.client_rank) val = self.parse_msg(msg) @@ -75,7 +77,8 @@ def parse_msg(self, msg): 'free_comm': self.free_comm, 'delete': self.delete, 'make_targets_comm': self.engine_make_targets_comm, - 'builtin_call': self.builtin_call} + 'builtin_call': self.builtin_call, + 'process_message_queue': self.process_message_queue} func = what[to_do] ret = func(msg) return ret @@ -88,6 +91,7 @@ def delete(self, msg): name = obj try: module = import_module('__main__') + print('delete', module, name) delattr(module, name) except AttributeError: pass @@ -115,7 +119,7 @@ def func_call(self, msg): res = new_func(*args, **kwargs) if autoproxyize and isinstance(res, LocalArray): res = module.proxyize(res) - self._send(res) + self._client_send(res) def execute(self, msg): main = import_module('__main__') @@ -134,7 +138,7 @@ def pull(self, msg): name = msg[1] module = import_module('__main__') res = reduce(getattr, [module] + name.split('.')) - self._send(res) + self._client_send(res) def free_comm(self, msg): comm = msg[1].dereference() @@ -156,8 +160,21 @@ def builtin_call(self, msg): args, kwargs = self.arg_kwarg_proxy_converter(args, kwargs) res = func(*args, **kwargs) - self._send(res) + self._client_send(res) - def _send(self, msg, dest=None): - dest = self.client_rank if dest is None else dest - Engine.INTERCOMM.send(msg, dest=dest) + def process_message_queue(self, msg): + mq = msg[1] + self.lazy = True + for submsg in mq: + val = self.parse_msg(submsg) + if val == 'kill': + return val + self.lazy = False + self._client_send(self._client_sendq) + self._client_sendq = [] + + def 
_client_send(self, msg): + if self.lazy: + self._client_sendq.append(msg) + else: Engine.INTERCOMM.send(msg, dest=self.client_rank) From ad5fb4daa0bc837c469509c83468debbcd439ed8 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Mon, 25 Aug 2014 18:30:19 -0500 Subject: [PATCH 04/37] Predict output dtype for ufuncs... MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit so we don’t have to deal with multiple return values. --- distarray/globalapi/functions.py | 56 ++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 7 deletions(-) diff --git a/distarray/globalapi/functions.py b/distarray/globalapi/functions.py index 24a058dc..567a24ed 100644 --- a/distarray/globalapi/functions.py +++ b/distarray/globalapi/functions.py @@ -35,6 +35,25 @@ __all__.append(func_name) +def unary_output_dtype(ufunc, val): + """Determine the output dtype of a unary ufunc with input `val`. + + Use the ufunc.types attribute and the input dtype. + """ + input_dtype = numpy.result_type(val) # find out dtype of scalars + # Look at the built-in implementations to find an output type. + for input_type, _, _, output_type in ufunc.types: + if input_dtype.char == input_type: + return numpy.dtype(output_type) + # Nothing found. Try coercion. + for input_type, _, _, output_type in ufunc.types: + if numpy.can_cast(input_dtype, input_type): + return numpy.dtype(output_type) + else: # Can't even coerce to a known input type. Give up. + msg = "Unary ufunc doesn't have a mapping for this type: {}." 
+ raise TypeError(msg.format(input_dtype)) + + def unary_proxy(name): def proxy_func(a, *args, **kwargs): context = determine_context(a) @@ -44,18 +63,41 @@ def func_call(func_name, arr_name, args, kwargs): dotted_name = 'distarray.localapi.%s' % (func_name,) func = get_from_dotted_name(dotted_name) res = func(arr_name, *args, **kwargs) - return proxyize(res), res.dtype # noqa + return proxyize(res) res = context.apply(func_call, args=(name, a.key, args, kwargs), targets=a.targets) - new_key = res[0][0] - dtype = res[0][1] + new_key = res[0] + dtype = unary_output_dtype(getattr(numpy, name), a) return DistArray.from_localarrays(new_key, distribution=a.distribution, dtype=dtype) return proxy_func +def binary_output_dtype(ufunc, val0, val1): + """Determine the output dtype of a binary ufunc, given input values. + + Use the ufunc.types attribute and the input dtypes. + """ + # find out dtype of scalars + input_dtype_0, input_dtype_1 = map(numpy.result_type, (val0, val1)) + # Look at the built-in implementations to find an output type. + for input_type_0, input_type_1, _, _, output_type in ufunc.types: + if ((input_dtype_0.char == input_type_0) and + (input_dtype_1.char == input_type_1)): + return numpy.dtype(output_type) + # Nothing found. Try coercion. + for input_type_0, input_type_1, _, _, output_type in ufunc.types: + if (numpy.can_cast(input_dtype_0, input_type_0) and + numpy.can_cast(input_dtype_1, input_type_1)): + return numpy.dtype(output_type) + else: # Can't even coerce to a known input type. Give up. 
+ msg = ("Binary ufunc doesn't have a mapping for these input types: " + "{}, {}") + raise TypeError(msg.format(input_dtype_0, input_dtype_1)) + + def binary_proxy(name): def proxy_func(a, b, *args, **kwargs): context = determine_context(a, b) @@ -83,12 +125,12 @@ def func_call(func_name, a, b, args, kwargs): dotted_name = 'distarray.localapi.%s' % (func_name,) func = get_from_dotted_name(dotted_name) res = func(a, b, *args, **kwargs) - return proxyize(res), res.dtype # noqa + return proxyize(res) res = context.apply(func_call, args=(name, a_key, b_key, args, kwargs), - targets=distribution.targets) - new_key = res[0][0] - dtype = res[0][1] + targets=distribution.targets, nresults=1) + new_key = res[0] + dtype = binary_output_dtype(getattr(numpy, name), a, b) return DistArray.from_localarrays(new_key, distribution=distribution, dtype=dtype) From 9354bf3ca0ba5da309b735351557102bca3f92fb Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Mon, 25 Aug 2014 18:30:47 -0500 Subject: [PATCH 05/37] Add a context manager for the laziness. --- distarray/globalapi/context.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index 2829a899..ad1392f2 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -17,6 +17,7 @@ from abc import ABCMeta, abstractmethod from functools import wraps +from contextlib import contextmanager import numpy @@ -921,6 +922,17 @@ def close(self): # End of key management routines. + @contextmanager + def lazy_eval(self): + """Context manager that enables lazy evaluation. 
+ + On exit, call `sync()` and set `self.lazy` to False + """ + self.lazy = True + yield self + self.sync() + self.lazy = False + def _send_msg(self, msg, targets=None): targets = self.targets if targets is None else targets if self.lazy and msg[0] != 'process_message_queue': From 19fee1fdbcae194fb55d0d9a6345079034d2c625 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Mon, 25 Aug 2014 18:31:05 -0500 Subject: [PATCH 06/37] Remove trailing whitespace. --- distarray/globalapi/context.py | 4 ++-- distarray/globalapi/tests/test_context.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index ad1392f2..e2237314 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -207,7 +207,7 @@ def local_allclose(la, lb, rtol, atol): from numpy import allclose return allclose(la.ndarray, lb.ndarray, rtol, atol) - local_results = self.apply(local_allclose, + local_results = self.apply(local_allclose, (a.key, b.key, rtol, atol), targets=a.targets) return all(local_results) @@ -581,7 +581,7 @@ def is_NoneType(pxy): return pxy.type_str == str(type(None)) def is_LocalArray(pxy): - return (isinstance(pxy, Proxy) and + return (isinstance(pxy, Proxy) and pxy.type_str == "") if all(is_LocalArray(r) for r in results): diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 47d3b62a..b2e73bdd 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -73,7 +73,7 @@ def test_local_sin(self): def local_sin(da): return numpy.sin(da) self.context.register(local_sin) - + db = self.context.local_sin(self.da) assert_allclose(0, db.tondarray(), atol=1e-14) @@ -166,7 +166,7 @@ def local_none(da): self.assertTrue(dp is None) def test_parameterless(self): - + def parameterless(): """This is a parameterless function.""" return None From 8c9a6e3cdbbe1fdd26b72ac90793cdb1e1cf309c Mon Sep 17 
00:00:00 2001 From: Robert David Grant Date: Tue, 26 Aug 2014 11:13:29 -0500 Subject: [PATCH 07/37] Remove the unsupported case in the lazy test. --- distarray/globalapi/tests/test_context.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index b2e73bdd..57132444 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -39,11 +39,8 @@ def test_lazy_eval(self): b = self.context.ones((5, 5)) with self.context.lazy_eval(): c = a + b - d = 2*a + b self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) - self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) assert_array_equal(c.toarray(), a.toarray() + b.toarray()) - assert_array_equal(d.toarray(), 2*a.toarray() + b.toarray()) class TestRegister(DefaultContextTestCase): From c468d0cfe6e213219595b3d84404af94d61f0597 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 26 Aug 2014 13:08:28 -0500 Subject: [PATCH 08/37] Add more passing tests and one failing one (skipped) --- distarray/globalapi/tests/test_context.py | 78 ++++++++++++++++++++++- 1 file changed, 75 insertions(+), 3 deletions(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 57132444..d231121b 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -34,14 +34,86 @@ class TestLazyEval(MPIContextTestCase): ntargets = 'any' - def test_lazy_eval(self): - a = self.context.zeros((5, 5)) - b = self.context.ones((5, 5)) + def test_single_add(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) with self.context.lazy_eval(): c = a + b self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + def test_single_mult(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + with 
self.context.lazy_eval(): + c = a * b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() * b.toarray()) + + def test_constant_mult(self): + a = self.context.zeros((5, 6)) + with self.context.lazy_eval(): + c = a * 2 + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() * 2) + + def test_two_identical_add_expr(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + with self.context.lazy_eval(): + c = a + b + d = a + b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + assert_array_equal(d.toarray(), a.toarray() + b.toarray()) + + def test_two_lazy_add_expr(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + with self.context.lazy_eval(): + c = a + b + d = a + b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + assert_array_equal(d.toarray(), a.toarray() + b.toarray()) + + def test_different_adds(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + c = self.context.ones((5, 6)) + 1 + d = self.context.ones((5, 6)) + 2 + with self.context.lazy_eval(): + e = a + b + f = c + d + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(f.key.dereference(), LazyPlaceholder)) + assert_array_equal(e.toarray(), a.toarray() + b.toarray()) + assert_array_equal(f.toarray(), c.toarray() + d.toarray()) + + def test_more_different_adds(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + c = self.context.ones((5, 6)) + 1 + with self.context.lazy_eval(): + e = a + b + f = b + c + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) + 
self.assertTrue(isinstance(f.key.dereference(), LazyPlaceholder)) + assert_array_equal(e.toarray(), a.toarray() + b.toarray()) + assert_array_equal(f.toarray(), b.toarray() + c.toarray()) + + @unittest.skip('Not yet supported') + def test_double_add(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + c = self.context.ones((5, 6)) + 1 + with self.context.lazy_eval(): + d = a + b + c + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) + class TestRegister(DefaultContextTestCase): From c307e753404abb49cedd440e5a244ad86e471d55 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 26 Aug 2014 17:51:28 -0500 Subject: [PATCH 09/37] Remove a print statement. --- distarray/mpi_engine.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index 9bf513e5..b099ac66 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -91,7 +91,6 @@ def delete(self, msg): name = obj try: module = import_module('__main__') - print('delete', module, name) delattr(module, name) except AttributeError: pass From fb019bd29bca06bfa32ac786e3bde99526983307 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 26 Aug 2014 18:33:35 -0500 Subject: [PATCH 10/37] Test unary ufuncs. 
--- distarray/globalapi/tests/test_context.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index d231121b..f028ccf6 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -21,6 +21,7 @@ from distarray.testing import (DefaultContextTestCase, IPythonContextTestCase, MPIContextTestCase, check_targets) +import distarray.globalapi as gapi from distarray.globalapi.context import Context from distarray.globalapi.maps import Distribution from distarray.mpionly_utils import is_solo_mpi_process @@ -104,7 +105,17 @@ def test_more_different_adds(self): assert_array_equal(e.toarray(), a.toarray() + b.toarray()) assert_array_equal(f.toarray(), b.toarray() + c.toarray()) - @unittest.skip('Not yet supported') + def test_unary_ufuncs(self): + a = self.context.ones((5, 6)) + b = -1 * self.context.ones((5, 6)) + with self.context.lazy_eval(): + c = -a + d = gapi.absolute(b) + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(c.toarray(), -a.toarray()) + assert_array_equal(d.toarray(), numpy.absolute(b.toarray())) + def test_double_add(self): a = self.context.zeros((5, 6)) b = self.context.ones((5, 6)) From 900efd3037ad639681b899e01561a8ee511cd694 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 26 Aug 2014 18:36:17 -0500 Subject: [PATCH 11/37] Skip an unsupported test. 
--- distarray/globalapi/tests/test_context.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index f028ccf6..b6530e00 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -116,6 +116,7 @@ def test_unary_ufuncs(self): assert_array_equal(c.toarray(), -a.toarray()) assert_array_equal(d.toarray(), numpy.absolute(b.toarray())) + @unittest.skip('Not yet supported') def test_double_add(self): a = self.context.zeros((5, 6)) b = self.context.ones((5, 6)) From 6003ce69d27efda8a531f7b08993427e6099b393 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Wed, 27 Aug 2014 18:29:10 -0500 Subject: [PATCH 12/37] Fix dependent values (and leftover lazy proxies). --- distarray/globalapi/context.py | 2 +- distarray/globalapi/tests/test_context.py | 16 ++++++++++++++-- distarray/localapi/proxyize.py | 9 ++++++--- distarray/mpi_engine.py | 21 ++++++++++++++++----- 4 files changed, 37 insertions(+), 11 deletions(-) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index e2237314..db0aac8f 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -1043,7 +1043,7 @@ def sync(self, targets=None): """Send queued messages, fill in expected result values.""" targets = self.targets if targets is None else targets for t in targets: - msg = ('process_message_queue', self._sendq[t]) + msg = ('process_message_queue', self._recvq[t], self._sendq[t]) self._send_msg(msg, targets=[t]) self._sendq[t] = [] # empty the send queue results = self._recv_msg(targets=[t], sync=True)[0] diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index b6530e00..49c7eb5b 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -116,11 +116,23 @@ def test_unary_ufuncs(self): assert_array_equal(c.toarray(), -a.toarray()) 
assert_array_equal(d.toarray(), numpy.absolute(b.toarray())) - @unittest.skip('Not yet supported') - def test_double_add(self): + def test_dependent_add(self): a = self.context.zeros((5, 6)) b = self.context.ones((5, 6)) c = self.context.ones((5, 6)) + 1 + self.context.lazy = True + with self.context.lazy_eval(): + t0 = a + b + d = t0 + c + self.assertTrue(isinstance(t0.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) + + def test_temporary_value(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + c = self.context.ones((5, 6)) + 1 + self.context.lazy = True with self.context.lazy_eval(): d = a + b + c self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) diff --git a/distarray/localapi/proxyize.py b/distarray/localapi/proxyize.py index 615ec97c..5298e290 100644 --- a/distarray/localapi/proxyize.py +++ b/distarray/localapi/proxyize.py @@ -15,9 +15,10 @@ class LazyPlaceholder(object): class Proxy(object): - def __init__(self, name, obj, module_name): + def __init__(self, name, obj, module_name, lazy=False): self.name = name self.module_name = module_name + self.lazy = lazy self.type_str = str(type(obj)) namespace = import_module(self.module_name) setattr(namespace, self.name, obj) @@ -29,14 +30,16 @@ def dereference(self): def cleanup(self): namespace = import_module(self.module_name) - delattr(namespace, self.name) + if 'lazy' not in self.name: + delattr(namespace, self.name) self.name = self.module_name = self.type_str = None def lazy_proxyize(): """Return a Proxy object for a delayed ("lazy") value.""" name = DISTARRAY_BASE_NAME + "lazy_" + nonce() - return Proxy(name, LazyPlaceholder(), '__main__') + return Proxy(name=name, obj=LazyPlaceholder(), + module_name='__main__', lazy=True) class Proxyize(object): diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index b099ac66..41e9aeba 100644 
--- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -7,16 +7,19 @@ The engine_loop function and utilities necessary for it. """ +from collections import OrderedDict from functools import reduce from importlib import import_module import types +from distarray.externals.six import next from distarray.localapi import LocalArray from distarray.localapi.proxyize import Proxy from distarray.mpionly_utils import (initial_comm_setup, make_targets_comm, get_comm_world) +from pprint import pprint class Engine(object): @@ -37,6 +40,7 @@ def __init__(self): assert self.is_engine() self.lazy = False self._client_sendq = [] + self._value_from_name = OrderedDict() while True: msg = Engine.INTERCOMM.recv(source=self.client_rank) val = self.parse_msg(msg) @@ -50,7 +54,7 @@ def arg_kwarg_proxy_converter(self, args, kwargs): args = list(args) for i, a in enumerate(args): if isinstance(a, module.Proxy): - args[i] = a.dereference() + args[i] = self._value_from_name.get(a.name, a).dereference() args = tuple(args) # convert kwargs @@ -96,7 +100,6 @@ def delete(self, msg): pass def func_call(self, msg): - func_data = msg[1] args = msg[2] kwargs = msg[3] @@ -162,18 +165,26 @@ def builtin_call(self, msg): self._client_send(res) def process_message_queue(self, msg): - mq = msg[1] - self.lazy = True - for submsg in mq: + # we need the recvq (msg[1]) to see which values are returned from + # which expression + lazy_proxies = msg[1] + self._value_from_name = OrderedDict([(lp.name, None) for lp in + lazy_proxies]) + self._current_rval = iter(self._value_from_name) + msgq = msg[2] + self.lazy = True # this msg is only received in lazy mode + for submsg in msgq: val = self.parse_msg(submsg) if val == 'kill': return val self.lazy = False self._client_send(self._client_sendq) self._client_sendq = [] + self._value_from_name = OrderedDict() def _client_send(self, msg): if self.lazy: + self._value_from_name[next(self._current_rval)] = msg self._client_sendq.append(msg) else: 
Engine.INTERCOMM.send(msg, dest=self.client_rank) From 6bc2dc323e71b021a6eaacecc653a80b40890a60 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Wed, 27 Aug 2014 18:29:19 -0500 Subject: [PATCH 13/37] Whitespace. --- distarray/globalapi/tests/test_context.py | 1 + distarray/localapi/proxyize.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 49c7eb5b..04e8d5fc 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -514,6 +514,7 @@ def foo(): self.assertEqual(set(r[0].name for r in res), set([res[0][0].name])) self.assertEqual(set(r[-1].name for r in res), set([res[0][-1].name])) + class TestGetBaseComm(DefaultContextTestCase): ntargets = 'any' diff --git a/distarray/localapi/proxyize.py b/distarray/localapi/proxyize.py index 5298e290..82cda36d 100644 --- a/distarray/localapi/proxyize.py +++ b/distarray/localapi/proxyize.py @@ -24,7 +24,7 @@ def __init__(self, name, obj, module_name, lazy=False): setattr(namespace, self.name, obj) def dereference(self): - """ Callable only on the engines. """ + """Callable only on the engines.""" namespace = import_module(self.module_name) return getattr(namespace, self.name) From 28bb22a2216acc29e0c49a4b0c1a4902dd9f2bca Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Wed, 27 Aug 2014 18:34:30 -0500 Subject: [PATCH 14/37] Add a complicated expression test. 
--- distarray/globalapi/tests/test_context.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 04e8d5fc..6d7d8e0e 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -138,6 +138,20 @@ def test_temporary_value(self): self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) + def test_complex_expressions(self): + a = self.context.zeros((5, 6)) + b = self.context.ones((5, 6)) + c = self.context.ones((5, 6)) + 1 + self.context.lazy = True + with self.context.lazy_eval(): + d = (2*a + (3*b + 4*c)) / 2 + e = gapi.negative(d * d) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + d_expected = (2*a.toarray() + (3*b.toarray() + 4*c.toarray())) / 2 + e_expected = numpy.negative(d * d) + assert_array_equal(d.toarray(), d_expected) + assert_array_equal(e.toarray(), e_expected) + class TestRegister(DefaultContextTestCase): From 78c3d008d1497b3ae0cb72c0a2b40a16ed3e3fd9 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Wed, 27 Aug 2014 18:35:27 -0500 Subject: [PATCH 15/37] Add one more check. 
--- distarray/globalapi/tests/test_context.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 6d7d8e0e..86f665e3 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -147,6 +147,7 @@ def test_complex_expressions(self): d = (2*a + (3*b + 4*c)) / 2 e = gapi.negative(d * d) self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) d_expected = (2*a.toarray() + (3*b.toarray() + 4*c.toarray())) / 2 e_expected = numpy.negative(d * d) assert_array_equal(d.toarray(), d_expected) From 82b809af7efa7aeff025c83d49f5258a5b903e58 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 28 Aug 2014 12:13:18 -0500 Subject: [PATCH 16/37] Update the module docstring. --- distarray/mpi_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index 41e9aeba..b7c667ab 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -4,7 +4,7 @@ # Distributed under the terms of the BSD License. See COPYING.rst. # --------------------------------------------------------------------------- """ -The engine_loop function and utilities necessary for it. +The MPI-based `Engine` class. """ from collections import OrderedDict From b5fb98643ab82f6064f8c6ab4a782da0ebadd603 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 28 Aug 2014 13:04:23 -0500 Subject: [PATCH 17/37] Add some docstrings and comments. 
--- distarray/mpi_engine.py | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index b7c667ab..a8e90edb 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -19,14 +19,16 @@ from distarray.mpionly_utils import (initial_comm_setup, make_targets_comm, get_comm_world) -from pprint import pprint class Engine(object): + """MPI-based worker class.""" + INTERCOMM = None def __init__(self): + """Setup and execute the main ``recv`` loop.""" self.world = get_comm_world() self.world_ranks = list(range(self.world.size)) @@ -49,6 +51,7 @@ def __init__(self): Engine.INTERCOMM.Free() def arg_kwarg_proxy_converter(self, args, kwargs): + """Dereference proxy arguments and update computed lazy proxies.""" module = import_module('__main__') # convert args args = list(args) @@ -66,12 +69,14 @@ def arg_kwarg_proxy_converter(self, args, kwargs): return args, kwargs def is_engine(self): + """Is this an engine (as opposed to the client)?""" if self.world.rank != self.client_rank: return True else: return False def parse_msg(self, msg): + """Given a message, execute the proper function.""" to_do = msg[0] what = {'func_call': self.func_call, 'execute': self.execute, @@ -88,6 +93,10 @@ def parse_msg(self, msg): return ret def delete(self, msg): + """Process the 'delete' message. + + Cleans up the namespace. 
+ """ obj = msg[1] if isinstance(obj, Proxy): obj.cleanup() @@ -100,6 +109,7 @@ def delete(self, msg): pass def func_call(self, msg): + """Process the 'func_call' message.""" func_data = msg[1] args = msg[2] kwargs = msg[3] @@ -124,11 +134,13 @@ def func_call(self, msg): self._client_send(res) def execute(self, msg): + """Process the 'execute' message.""" main = import_module('__main__') code = msg[1] exec(code, main.__dict__) def push(self, msg): + """Process the 'push' message.""" d = msg[1] module = import_module('__main__') for k, v in d.items(): @@ -137,24 +149,31 @@ def push(self, msg): setattr(place, pieces[-1], v) def pull(self, msg): + """Process the 'pull' message.""" name = msg[1] module = import_module('__main__') res = reduce(getattr, [module] + name.split('.')) self._client_send(res) def free_comm(self, msg): + """Call `Free` on a communicator.""" comm = msg[1].dereference() comm.Free() def kill(self, msg): - """Break out of the engine loop.""" + """Process the 'kill' message. + + Breaks out of the engine loop. + """ return 'kill' def engine_make_targets_comm(self, msg): + """Process the 'make_targets_comm' message.""" targets = msg[1] make_targets_comm(targets) def builtin_call(self, msg): + """Process the 'builtin_call' message.""" func = msg[1] args = msg[2] kwargs = msg[3] @@ -165,14 +184,21 @@ def builtin_call(self, msg): self._client_send(res) def process_message_queue(self, msg): + """Process the 'process_message_queue' message. + + This temporarily puts the engine in ``lazy`` mode, queueing up all the + replies (`_client_send`) to be sent at once. 
+ """ # we need the recvq (msg[1]) to see which values are returned from # which expression lazy_proxies = msg[1] + # set up mapping from lazy_proxy names to their computed values + # values to be filled in as the queue is processed self._value_from_name = OrderedDict([(lp.name, None) for lp in lazy_proxies]) self._current_rval = iter(self._value_from_name) msgq = msg[2] - self.lazy = True # this msg is only received in lazy mode + self.lazy = True # 'process_message_queue' only received in lazy mode for submsg in msgq: val = self.parse_msg(submsg) if val == 'kill': @@ -183,6 +209,10 @@ def process_message_queue(self, msg): self._value_from_name = OrderedDict() def _client_send(self, msg): + """Send a message to the client. + + If in lazy mode, just queue up the message. + """ if self.lazy: self._value_from_name[next(self._current_rval)] = msg self._client_sendq.append(msg) From 6f98892408eb863f56344bf16b24ac0044a38361 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 28 Aug 2014 13:39:07 -0500 Subject: [PATCH 18/37] Remove extraneous statements. 
--- distarray/globalapi/tests/test_context.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 86f665e3..2d504c61 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -120,7 +120,6 @@ def test_dependent_add(self): a = self.context.zeros((5, 6)) b = self.context.ones((5, 6)) c = self.context.ones((5, 6)) + 1 - self.context.lazy = True with self.context.lazy_eval(): t0 = a + b d = t0 + c @@ -132,7 +131,6 @@ def test_temporary_value(self): a = self.context.zeros((5, 6)) b = self.context.ones((5, 6)) c = self.context.ones((5, 6)) + 1 - self.context.lazy = True with self.context.lazy_eval(): d = a + b + c self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) @@ -142,7 +140,6 @@ def test_complex_expressions(self): a = self.context.zeros((5, 6)) b = self.context.ones((5, 6)) c = self.context.ones((5, 6)) + 1 - self.context.lazy = True with self.context.lazy_eval(): d = (2*a + (3*b + 4*c)) / 2 e = gapi.negative(d * d) From 981646221712fa21bd91e90b3a1d849ce8299ff9 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Fri, 29 Aug 2014 13:13:24 -0500 Subject: [PATCH 19/37] Allow DistArray creation inside lazy eval. 
--- distarray/globalapi/context.py | 22 ++++--- distarray/globalapi/tests/test_context.py | 71 +++++++++-------------- distarray/localapi/proxyize.py | 9 ++- distarray/mpi_engine.py | 5 +- 4 files changed, 54 insertions(+), 53 deletions(-) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index db0aac8f..eb83b51d 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -29,7 +29,7 @@ from distarray.globalapi.ipython_utils import IPythonClient from distarray.utils import uid, nonce, has_exactly_one -from distarray.localapi.proxyize import Proxy, lazy_proxyize +from distarray.localapi.proxyize import Proxy, lazy_proxyize, lazy_name # mpi context from distarray.mpionly_utils import (make_targets_comm, @@ -945,17 +945,21 @@ def _send_msg(self, msg, targets=None): def _recv_msg(self, targets=None, nresults=1, sync=False): res = [] targets = self.targets if targets is None else targets - for t in targets: - if self.lazy and not sync: - if nresults in {0, 1}: - res.append(lazy_proxyize()) + if self.lazy and not sync: + result_names = [lazy_name() for n in range(nresults)] + for t in targets: + if nresults == 0: + res.append(None) + elif nresults == 1: + res.append(lazy_proxyize(name=result_names[0])) else: target_results = [] - for i in range(nresults): - target_results.append(lazy_proxyize()) + for name in result_names: + target_results.append(lazy_proxyize(name)) res.append(target_results) self._recvq[t].append(res[-1]) - else: + else: + for t in targets: res.append(MPIContext.INTERCOMM.recv(source=t)) return res @@ -1046,6 +1050,8 @@ def sync(self, targets=None): msg = ('process_message_queue', self._recvq[t], self._sendq[t]) self._send_msg(msg, targets=[t]) self._sendq[t] = [] # empty the send queue + + for t in targets: results = self._recv_msg(targets=[t], sync=True)[0] lresults = self._recvq[t] for lres, res in zip(lresults, results): diff --git a/distarray/globalapi/tests/test_context.py 
b/distarray/globalapi/tests/test_context.py index 2d504c61..41a50437 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -36,42 +36,31 @@ class TestLazyEval(MPIContextTestCase): ntargets = 'any' def test_single_add(self): - a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) + a = self.context.zeros((53, 63)) + b = self.context.ones((53, 63)) with self.context.lazy_eval(): c = a + b self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) assert_array_equal(c.toarray(), a.toarray() + b.toarray()) def test_single_mult(self): - a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) + a = self.context.zeros((54, 64)) + b = self.context.ones((54, 64)) with self.context.lazy_eval(): c = a * b self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) assert_array_equal(c.toarray(), a.toarray() * b.toarray()) def test_constant_mult(self): - a = self.context.zeros((5, 6)) + a = self.context.zeros((55, 65)) with self.context.lazy_eval(): c = a * 2 self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) assert_array_equal(c.toarray(), a.toarray() * 2) def test_two_identical_add_expr(self): - a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) - with self.context.lazy_eval(): - c = a + b - d = a + b - self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) - self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) - assert_array_equal(c.toarray(), a.toarray() + b.toarray()) - assert_array_equal(d.toarray(), a.toarray() + b.toarray()) - - def test_two_lazy_add_expr(self): - a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) + a = self.context.zeros((56, 66)) + b = self.context.ones((56, 66)) with self.context.lazy_eval(): c = a + b d = a + b @@ -81,10 +70,10 @@ def test_two_lazy_add_expr(self): assert_array_equal(d.toarray(), a.toarray() + b.toarray()) def test_different_adds(self): - a = self.context.zeros((5, 6)) - b = 
self.context.ones((5, 6)) - c = self.context.ones((5, 6)) + 1 - d = self.context.ones((5, 6)) + 2 + a = self.context.zeros((58, 68)) + b = self.context.ones((58, 68)) + c = self.context.ones((58, 68)) + 1 + d = self.context.ones((58, 68)) + 2 with self.context.lazy_eval(): e = a + b f = c + d @@ -94,9 +83,9 @@ def test_different_adds(self): assert_array_equal(f.toarray(), c.toarray() + d.toarray()) def test_more_different_adds(self): - a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) - c = self.context.ones((5, 6)) + 1 + a = self.context.zeros((59, 69)) + b = self.context.ones((59, 69)) + c = self.context.ones((59, 69)) + 1 with self.context.lazy_eval(): e = a + b f = b + c @@ -106,8 +95,8 @@ def test_more_different_adds(self): assert_array_equal(f.toarray(), b.toarray() + c.toarray()) def test_unary_ufuncs(self): - a = self.context.ones((5, 6)) - b = -1 * self.context.ones((5, 6)) + a = self.context.ones((60, 70)) + b = -1 * self.context.ones((60, 70)) with self.context.lazy_eval(): c = -a d = gapi.absolute(b) @@ -117,9 +106,9 @@ def test_unary_ufuncs(self): assert_array_equal(d.toarray(), numpy.absolute(b.toarray())) def test_dependent_add(self): - a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) - c = self.context.ones((5, 6)) + 1 + a = self.context.zeros((61, 71)) + b = self.context.ones((61, 71)) + c = self.context.ones((61, 71)) + 1 with self.context.lazy_eval(): t0 = a + b d = t0 + c @@ -127,19 +116,10 @@ def test_dependent_add(self): self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) - def test_temporary_value(self): - a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) - c = self.context.ones((5, 6)) + 1 - with self.context.lazy_eval(): - d = a + b + c - self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) - assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) - def test_complex_expressions(self): - 
a = self.context.zeros((5, 6)) - b = self.context.ones((5, 6)) - c = self.context.ones((5, 6)) + 1 + a = self.context.zeros((52, 62)) + b = self.context.ones((52, 62)) + c = self.context.ones((52, 62)) + 1 with self.context.lazy_eval(): d = (2*a + (3*b + 4*c)) / 2 e = gapi.negative(d * d) @@ -150,6 +130,13 @@ def test_complex_expressions(self): assert_array_equal(d.toarray(), d_expected) assert_array_equal(e.toarray(), e_expected) + def test_lazy_creation(self): + with self.context.lazy_eval(): + a = self.context.zeros((50, 60)) + b = self.context.zeros((50, 60)) + assert_array_equal(b.toarray(), numpy.zeros((50, 60))) + assert_array_equal(a.toarray(), numpy.zeros((50, 60))) + class TestRegister(DefaultContextTestCase): diff --git a/distarray/localapi/proxyize.py b/distarray/localapi/proxyize.py index 82cda36d..893a673c 100644 --- a/distarray/localapi/proxyize.py +++ b/distarray/localapi/proxyize.py @@ -35,9 +35,14 @@ def cleanup(self): self.name = self.module_name = self.type_str = None -def lazy_proxyize(): +def lazy_name(): + return DISTARRAY_BASE_NAME + "lazy_" + nonce() + + +def lazy_proxyize(name=None): """Return a Proxy object for a delayed ("lazy") value.""" - name = DISTARRAY_BASE_NAME + "lazy_" + nonce() + if name is None: + name = lazy_name() return Proxy(name=name, obj=LazyPlaceholder(), module_name='__main__', lazy=True) diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index a8e90edb..2396a8d3 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -57,7 +57,10 @@ def arg_kwarg_proxy_converter(self, args, kwargs): args = list(args) for i, a in enumerate(args): if isinstance(a, module.Proxy): - args[i] = self._value_from_name.get(a.name, a).dereference() + if a.lazy: + args[i] = self._value_from_name[a.name].dereference() + else: + args[i] = a.dereference() args = tuple(args) # convert kwargs From c2f35ad8b71193d47e69eeee7696c8b23eeefa39 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Fri, 29 Aug 2014 13:13:44 -0500 
Subject: [PATCH 20/37] Whitespace. --- distarray/globalapi/distarray.py | 1 - distarray/globalapi/maps.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/distarray/globalapi/distarray.py b/distarray/globalapi/distarray.py index 517ca133..69b97c47 100644 --- a/distarray/globalapi/distarray.py +++ b/distarray/globalapi/distarray.py @@ -77,7 +77,6 @@ def from_localarrays(cls, key, context=None, targets=None, distribution=None, If `dtype` is not provided, it will be fetched from the engines. """ - def get_dim_datas_and_dtype(arr): return (arr.dim_data, arr.dtype) diff --git a/distarray/globalapi/maps.py b/distarray/globalapi/maps.py index 69f644b3..a0961ddc 100644 --- a/distarray/globalapi/maps.py +++ b/distarray/globalapi/maps.py @@ -271,7 +271,7 @@ def view(self, new_dimsize): def is_compatible(self, other): return (isinstance(other, (NoDistMap, BlockMap, BlockCyclicMap)) and - other.grid_size == self.grid_size and + other.grid_size == self.grid_size and other.size == self.size) From f0f45e55bf5d8ea68c70013e69dc3d8b68615051 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Fri, 29 Aug 2014 13:18:31 -0500 Subject: [PATCH 21/37] Add a more complex test. 
--- distarray/globalapi/tests/test_context.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 41a50437..8f70da2e 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -137,6 +137,20 @@ def test_lazy_creation(self): assert_array_equal(b.toarray(), numpy.zeros((50, 60))) assert_array_equal(a.toarray(), numpy.zeros((50, 60))) + def test_creation_and_expressions(self): + with self.context.lazy_eval(): + a = self.context.zeros((52, 62)) + b = self.context.ones((52, 62)) + c = self.context.ones((52, 62)) + 1 + d = (2*a + (3*b + 4*c)) / 2 + e = gapi.negative(d * d) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + self.assertTrue(isinstance(e.key.dereference(), LazyPlaceholder)) + d_expected = (2*a.toarray() + (3*b.toarray() + 4*c.toarray())) / 2 + e_expected = numpy.negative(d * d) + assert_array_equal(d.toarray(), d_expected) + assert_array_equal(e.toarray(), e_expected) + class TestRegister(DefaultContextTestCase): From 8311c577a84e53241d9bbd0ee9302bbba803e5a8 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Fri, 29 Aug 2014 16:35:07 -0500 Subject: [PATCH 22/37] Add a test for a user-defined function. 
--- distarray/globalapi/tests/test_context.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 8f70da2e..2a06c8ad 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -22,7 +22,7 @@ from distarray.testing import (DefaultContextTestCase, IPythonContextTestCase, MPIContextTestCase, check_targets) import distarray.globalapi as gapi -from distarray.globalapi.context import Context +from distarray.globalapi.context import DistArray, Context from distarray.globalapi.maps import Distribution from distarray.mpionly_utils import is_solo_mpi_process from distarray.localapi import LocalArray @@ -151,6 +151,18 @@ def test_creation_and_expressions(self): assert_array_equal(d.toarray(), d_expected) assert_array_equal(e.toarray(), e_expected) + def test_user_function(self): + with self.context.lazy_eval(): + def local_square(la): + return la * la + da = self.context.ones((30, 40)) * 2 + new_key = self.context.apply(local_square, (da.key,), autoproxyize=True)[0] + new_da = DistArray.from_localarrays(key=new_key, + distribution=da.distribution, + dtype=int) + assert_array_equal(new_da.toarray(), (numpy.ones((30, 40)) * 2) ** 2) + + class TestRegister(DefaultContextTestCase): From 341a2f97ad50499040fb21bf74d38d616b4e2c65 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 2 Sep 2014 22:25:32 -0500 Subject: [PATCH 23/37] Add a test for multiple return values. 
--- distarray/globalapi/tests/test_context.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 2a06c8ad..d54d551b 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -162,6 +162,23 @@ def local_square(la): dtype=int) assert_array_equal(new_da.toarray(), (numpy.ones((30, 40)) * 2) ** 2) + def test_multiple_return_values(self): + da = self.context.ones((30, 40)) * 2 + with self.context.lazy_eval(): + self.context.lazy = True + def local_powers(la): + return proxyize(la * la), proxyize(la * la * la) + key0, key1 = self.context.apply(local_powers, (da.key,), nresults=2)[0] + da0 = DistArray.from_localarrays(key=key0, + distribution=da.distribution, + dtype=int) + self.assertTrue(isinstance(da0.key.dereference(), LazyPlaceholder)) + da1 = DistArray.from_localarrays(key=key1, + distribution=da.distribution, + dtype=int) + self.assertTrue(isinstance(da1.key.dereference(), LazyPlaceholder)) + assert_array_equal(da0.toarray(), (numpy.ones((30, 40)) * 2) ** 2) + assert_array_equal(da1.toarray(), (numpy.ones((30, 40)) * 2) ** 3) class TestRegister(DefaultContextTestCase): From 82a31f8adb3f18c525eb7fa844972f7aa48b3c21 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 2 Sep 2014 22:25:56 -0500 Subject: [PATCH 24/37] Fix multiple return values. --- distarray/mpi_engine.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/distarray/mpi_engine.py b/distarray/mpi_engine.py index 2396a8d3..42da9cfc 100644 --- a/distarray/mpi_engine.py +++ b/distarray/mpi_engine.py @@ -7,7 +7,7 @@ The MPI-based `Engine` class. 
""" -from collections import OrderedDict +from collections import OrderedDict, Iterable from functools import reduce from importlib import import_module import types @@ -197,8 +197,17 @@ def process_message_queue(self, msg): lazy_proxies = msg[1] # set up mapping from lazy_proxy names to their computed values # values to be filled in as the queue is processed - self._value_from_name = OrderedDict([(lp.name, None) for lp in - lazy_proxies]) + self._value_from_name = [] + for val in lazy_proxies: + if isinstance(val, Proxy): + self._value_from_name.append((val.name, None)) + elif isinstance(val, Iterable): + self._value_from_name.extend([(lp.name, None) for lp in val]) + else: + msg = "recvq contains an unrecognized type." + raise TypeError(msg) + self._value_from_name = OrderedDict(self._value_from_name) + self._current_rval = iter(self._value_from_name) msgq = msg[2] self.lazy = True # 'process_message_queue' only received in lazy mode From d9a708e1851112ba2e473dfff4e4303a5b1dcf16 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 2 Sep 2014 22:26:04 -0500 Subject: [PATCH 25/37] Improve an error message. --- distarray/globalapi/context.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distarray/globalapi/context.py b/distarray/globalapi/context.py index eb83b51d..ac4a4bb5 100644 --- a/distarray/globalapi/context.py +++ b/distarray/globalapi/context.py @@ -1066,7 +1066,8 @@ def sync(self, targets=None): sublres.__dict__ = subres.__dict__ else: msg = ("Only DistArray return values are " - "supported in lazy mode.") + "supported in lazy mode. Type is: {}" + "".format(type(subres))) raise TypeError(msg) # single return value else: From 1f6e99d93ae38644383dc054f30e2640a37e99ee Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 2 Sep 2014 22:34:48 -0500 Subject: [PATCH 26/37] Add a test without the context manager. 
--- distarray/globalapi/tests/test_context.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index d54d551b..b3e928e5 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -36,6 +36,16 @@ class TestLazyEval(MPIContextTestCase): ntargets = 'any' def test_single_add(self): + a = self.context.zeros((53, 63)) + b = self.context.ones((53, 63)) + self.context.lazy = True + c = a + b + self.assertTrue(isinstance(c.key.dereference(), LazyPlaceholder)) + self.context.sync() + self.context.lazy = False + assert_array_equal(c.toarray(), a.toarray() + b.toarray()) + + def test_single_add_with_context_manager(self): a = self.context.zeros((53, 63)) b = self.context.ones((53, 63)) with self.context.lazy_eval(): From 1298a37c5315a87b70a17e3ab623e41dd9a55c98 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 2 Sep 2014 22:48:58 -0500 Subject: [PATCH 27/37] Improve a test. --- distarray/globalapi/tests/test_context.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index b3e928e5..64661a49 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -173,9 +173,8 @@ def local_square(la): assert_array_equal(new_da.toarray(), (numpy.ones((30, 40)) * 2) ** 2) def test_multiple_return_values(self): - da = self.context.ones((30, 40)) * 2 with self.context.lazy_eval(): - self.context.lazy = True + da = self.context.ones((30, 40)) * 2 def local_powers(la): return proxyize(la * la), proxyize(la * la * la) key0, key1 = self.context.apply(local_powers, (da.key,), nresults=2)[0] From d8b75ceec52e32419202ca9074979320617d8e3b Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 2 Sep 2014 23:06:25 -0500 Subject: [PATCH 28/37] Test a lazy loop. 
--- distarray/globalapi/tests/test_context.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 64661a49..615efc58 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -189,6 +189,15 @@ def local_powers(la): assert_array_equal(da0.toarray(), (numpy.ones((30, 40)) * 2) ** 2) assert_array_equal(da1.toarray(), (numpy.ones((30, 40)) * 2) ** 3) + def test_lazy_loop(self): + rvalues = [] + with self.context.lazy_eval(): + for i in range(10): + rvalues.append(i * self.context.ones((10, 3))) + + for i, value in enumerate(rvalues): + assert_array_equal(value.toarray(), i * numpy.ones((10, 3))) + class TestRegister(DefaultContextTestCase): From 30ea47989c870d5ab01035c63d2e4057a4d1ee71 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Tue, 2 Sep 2014 23:29:11 -0500 Subject: [PATCH 29/37] Add a simple benchmark. --- examples/lazy_eval/__init__.py | 0 examples/lazy_eval/benchmark_lazy_eval.py | 33 +++++++++++++++++++++++ 2 files changed, 33 insertions(+) create mode 100644 examples/lazy_eval/__init__.py create mode 100644 examples/lazy_eval/benchmark_lazy_eval.py diff --git a/examples/lazy_eval/__init__.py b/examples/lazy_eval/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py new file mode 100644 index 00000000..dfe1db65 --- /dev/null +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# encoding: utf-8 +# --------------------------------------------------------------------------- +# Copyright (C) 2008-2014, IPython Development Team and Enthought, Inc. +# Distributed under the terms of the BSD License. See COPYING.rst. 
+# --------------------------------------------------------------------------- + +""" +See if lazy evaluation (promise pipelining) can be more efficeint than eager +evaluation. + +""" + +from distarray.globalapi import Context, tanh +from timeit import default_timer as time + +nops = 10000 +arr_shape = (100, 100) + +context = Context() + +arr = context.ones(arr_shape) +start = time() +for _ in range(nops): + arr = tanh(arr) +print("Eager time: {:0.2f}".format(time() - start)) + +arr = context.ones(arr_shape) +start = time() +with context.lazy_eval(): + for i in range(nops): + arr = tanh(arr) +print("Lazy time: {:0.2f}".format(time() - start)) From 6e43255a636be4f587d62c9706e81774dd941cad Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Wed, 3 Sep 2014 00:14:49 -0500 Subject: [PATCH 30/37] Improve benchmark. --- examples/lazy_eval/benchmark_lazy_eval.py | 48 +++++++++++++++++------ 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py index dfe1db65..81a1797a 100644 --- a/examples/lazy_eval/benchmark_lazy_eval.py +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -11,23 +11,45 @@ """ -from distarray.globalapi import Context, tanh +from __future__ import print_function, division + +from sys import stderr from timeit import default_timer as time +from pprint import pprint + +from distarray.globalapi import Context, tanh -nops = 10000 -arr_shape = (100, 100) + +nops_list = range(1, 10002, 1000) +arr_shape = (10, 10) +arr_size = arr_shape[0] * arr_shape[1] context = Context() -arr = context.ones(arr_shape) -start = time() -for _ in range(nops): - arr = tanh(arr) -print("Eager time: {:0.2f}".format(time() - start)) +eager = [] +lazy = [] + +for nops in nops_list: -arr = context.ones(arr_shape) -start = time() -with context.lazy_eval(): - for i in range(nops): + arr = context.ones(arr_shape) + start = time() + for _ in range(nops): arr = tanh(arr) -print("Lazy time: 
{:0.2f}".format(time() - start)) + result = time() - start + eager.append(result) + print('.', end='', file=stderr, flush=True) + + arr = context.ones(arr_shape) + start = time() + with context.lazy_eval(): + for i in range(nops): + arr = tanh(arr) + result = time() - start + lazy.append(result) + print('.', end='', file=stderr, flush=True) + +print(file=stderr, flush=True) +pprint({"nops": list(nops_list), + "Eager": eager, + "Lazy": lazy, + }, stream=stderr) From 483eab6d71a792776244a6be8790be91515c4d0e Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Wed, 3 Sep 2014 22:09:31 -0500 Subject: [PATCH 31/37] Fix a spelling error. --- examples/lazy_eval/benchmark_lazy_eval.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py index 81a1797a..2f4651f3 100644 --- a/examples/lazy_eval/benchmark_lazy_eval.py +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -6,7 +6,7 @@ # --------------------------------------------------------------------------- """ -See if lazy evaluation (promise pipelining) can be more efficeint than eager +See if lazy evaluation (promise pipelining) can be more efficient than eager evaluation. """ From b5d0f247229050d23d3ee41999b6e3c4e13cdad9 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 4 Sep 2014 14:03:57 -0500 Subject: [PATCH 32/37] Improve variable names for readability. --- examples/lazy_eval/benchmark_lazy_eval.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py index 2f4651f3..40fdd35a 100644 --- a/examples/lazy_eval/benchmark_lazy_eval.py +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -8,7 +8,6 @@ """ See if lazy evaluation (promise pipelining) can be more efficient than eager evaluation. 
- """ from __future__ import print_function, division @@ -26,8 +25,8 @@ context = Context() -eager = [] -lazy = [] +eager_times = [] +lazy_times = [] for nops in nops_list: @@ -36,7 +35,7 @@ for _ in range(nops): arr = tanh(arr) result = time() - start - eager.append(result) + eager_times.append(result) print('.', end='', file=stderr, flush=True) arr = context.ones(arr_shape) @@ -45,11 +44,11 @@ for i in range(nops): arr = tanh(arr) result = time() - start - lazy.append(result) + lazy_times.append(result) print('.', end='', file=stderr, flush=True) print(file=stderr, flush=True) pprint({"nops": list(nops_list), - "Eager": eager, - "Lazy": lazy, + "Eager": eager_times, + "Lazy": lazy_times, }, stream=stderr) From fe3cd02f317b19d121097120cdda44b31bc9fa71 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 4 Sep 2014 14:27:34 -0500 Subject: [PATCH 33/37] Print better status, dump to json. --- examples/lazy_eval/benchmark_lazy_eval.py | 42 ++++++++++++++++++----- 1 file changed, 33 insertions(+), 9 deletions(-) diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py index 40fdd35a..1f2f1b15 100644 --- a/examples/lazy_eval/benchmark_lazy_eval.py +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -12,32 +12,57 @@ from __future__ import print_function, division +import json from sys import stderr from timeit import default_timer as time -from pprint import pprint +import numpy from distarray.globalapi import Context, tanh -nops_list = range(1, 10002, 1000) +nops_list = range(1, 20002, 1000) arr_shape = (10, 10) arr_size = arr_shape[0] * arr_shape[1] context = Context() +numpy_times = [] eager_times = [] lazy_times = [] +data = {"nops": list(nops_list), + "Numpy": numpy_times, + "Eager": eager_times, + "Lazy": lazy_times, + } + +total_tests = len(nops_list) * 3 +test_num = 1 for nops in nops_list: + # bench Numpy + arr = numpy.ones(arr_shape) + start = time() + for _ in range(nops): + arr = numpy.tanh(arr) + result = 
time() - start + numpy_times.append(result) + print('({}/{}), Numpy, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + + # bench DistArray eager eval arr = context.ones(arr_shape) start = time() for _ in range(nops): arr = tanh(arr) result = time() - start eager_times.append(result) - print('.', end='', file=stderr, flush=True) + print('({}/{}), Eager, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + # bench DistArray lazy eval arr = context.ones(arr_shape) start = time() with context.lazy_eval(): @@ -45,10 +70,9 @@ arr = tanh(arr) result = time() - start lazy_times.append(result) - print('.', end='', file=stderr, flush=True) + print('({}/{}), Lazy, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 -print(file=stderr, flush=True) -pprint({"nops": list(nops_list), - "Eager": eager_times, - "Lazy": lazy_times, - }, stream=stderr) + with open("benchmark_data.json", 'w') as fp: + json.dump(data, fp, indent=4) From 81bedc608e7c4ba97f35929d32b7e74db4d54e45 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 4 Sep 2014 15:06:11 -0500 Subject: [PATCH 34/37] Better filesnames. 
--- examples/lazy_eval/benchmark_lazy_eval.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py index 1f2f1b15..35a9044b 100644 --- a/examples/lazy_eval/benchmark_lazy_eval.py +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -13,6 +13,7 @@ from __future__ import print_function, division import json +import datetime from sys import stderr from timeit import default_timer as time @@ -74,5 +75,7 @@ file=stderr, flush=True) test_num += 1 - with open("benchmark_data.json", 'w') as fp: - json.dump(data, fp, indent=4) +now = datetime.datetime.now() +filename = '_'.join((now.strftime("%Y-%m-%dT%H-%M-%S"), str(nops_list[-1]))) + ".json" +with open(filename, 'w') as fp: + json.dump(data, fp, indent=4) From a34b002ca17f58b237565646e15c19a88702b8eb Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 4 Sep 2014 15:31:42 -0500 Subject: [PATCH 35/37] Make into functions. --- examples/lazy_eval/benchmark_lazy_eval.py | 123 ++++++++++++---------- 1 file changed, 66 insertions(+), 57 deletions(-) diff --git a/examples/lazy_eval/benchmark_lazy_eval.py b/examples/lazy_eval/benchmark_lazy_eval.py index 35a9044b..fede8352 100644 --- a/examples/lazy_eval/benchmark_lazy_eval.py +++ b/examples/lazy_eval/benchmark_lazy_eval.py @@ -21,61 +21,70 @@ from distarray.globalapi import Context, tanh -nops_list = range(1, 20002, 1000) -arr_shape = (10, 10) -arr_size = arr_shape[0] * arr_shape[1] - -context = Context() - -numpy_times = [] -eager_times = [] -lazy_times = [] - -data = {"nops": list(nops_list), - "Numpy": numpy_times, - "Eager": eager_times, - "Lazy": lazy_times, - } - -total_tests = len(nops_list) * 3 -test_num = 1 -for nops in nops_list: - - # bench Numpy - arr = numpy.ones(arr_shape) - start = time() - for _ in range(nops): - arr = numpy.tanh(arr) - result = time() - start - numpy_times.append(result) - print('({}/{}), Numpy, {} ops, {:0.3f} s'.format(test_num, 
total_tests, nops, result), - file=stderr, flush=True) - test_num += 1 - - # bench DistArray eager eval - arr = context.ones(arr_shape) - start = time() - for _ in range(nops): - arr = tanh(arr) - result = time() - start - eager_times.append(result) - print('({}/{}), Eager, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), - file=stderr, flush=True) - test_num += 1 - - # bench DistArray lazy eval - arr = context.ones(arr_shape) - start = time() - with context.lazy_eval(): - for i in range(nops): +def benchmark(nops_list, arr_shape): + context = Context() + + data = {"nops": list(nops_list), + "Numpy": [], + "Eager": [], + "Lazy": [], + } + + total_tests = len(nops_list) * 3 + test_num = 1 + for nops in nops_list: + + # bench Numpy + arr = numpy.ones(arr_shape) + start = time() + for _ in range(nops): + arr = numpy.tanh(arr) + result = time() - start + data['Numpy'].append(result) + print('({}/{}), Numpy, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + + # bench DistArray eager eval + arr = context.ones(arr_shape) + start = time() + for _ in range(nops): arr = tanh(arr) - result = time() - start - lazy_times.append(result) - print('({}/{}), Lazy, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), - file=stderr, flush=True) - test_num += 1 - -now = datetime.datetime.now() -filename = '_'.join((now.strftime("%Y-%m-%dT%H-%M-%S"), str(nops_list[-1]))) + ".json" -with open(filename, 'w') as fp: - json.dump(data, fp, indent=4) + result = time() - start + data['Eager'].append(result) + print('({}/{}), Eager, {} ops, {:0.3f} s'.format(test_num, total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + + # bench DistArray lazy eval + arr = context.ones(arr_shape) + start = time() + with context.lazy_eval(): + for i in range(nops): + arr = tanh(arr) + result = time() - start + data['Lazy'].append(result) + print('({}/{}), Lazy, {} ops, {:0.3f} s'.format(test_num, 
total_tests, nops, result), + file=stderr, flush=True) + test_num += 1 + + return data + + +def save_data(data, note=''): + now = datetime.datetime.now() + filename = '_'.join((now.strftime("%Y-%m-%dT%H-%M-%S"), note)) + ".json" + with open(filename, 'w') as fp: + json.dump(data, fp, indent=4) + + +def main(nops_list=None, arr_shape=None, note=''): + nops_list = range(1, 20002, 1000) if nops_list is None else nops_list + arr_shape = (10, 10) if arr_shape is None else arr_shape + data = benchmark(nops_list, arr_shape) + save_data(data, note) + + return data + +if __name__ == '__main__': + main() From 4ca2c90fc7ab2ca0b58039ff7b2642f824296fbb Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 4 Sep 2014 17:04:41 -0500 Subject: [PATCH 36/37] Add another test. --- distarray/globalapi/tests/test_context.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 933d66e2..6ab4968a 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -126,6 +126,18 @@ def test_dependent_add(self): self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) + def test_temporary_add(self): + from pprint import pprint + a = self.context.zeros((61, 71)) + b = self.context.ones((61, 71)) + c = self.context.ones((61, 71)) + 1 + with self.context.lazy_eval(): + d = a + b + c + pprint(('recvq:', self.context._recvq), indent=4) + pprint(('sendq:', self.context._sendq), indent=4) + self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) + assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) + def test_complex_expressions(self): a = self.context.zeros((52, 62)) b = self.context.ones((52, 62)) From 13401ce401d8743182e1a8ab57484ba5648a82c2 Mon Sep 17 00:00:00 2001 From: Robert David Grant Date: Thu, 4 Sep 2014 18:14:51 -0500 Subject: 
[PATCH 37/37] Remove pprints in newest test. --- distarray/globalapi/tests/test_context.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/distarray/globalapi/tests/test_context.py b/distarray/globalapi/tests/test_context.py index 6ab4968a..5535ecde 100644 --- a/distarray/globalapi/tests/test_context.py +++ b/distarray/globalapi/tests/test_context.py @@ -127,14 +127,11 @@ def test_dependent_add(self): assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray()) def test_temporary_add(self): - from pprint import pprint a = self.context.zeros((61, 71)) b = self.context.ones((61, 71)) c = self.context.ones((61, 71)) + 1 with self.context.lazy_eval(): d = a + b + c - pprint(('recvq:', self.context._recvq), indent=4) - pprint(('sendq:', self.context._sendq), indent=4) self.assertTrue(isinstance(d.key.dereference(), LazyPlaceholder)) assert_array_equal(d.toarray(), a.toarray() + b.toarray() + c.toarray())