Skip to content
This repository was archived by the owner on Jun 19, 2025. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 75 additions & 71 deletions bqueryd/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,78 +266,80 @@ def handle_work(self, msg):
buf_file_fd, buf_file = tempfile.mkstemp(prefix='tar_')
os.close(buf_file_fd)

args, kwargs = msg.get_args_kwargs()
self.logger.info('doing calc %s' % args)
filename = args[0]
groupby_col_list = args[1]
aggregation_list = args[2]
where_terms_list = args[3]
expand_filter_column = kwargs.get('expand_filter_column')
aggregate = kwargs.get('aggregate', True)

# create rootdir
rootdir = os.path.join(self.data_dir, filename)
if not os.path.exists(rootdir):
raise Exception('Path %s does not exist' % rootdir)

ct = bquery.ctable(rootdir=rootdir, mode='r', auto_cache=True)

# prepare filter
if not where_terms_list:
bool_arr = None
else:
# quickly verify the where_terms_list
if not ct.where_terms_factorization_check(where_terms_list):
# return an empty result because the where terms do not give a result for this ctable
msg['data'] = ''
return msg
# else create the boolean array
bool_arr = ct.where_terms(where_terms_list, cache=True)

# expand filter column check
if expand_filter_column:
bool_arr = ct.is_in_ordered_subgroups(basket_col=expand_filter_column, bool_arr=bool_arr)

# retrieve & aggregate if needed
rm_file_or_dir(tmp_dir)
if aggregate:
# aggregate by groupby parameters
result_ctable = ct.groupby(groupby_col_list, aggregation_list, bool_arr=bool_arr,
rootdir=tmp_dir)
else:
# direct result from the ctable
column_list = groupby_col_list + [x[0] for x in aggregation_list]
if bool_arr is not None:
result_ctable = bcolz.fromiter(ct[column_list].where(bool_arr), ct[column_list].dtype, sum(bool_arr),
rootdir=tmp_dir, mode='w')
try:
args, kwargs = msg.get_args_kwargs()
self.logger.info('doing calc %s' % args)
filename = args[0]
groupby_col_list = args[1]
aggregation_list = args[2]
where_terms_list = args[3]
expand_filter_column = kwargs.get('expand_filter_column')
aggregate = kwargs.get('aggregate', True)

# create rootdir
rootdir = os.path.join(self.data_dir, filename)
if not os.path.exists(rootdir):
raise Exception('Path %s does not exist' % rootdir)

ct = bquery.ctable(rootdir=rootdir, mode='r', auto_cache=True)

# prepare filter
if not where_terms_list:
bool_arr = None
else:
result_ctable = bcolz.fromiter(ct[column_list], ct[column_list].dtype, ct.len,
rootdir=tmp_dir, mode='w')

# *** clean up temporary files and memory objects
# filter
del bool_arr

# input
ct.free_cachemem()
ct.clean_tmp_rootdir()
del ct

# save result to archive
result_ctable.flush()
result_ctable.free_cachemem()
with tarfile.open(buf_file, mode='w') as archive:
archive.add(tmp_dir, arcname=os.path.basename(tmp_dir))
del result_ctable
rm_file_or_dir(tmp_dir)

# create message
with open(buf_file, 'r') as file:
# add result to message
msg['data'] = file.read()
rm_file_or_dir(buf_file)

return msg
# quickly verify the where_terms_list
if not ct.where_terms_factorization_check(where_terms_list):
# return an empty result because the where terms do not give a result for this ctable
msg['data'] = ''
return msg
# else create the boolean array
bool_arr = ct.where_terms(where_terms_list, cache=True)

# expand filter column check
if expand_filter_column:
bool_arr = ct.is_in_ordered_subgroups(basket_col=expand_filter_column, bool_arr=bool_arr)

# retrieve & aggregate if needed
if aggregate:
# aggregate by groupby parameters
result_ctable = ct.groupby(groupby_col_list, aggregation_list, bool_arr=bool_arr,
rootdir=tmp_dir)
else:
# direct result from the ctable
column_list = groupby_col_list + [x[0] for x in aggregation_list]
if bool_arr is not None:
result_ctable = bcolz.fromiter(ct[column_list].where(bool_arr), ct[column_list].dtype, sum(bool_arr),
rootdir=tmp_dir, mode='w')
else:
result_ctable = bcolz.fromiter(ct[column_list], ct[column_list].dtype, ct.len,
rootdir=tmp_dir, mode='w')

# *** clean up temporary files and memory objects
# filter
del bool_arr

# input
ct.free_cachemem()
ct.clean_tmp_rootdir()
del ct

# save result to archive
result_ctable.flush()
result_ctable.free_cachemem()
with tarfile.open(buf_file, mode='w') as archive:
archive.add(tmp_dir, arcname=os.path.basename(tmp_dir))
del result_ctable

# create message
with open(buf_file, 'r') as file:
# add result to message
msg['data'] = file.read()

return msg
finally:
# Make sure temp files are removed
rm_file_or_dir(tmp_dir)
rm_file_or_dir(buf_file)


class DownloaderNode(WorkerBase):
Expand Down Expand Up @@ -512,6 +514,8 @@ def movebcolz(self, ticket):
open(metadata_filepath, 'w').write(json.dumps(metadata, indent=2))
self.logger.debug("Moving %s %s" % (ready_path, prod_path))
shutil.move(ready_path, prod_path)
# Ensure files are written to disk
os.system('sync')
self.logger.debug('Now removing entire ticket %s', ticket_path)
shutil.rmtree(ticket_path, ignore_errors=True)
else:
Expand Down