From 829670823f0b0f3c9894a4204d1a06321e22c43d Mon Sep 17 00:00:00 2001 From: Rasmus Villemoes Date: Mon, 14 Nov 2016 22:10:45 +0100 Subject: [PATCH 1/6] oelite/function.py: add some thoughts on async PythonFunction --- lib/oelite/function.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/lib/oelite/function.py b/lib/oelite/function.py index 456ada18..41394508 100644 --- a/lib/oelite/function.py +++ b/lib/oelite/function.py @@ -68,6 +68,22 @@ def start(self, cwd): self.result = True +# Making a PythonFunction run asynchronously is not that easy: +# +# (1) we cannot use threads, since many of the functions +# (e.g. do_fetch, do_unpack) expect to have a specific $CWD, and +# that's a global resource in the process - those functions would fail +# immediately when the main thread chdirs away. +# +# (2) we cannot just fork() and do everything in the child, since some +# PythonFunctions really must mutate state in the main oe process +# (most notably all hook functions that run during and immediately +# after recipe parsing). +# +# (3) even if we do (2) on an opt-in basis, I'm not entirely convinced +# we never rely on e.g. do_unpack changing that task's +# metadata. Nevertheless, this is what we'll try to do. + class PythonFunction(OEliteFunction): def __init__(self, meta, var, name=None, tmpdir=None, recursion_path=None, From beb0af6c4a71df738e2bfdca12126cf55fbbf8f5 Mon Sep 17 00:00:00 2001 From: Rasmus Villemoes Date: Mon, 14 Nov 2016 22:27:03 +0100 Subject: [PATCH 2/6] oelite/function.py: implement _start and wait in PythonFunction For now this is just a copy of the generic implementations in OEliteFunction. When we implement async python functions in terms of fork(), we could just do the umask and chdir in the child, but since we only do some python functions asynchronously, we'd have to duplicate the try..finally stuff in the synchronous case, and that's not worth it for saving four system calls in the parent. --- lib/oelite/function.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/oelite/function.py b/lib/oelite/function.py index 41394508..aa2c67a5 100644 --- a/lib/oelite/function.py +++ b/lib/oelite/function.py @@ -118,6 +118,12 @@ def __init__(self, meta, var, name=None, tmpdir=None, recursion_path=None, super(PythonFunction, self).__init__(meta, var, name, tmpdir) return + def _start(self): + self.result = self() + + def wait(self, poll=False): + return self.result + def __call__(self): if self.set_os_environ: From 466c4ad96778fdabb8a984154594b69fdfc1b04f Mon Sep 17 00:00:00 2001 From: Rasmus Villemoes Date: Mon, 14 Nov 2016 23:24:30 +0100 Subject: [PATCH 3/6] oelite/function.py: allow python functions to set __async flag Well, sort of. For now we just implement retrieving e.g. do_fetch[__async], but it has to be False. I do get_flag with expand=CLEAN_EXPANSION to allow me to do something like do_fetch[__async] = "${__ASYNC_FETCH}" with the latter set in local.conf, or not set at all. The bool(int( ... or 0)) dance is so that all of unset, "", "0", "1", False, True etc. do the expected thing. The double underscores are to ensure that these variables and flags to not affect metadata hashes. --- lib/oelite/function.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/oelite/function.py b/lib/oelite/function.py index aa2c67a5..d820569d 100644 --- a/lib/oelite/function.py +++ b/lib/oelite/function.py @@ -115,14 +115,21 @@ def __init__(self, meta, var, name=None, tmpdir=None, recursion_path=None, self.function = l[var] self.set_os_environ = set_os_environ self.result = False + self.async = bool(int(meta.get_flag(var, "__async", expand=oelite.meta.CLEAN_EXPANSION) or 0)) super(PythonFunction, self).__init__(meta, var, name, tmpdir) return def _start(self): - self.result = self() + if not self.async: + self.result = self() + return + raise NotImplementedError("async PythonFunction not implemented yet") def wait(self, poll=False): - return self.result + if not self.async: + assert(self.result is True or self.result is False) + return self.result + raise NotImplementedError("async PythonFunction not implemented yet") def __call__(self): From 9ca3b09ca4142ed9cb858439b7224556becfab64 Mon Sep 17 00:00:00 2001 From: Rasmus Villemoes Date: Tue, 15 Nov 2016 00:39:46 +0100 Subject: [PATCH 4/6] oelite/function.py: implement support for asynchronous PythonFunction The list of PythonFunction tasks includes at least: do_fetch do_unpack do_stage do_split do_package These are all rather I/O bound, and do_fetch in particular is prone to stall the entire build if it is trying to fetch from an unresponsive or just slow server - worst case, with our default timeout settings, we can end up waiting 10 minutes, during which we may completely fail to start other tasks. This implements support for making a particular python task function run asynchronously; simply add e.g. do_fetch[__async] = True above the do_fetch definition in fetch.oeclass. In order for a task function implemented as a PythonFunction to safely run asynchronously, it must not rely on mutating state in the OE-lite process. Checking that is a rather tedious and error-prone job, so this is mostly an experimental feature for now. --- lib/oelite/function.py | 70 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/lib/oelite/function.py b/lib/oelite/function.py index d820569d..2c602e6e 100644 --- a/lib/oelite/function.py +++ b/lib/oelite/function.py @@ -8,6 +8,7 @@ import warnings import re import subprocess +import traceback class OEliteFunction(object): @@ -123,13 +124,78 @@ def _start(self): if not self.async: self.result = self() return - raise NotImplementedError("async PythonFunction not implemented yet") + # prevent duplicate output from stdio buffers + sys.stdout.flush() + sys.stderr.flush() + + self.childpid = os.fork() + # This raise OSError on error, so there's no < 0 case to consider. + if self.childpid > 0: + # parent + return + + # child + + # If there's an exception, we want to get as much info as + # possible printed, not just the stringification of the + # exception object itself. The traceback module "exactly + # mimics the behavior of the Python interpreter when it prints + # a stack trace". + + # We can only tell our parent how it went via our exit + # code. Important: We cannot call sys.exit(), since that is + # implemented by raising SystemExit, and we really must not + # return from this function - otherwise we go all the way back + # to the main loop in baker.py, get caught by the try-finally + # block, which then triggers the "wait for remaining tasks" + # logic, and we fail miserably since we do not have the child + # being waited for (that's us!). So we use + # os._exit(). However, we then need to ensure proper buffer + # flushing etc. manually. + exitcode = 0 + try: + ret = self() + if not ret: + exitcode = 1 + except: + traceback.print_exc() + exitcode = 2 + # We don't want any silly error during what should be the + # proper way to shutdown manually to interfere with the exit + # code. + try: + sys.stdout.flush() + sys.stderr.flush() + # What else do we need to do? + finally: + os._exit(exitcode) + assert(0) # not reached def wait(self, poll=False): if not self.async: assert(self.result is True or self.result is False) return self.result - raise NotImplementedError("async PythonFunction not implemented yet") + + flags = 0 + if poll: + flags = os.WNOHANG + + pid, status = os.waitpid(self.childpid, flags) + if not pid: + # This should only happen if we passed WNOHANG. + assert(poll) + return None + + assert(pid == self.childpid) + if os.WIFEXITED(status): + if os.WEXITSTATUS(status) == 0: + return True + print "forked python process exited with status %d" % os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + print "forked python process killed from signal %d" % os.WTERMSIG(status) + else: + print "forked python process died for unknown reason (%d)" % status + return False def __call__(self): From 3ab17fccbfd9ffc278aedb9c2e228061223bbb13 Mon Sep 17 00:00:00 2001 From: Rasmus Villemoes Date: Tue, 15 Nov 2016 08:35:17 +0100 Subject: [PATCH 5/6] classes: maybe set __async flag on various task functions For now, this allows controlling/experimenting with which of the tasks fetch, unpack, stage, split and package that are run asynchronously by setting variables __ASYNC_FOO in local.conf. --- classes/fetch.oeclass | 2 ++ classes/package.oeclass | 2 ++ classes/stage.oeclass | 1 + 3 files changed, 5 insertions(+) diff --git a/classes/fetch.oeclass b/classes/fetch.oeclass index e60e3160..1dc9b60a 100644 --- a/classes/fetch.oeclass +++ b/classes/fetch.oeclass @@ -65,6 +65,7 @@ def fetch_init(d): do_fetch[dirs] = "${INGREDIENTS}" +do_fetch[__async] = "${__ASYNC_FETCH}" def do_fetch(d): sigfile_changed = False for uri in d.get("__fetch"): @@ -91,6 +92,7 @@ def do_fetch(d): do_unpack[dirs] = "${SRCDIR}" do_unpack[cleandirs] = "${SRCDIR}" +do_unpack[__async] = "${__ASYNC_UNPACK}" def do_unpack(d): for uri in d.get("__fetch"): if "unpack" in uri.params: diff --git a/classes/package.oeclass b/classes/package.oeclass index b0c2bf43..7033b695 100644 --- a/classes/package.oeclass +++ b/classes/package.oeclass @@ -17,6 +17,7 @@ do_split[dirs] = "${PKGD} ${D}" # The FILES_* and ALLOW_EMPTY_* variables are exclusive for do_split task META_EMIT_PREFIX += "split:FILES_${PN} split:ALLOW_EMPTY" +do_split[__async] = "${__ASYNC_SPLIT}" def do_split(d): import errno, stat @@ -151,6 +152,7 @@ LICENSE[emit] = "do_package" HOMEPAGE[emit] = "do_package" MAINTAINER[emit] = "do_package" +do_package[__async] = "${__ASYNC_PACKAGE}" def do_package(d): import bb, os diff --git a/classes/stage.oeclass b/classes/stage.oeclass index ada46cda..19107562 100644 --- a/classes/stage.oeclass +++ b/classes/stage.oeclass @@ -15,6 +15,7 @@ do_stage[dirs] = "${STAGE_DIR}" do_stage[recdeptask] = "DEPENDS:do_package" do_stage[import] = "set_stage" +do_stage[__async] = "${__ASYNC_STAGE}" def do_stage(d): def get_dstdir(cwd, package): return os.path.join(cwd, package.type) From 5eb766b45bf67a7a9d6839bd25b61ddf638d1c00 Mon Sep 17 00:00:00 2001 From: Rasmus Villemoes Date: Thu, 17 Nov 2016 12:44:19 +0100 Subject: [PATCH 6/6] chrpath.oeclass: also run do_chrpath asynchronously --- classes/chrpath.oeclass | 1 + 1 file changed, 1 insertion(+) diff --git a/classes/chrpath.oeclass b/classes/chrpath.oeclass index 2153cc61..9683b096 100644 --- a/classes/chrpath.oeclass +++ b/classes/chrpath.oeclass @@ -36,6 +36,7 @@ CHRPATH_REPLACE_STAGEDIRS:cross = "1" CHRPATH_REPLACE_STAGEDIRS:sdk-cross = "1" do_chrpath[dirs] = "${D}" +do_chrpath[__async] = "${__ASYNC_CHRPATH}" def do_chrpath(d): import stat import oelite.magiccache