diff --git a/bqueryd/worker.py b/bqueryd/worker.py index 4c77ec8..4d966fb 100644 --- a/bqueryd/worker.py +++ b/bqueryd/worker.py @@ -266,78 +266,80 @@ def handle_work(self, msg): buf_file_fd, buf_file = tempfile.mkstemp(prefix='tar_') os.close(buf_file_fd) - args, kwargs = msg.get_args_kwargs() - self.logger.info('doing calc %s' % args) - filename = args[0] - groupby_col_list = args[1] - aggregation_list = args[2] - where_terms_list = args[3] - expand_filter_column = kwargs.get('expand_filter_column') - aggregate = kwargs.get('aggregate', True) - - # create rootdir - rootdir = os.path.join(self.data_dir, filename) - if not os.path.exists(rootdir): - raise Exception('Path %s does not exist' % rootdir) - - ct = bquery.ctable(rootdir=rootdir, mode='r', auto_cache=True) - - # prepare filter - if not where_terms_list: - bool_arr = None - else: - # quickly verify the where_terms_list - if not ct.where_terms_factorization_check(where_terms_list): - # return an empty result because the where terms do not give a result for this ctable - msg['data'] = '' - return msg - # else create the boolean array - bool_arr = ct.where_terms(where_terms_list, cache=True) - - # expand filter column check - if expand_filter_column: - bool_arr = ct.is_in_ordered_subgroups(basket_col=expand_filter_column, bool_arr=bool_arr) - - # retrieve & aggregate if needed - rm_file_or_dir(tmp_dir) - if aggregate: - # aggregate by groupby parameters - result_ctable = ct.groupby(groupby_col_list, aggregation_list, bool_arr=bool_arr, - rootdir=tmp_dir) - else: - # direct result from the ctable - column_list = groupby_col_list + [x[0] for x in aggregation_list] - if bool_arr is not None: - result_ctable = bcolz.fromiter(ct[column_list].where(bool_arr), ct[column_list].dtype, sum(bool_arr), - rootdir=tmp_dir, mode='w') + try: + args, kwargs = msg.get_args_kwargs() + self.logger.info('doing calc %s' % args) + filename = args[0] + groupby_col_list = args[1] + aggregation_list = args[2] + where_terms_list = args[3] + expand_filter_column = kwargs.get('expand_filter_column') + aggregate = kwargs.get('aggregate', True) + + # create rootdir + rootdir = os.path.join(self.data_dir, filename) + if not os.path.exists(rootdir): + raise Exception('Path %s does not exist' % rootdir) + + ct = bquery.ctable(rootdir=rootdir, mode='r', auto_cache=True) + + # prepare filter + if not where_terms_list: + bool_arr = None else: - result_ctable = bcolz.fromiter(ct[column_list], ct[column_list].dtype, ct.len, - rootdir=tmp_dir, mode='w') - - # *** clean up temporary files and memory objects - # filter - del bool_arr - - # input - ct.free_cachemem() - ct.clean_tmp_rootdir() - del ct - - # save result to archive - result_ctable.flush() - result_ctable.free_cachemem() - with tarfile.open(buf_file, mode='w') as archive: - archive.add(tmp_dir, arcname=os.path.basename(tmp_dir)) - del result_ctable - rm_file_or_dir(tmp_dir) - - # create message - with open(buf_file, 'r') as file: - # add result to message - msg['data'] = file.read() - rm_file_or_dir(buf_file) - - return msg + # quickly verify the where_terms_list + if not ct.where_terms_factorization_check(where_terms_list): + # return an empty result because the where terms do not give a result for this ctable + msg['data'] = '' + return msg + # else create the boolean array + bool_arr = ct.where_terms(where_terms_list, cache=True) + + # expand filter column check + if expand_filter_column: + bool_arr = ct.is_in_ordered_subgroups(basket_col=expand_filter_column, bool_arr=bool_arr) + + # retrieve & aggregate if needed + if aggregate: + # aggregate by groupby parameters + result_ctable = ct.groupby(groupby_col_list, aggregation_list, bool_arr=bool_arr, + rootdir=tmp_dir) + else: + # direct result from the ctable + column_list = groupby_col_list + [x[0] for x in aggregation_list] + if bool_arr is not None: + result_ctable = bcolz.fromiter(ct[column_list].where(bool_arr), ct[column_list].dtype, sum(bool_arr), + rootdir=tmp_dir, mode='w') + else: + result_ctable = bcolz.fromiter(ct[column_list], ct[column_list].dtype, ct.len, + rootdir=tmp_dir, mode='w') + + # *** clean up temporary files and memory objects + # filter + del bool_arr + + # input + ct.free_cachemem() + ct.clean_tmp_rootdir() + del ct + + # save result to archive + result_ctable.flush() + result_ctable.free_cachemem() + with tarfile.open(buf_file, mode='w') as archive: + archive.add(tmp_dir, arcname=os.path.basename(tmp_dir)) + del result_ctable + + # create message + with open(buf_file, 'r') as file: + # add result to message + msg['data'] = file.read() + + return msg + finally: + # Make sure temp files are removed + rm_file_or_dir(tmp_dir) + rm_file_or_dir(buf_file) class DownloaderNode(WorkerBase): @@ -512,6 +514,8 @@ def movebcolz(self, ticket): open(metadata_filepath, 'w').write(json.dumps(metadata, indent=2)) self.logger.debug("Moving %s %s" % (ready_path, prod_path)) shutil.move(ready_path, prod_path) + # Ensure files are written to disk + os.system('sync') self.logger.debug('Now removing entire ticket %s', ticket_path) shutil.rmtree(ticket_path, ignore_errors=True) else: