diff --git a/examples/genomicsdb_common.py b/examples/genomicsdb_common.py
index 803e145..cdc5ea8 100644
--- a/examples/genomicsdb_common.py
+++ b/examples/genomicsdb_common.py
@@ -25,8 +25,8 @@
 #
 
 import json
+import logging
 import os
-import re
 import sys
 
 import genomicsdb
@@ -53,37 +53,49 @@ def parse_vidmap_json(vidmap_file, intervals=None):
     all_intervals = not intervals
     if not intervals:
         intervals = []
+    column_offset = 0
     for contig in contigs:
         if isinstance(contig, str):  # Old style vidmap json
             contig_name = contig
-            contigs_map[contig] = {
+            contig_elem = {
                 "length": contigs[contig]["length"],
                 "tiledb_column_offset": contigs[contig]["tiledb_column_offset"],
             }
         else:  # Generated with vcf2genomicsdb_init
             contig_name = contig["name"]
-            contigs_map[contig["name"]] = contig
+            contig_elem = contig
+        # Sanity check of column offsets
+        if column_offset != contig_elem["tiledb_column_offset"]:
+            logging.critical(
+                f"vidmap file({vidmap_file}) has wrong column offset for {contig_name}. Cannot process contigs in the vidmap file from {contig_name} onwards"  # noqa
+            )
+            break
+        contigs_map[contig_name] = contig_elem
+        column_offset += contig_elem["length"]
         if all_intervals:
             intervals.append(contig_name)
     return contigs_map, intervals
 
 
-interval_pattern = re.compile(r"(.*):(.*)-(.*)|(.*):(.*)|(.*)")
+def parse_interval(interval: str):
+    results = interval.split(":")
+    contig = results[0]
+    if len(results) > 1:
+        results = results[1].split("-")
+        if len(results) > 1:
+            return contig, int(results[0]), int(results[1])
+        else:
+            return contig, int(results[0]), None
+    else:
+        return contig, 1, None
 
 
-# get tiledb offsets for interval
-def parse_interval(interval: str):
-    result = re.match(interval_pattern, interval)
-    if result:
-        length = len(result.groups())
-        if length == 6:
-            if result.group(1) and result.group(2) and result.group(3):
-                return result.group(1), int(result.group(2)), int(result.group(3))
-            elif result.group(4) and result.group(5):
-                return result.group(4), int(result.group(5)), None
-            elif result.group(6):
-                return result.group(6), 1, None
-    raise RuntimeError(f"Interval {interval} could not be parsed")
+def get_partitions(intervals, contigs_map, partitions):
+    arrays = []
+    for interval in intervals:
+        _, _, _, array_names = get_arrays(interval, contigs_map, partitions)
+        arrays.extend([name.replace("$", ":", 1).replace("$", "-", 1) for name in array_names])
+    return arrays
 
 
 def get_arrays(interval, contigs_map, partitions):
@@ -97,11 +109,12 @@ def get_arrays(interval, contigs_map, partitions):
         end = length
         contig_end = contigs_map[contig]["tiledb_column_offset"] + length - 1
     else:
-        print(f"Contig({contig}) not found in vidmap.json")
+        logging.error(f"Contig({contig}) not found in vidmap.json")
+        return 0, 0, 0, []
 
     arrays = []
     for idx, partition in enumerate(partitions):
-        if isinstance(partition["begin"], int):  # Old style vidmap json
+        if isinstance(partition["begin"], int):  # Old style loader json
            column_begin = partition["begin"]
            if "end" in partition.keys():
                column_end = partition["end"]
diff --git a/examples/genomicsdb_query b/examples/genomicsdb_query
index 39afb68..f952d55 100755
--- a/examples/genomicsdb_query
+++ b/examples/genomicsdb_query
@@ -96,13 +96,32 @@ def parse_callset_json_for_row_ranges(callset_file, samples=None):
     return row_tuples
 
 
-def parse_loader_json(loader_file):
+def parse_vidmap_json_for_attributes(vidmap_file, attributes=None):
+    if attributes is None:
+        return ["GT"]
+    else:
+        vidmap = json.loads(genomicsdb.read_entire_file(vidmap_file))
+        fields = vidmap["fields"]
+        if isinstance(fields, list):
+            fields = [field["name"] for field in fields]
+        else:  # Old style vidmap json
+            fields = fields.keys()
+        if len(attributes) == 0:
+            return fields
+        attributes = attributes.replace(" ", "").split(",")
+        not_found = [attribute for attribute in attributes if attribute not in fields]
+        if len(not_found) > 0:
+            raise RuntimeError(f"Attributes({not_found}) not found in vid mapping({vidmap_file})")
+        return attributes
+
+
+def parse_loader_json(loader_file, interval_form=True):
     loader = json.loads(genomicsdb.read_entire_file(loader_file))
     partitions = loader["column_partitions"]
     array_names = [
         partition["array_name"] if "array_name" in partition else partition["array"] for partition in partitions
     ]
-    return [name.replace("$", ":", 1).replace("$", "-", 1) for name in array_names]
+    return [name.replace("$", ":", 1).replace("$", "-", 1) if interval_form else name for name in array_names]
 
 
 def check_nproc(value):
@@ -119,7 +138,7 @@
 def setup():
     parser = argparse.ArgumentParser(
         prog="query",
-        description="GenomicsDB simple query with samples/intervals/filter as inputs",
+        description="GenomicsDB simple query with samples/intervals/attributes/filter as inputs",
         formatter_class=argparse.RawTextHelpFormatter,
         usage="%(prog)s [options]",
     )
@@ -153,12 +172,17 @@
     )
     parser.add_argument("--list-samples", action="store_true", help="List samples ingested into the workspace and exit")
     parser.add_argument(
-        "--list-contigs", action="store_true", help="List contigs for the ingested samples in the workspace and exit"
+        "--list-contigs", action="store_true", help="List contigs configured in vid mapping for the workspace and exit"
+    )
+    parser.add_argument(
+        "--list-fields",
+        action="store_true",
+        help="List genomic fields configured in vid mapping for the workspace and exit",
     )
     parser.add_argument(
         "--list-partitions",
         action="store_true",
-        help="List interval partitions(genomicsdb arrays in the workspace) for the ingested samples in the workspace and exit",  # noqa
+        help="List interval partitions(genomicsdb arrays in the workspace) for the given intervals(-i/--interval or -I/--interval-list) or all the intervals for the workspace and exit",  # noqa
     )
     parser.add_argument(
         "-i",
@@ -186,6 +210,12 @@
         required=False,
         help="sample file containing list of samples, one sample per line, to operate upon. \nNote: \n\t1. -s/--sample and -S/--sample-list are mutually exclusive \n\t2. either samples and/or intervals using -i/-I/-s/-S options has to be specified",  # noqa
     )
+    parser.add_argument(
+        "-a",
+        "--attributes",
+        required=False,
+        help="Optional - comma separated list of genomic attributes or fields described in the vid mapping for the query, e.g. GT,AC,PL,DP... \nDefaults to GT",  # noqa
Defaults to GT", # noqa + ) parser.add_argument( "-f", "--filter", @@ -247,40 +277,57 @@ def setup(): ): raise RuntimeError(f"callset({callset_file}) vidmap({vidmap_file}) or loader({loader_file}) not found") - if args.list_contigs: - _, intervals = genomicsdb_common.parse_vidmap_json(vidmap_file) - print(*intervals, sep="\n") - sys.exit(0) - + # List samples if args.list_samples: samples = parse_callset_json(callset_file) print(*samples, sep="\n") sys.exit(0) - if args.list_partitions: - contigs_map, _ = genomicsdb_common.parse_vidmap_json(vidmap_file) - # parse loader.json for partitions - partitions = parse_loader_json(loader_file) - print(*partitions, sep="\n") + # List fields + if args.list_fields: + fields = parse_vidmap_json_for_attributes(vidmap_file, attributes="") + print(*fields, sep="\n") sys.exit(0) intervals = args.interval interval_list = args.interval_list samples = args.sample sample_list = args.sample_list - if not intervals and not samples and not interval_list and not sample_list: - raise RuntimeError( - "Specify at least one of either -i/-interval -I/--interval-list -s/--sample -S/--sample-list has to be specified" # noqa - ) + if not args.list_partitions and not args.list_contigs: + if not intervals and not samples and not interval_list and not sample_list: + raise RuntimeError( + "one of either -i/-interval -I/--interval-list -s/--sample -S/--sample-list has to be specified" # noqa + ) contigs_map, intervals = genomicsdb_common.parse_vidmap_json(vidmap_file, intervals or interval_list) - row_tuples = parse_callset_json_for_row_ranges(callset_file, samples or sample_list) - # parse loader.json for partitions + # List contigs + if args.list_contigs: + if intervals: + for interval in intervals: + if interval.split(":")[0] in contigs_map.keys(): + print(interval) + else: + print(*contigs_map.keys(), sep="\n") + sys.exit(0) + loader = json.loads(genomicsdb.read_entire_file(loader_file)) partitions = loader["column_partitions"] - return workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, args + # List partitions + if args.list_partitions: + if args.interval or args.interval_list: + partition_names = genomicsdb_common.get_partitions(intervals, contigs_map, partitions) + else: + # just parse loader.json for partitions + partition_names = parse_loader_json(loader_file) + print(*partition_names, sep="\n") + sys.exit(0) + + row_tuples = parse_callset_json_for_row_ranges(callset_file, samples or sample_list) + attributes = parse_vidmap_json_for_attributes(vidmap_file, args.attributes) + + return workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, attributes, args def generate_output_filename(output, output_type, interval, idx): @@ -320,6 +367,8 @@ class GenomicsDBExportConfig(NamedTuple): workspace: str vidmap_file: str callset_file: str + attributes: str + filter: str class GenomicsDBQueryConfig(NamedTuple): @@ -329,7 +378,6 @@ class GenomicsDBQueryConfig(NamedTuple): end: int array_name: str row_tuples: List[tuple] - filter: str class OutputConfig(NamedTuple): @@ -348,11 +396,14 @@ class Config(NamedTuple): def configure_export(config: GenomicsDBExportConfig): export_config = query_pb.ExportConfiguration() export_config.workspace = config.workspace - export_config.attributes.extend(["REF", "ALT", "GT"]) export_config.vid_mapping_file = config.vidmap_file export_config.callset_mapping_file = config.callset_file export_config.bypass_intersecting_intervals_phase = False 
     export_config.enable_shared_posixfs_optimizations = True
+    if config.attributes:
+        export_config.attributes.extend(config.attributes)
+    if config.filter:
+        export_config.query_filter = config.filter
     return export_config
@@ -384,49 +435,58 @@
     msg = f"array({query_config.array_name}) for interval({query_config.interval})"
     if not genomicsdb.array_exists(export_config.workspace, query_config.array_name):
         logging.error(msg + f" not imported into workspace({export_config.workspace})")
-        return
+        return -1
     global gdb
     try:
         if gdb:
             logging.info("Found gdb to process " + msg)
+        else:
+            logging.error("Something is wrong, gdb seems to be None")
+            return -1
     except NameError:
         logging.info("Instantiating genomicsdb to process " + msg + "...")
         gdb = genomicsdb.connect_with_protobuf(configure_export(export_config))
         logging.info("Instantiating genomicsdb to process " + msg + " DONE")
 
     query_protobuf = configure_query(query_config)
 
-    if output_config.type == "csv":
-        df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
-        df.to_csv(output_config.filename, index=False)
-    elif output_config.type == "json":
-        json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type)
-        with open(output_config.filename, "wb") as f:
-            f.write(json_output)
-    elif output_config.type == "arrow":
-        nbytes = 0
-        writer = None
-        i = 0
-        for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
-            reader = pa.ipc.open_stream(out)
-            for batch in reader:
-                if nbytes > output_config.max_arrow_bytes:
-                    i += 1
-                    nbytes = 0
-                    if writer:
-                        writer.close()
-                        writer = None
-                if not writer:
-                    writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
-                nbytes += batch.nbytes
-                writer.write_batch(batch)
-        if writer:
-            writer.close()
-            writer = None
+    try:
+        if output_config.type == "csv":
+            df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
+            df.to_csv(output_config.filename, index=False)
+        elif output_config.type == "json":
+            json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type)
+            with open(output_config.filename, "wb") as f:
+                f.write(json_output)
+        elif output_config.type == "arrow":
+            nbytes = 0
+            writer = None
+            i = 0
+            for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
+                reader = pa.ipc.open_stream(out)
+                for batch in reader:
+                    if nbytes > output_config.max_arrow_bytes:
+                        i += 1
+                        nbytes = 0
+                        if writer:
+                            writer.close()
+                            writer = None
+                    if not writer:
+                        writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
+                    nbytes += batch.nbytes
+                    writer.write_batch(batch)
+            if writer:
+                writer.close()
+                writer = None
+    except Exception as e:
+        logging.critical(f"Unexpected exception: {e}")
+        gdb = None
+        return -1
     logging.info(f"Processed array {query_config.array_name} for interval {query_config.interval}")
+    return 0
 
 
 def main():
-    workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, args = setup()
+    workspace, callset_file, vidmap_file, partitions, contigs_map, intervals, row_tuples, attributes, args = setup()
     if row_tuples is not None and len(row_tuples) == 0:
         return
@@ -445,19 +505,19 @@
         max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
         print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")
 
-    export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file)
+    export_config = GenomicsDBExportConfig(workspace, vidmap_file, callset_file, attributes, args.filter)
     configs = []
     for interval in intervals:
         print(f"Processing interval({interval})...")
         contig, start, end, arrays = genomicsdb_common.get_arrays(interval, contigs_map, partitions)
         if len(arrays) == 0:
-            print(f"No arrays in the workspace matched input interval({interval})")
+            logging.error(f"No arrays in the workspace matched input interval({interval})")
             continue
         print(f"\tArrays:{arrays} under consideration for interval({interval})")
         for idx, array in enumerate(arrays):
-            query_config = GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples, filter)
+            query_config = GenomicsDBQueryConfig(interval, contig, start, end, array, row_tuples)
             output_config = OutputConfig(
                 generate_output_filename(output, output_type, interval, idx),
                 output_type,
@@ -466,14 +526,27 @@
             )
             configs.append(Config(export_config, query_config, output_config))
 
-    if len(configs) == 1:
-        process(configs[0])
+    if len(configs) == 0:
+        print("Nothing to process! Check the output for possible errors")
+        sys.exit(1)
+    if min(len(configs), args.nproc) == 1:
+        results = list(map(process, configs))
     else:
         with multiprocessing.Pool(processes=min(len(configs), args.nproc)) as pool:
-            pool.map(process, configs)
+            results = list(pool.map(process, configs))
+
+    msg = "successfully"
+    for result in results:
+        if result != 0:
+            msg = "unsuccessfully for some arrays. Check output for errors"
+            break
 
-    print(f"genomicsdb_query for workspace({workspace}) and intervals({intervals}) completed successfully")
+    print(f"genomicsdb_query for workspace({workspace}) and intervals({intervals}) completed {msg}")
 
 
 if __name__ == "__main__":
-    main()
+    try:
+        main()
+    except RuntimeError as e:
+        logging.error(e)
+        sys.exit(1)
diff --git a/examples/test.sh b/examples/test.sh
index a65a2f6..a4a648d 100755
--- a/examples/test.sh
+++ b/examples/test.sh
@@ -44,6 +44,7 @@ SAMPLE_ARGS="-s HG00096 -s HG00097 -s HG00099"
 VIDMAP_FILE=vidmap_file.json
 LOADER_FILE=loader_file.json
 FILTER='ISHOMREF'
+FIELDS="DP,GT"
 
 if [[ $(uname) == "Darwin" ]]; then
     TEMP_DIR=$(mktemp -d -t test-examples)
@@ -141,9 +142,11 @@
 run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -s HG00096 -s HG00100 -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -s HG00096 -s NON_EXISTENT_SAMPLE -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -s NON_EXISTENT_SAMPLE -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE -I $TEMP_DIR/contigs.list -S $TEMP_DIR/samples.list -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT"
 run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -f $FILTER -o $OUTPUT"
+run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a $FIELDS -o $OUTPUT"
+run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -a NON_EXISTENT_FIELD,$FIELDS -o $OUTPUT" 1
 
 rm -f loader.json callset.json vidmap.json
 run_command "genomicsdb_cache -w $WORKSPACE $INTERVAL_ARGS"
@@ -173,16 +177,28 @@
 WORKSPACE=$OLDSTYLE_DIR/ws
 run_command "genomicsdb_query -w $WORKSPACE --list-samples"
--list-contigs" +run_command "genomicsdb_query -w $WORKSPACE --list-fields" run_command "genomicsdb_query -w $WORKSPACE --list-partitions" +run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS --list-partitions" +cmd="genomicsdb_query -w $WORKSPACE -i 1:1-10 --list-partitions" +PARTITION=$($cmd) +if [[ $PARTITION != "t0_1_2" ]]; then + echo "Expected output for $cmd should be t0_1_2, but got $PARTITION" + exit 1 +fi run_command "genomicsdb_query -w $WORKSPACE -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT" run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT" +run_command "genomicsdb_query -w $WORKSPACE $INTERVAL_ARGS -S $TEMP_DIR/samples.list -a GT -o $OUTPUT" OLDSTYLE_JSONS="-l $OLDSTYLE_DIR/loader.json -c $OLDSTYLE_DIR/callset_t0_1_2.json -v $OLDSTYLE_DIR/vid.json" run_command "genomicsdb_cache -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS" run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS --list-samples" run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS --list-contigs" +run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS --list-fields" run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS --list-partitions" +run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS --list-partitions" run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS -s HG00097 -s HG00100 -s HG00096 -o $OUTPUT" run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS -S $TEMP_DIR/samples.list -o $OUTPUT" +run_command "genomicsdb_query -w $WORKSPACE $OLDSTYLE_JSONS $INTERVAL_ARGS -S $TEMP_DIR/samples.list -a GT -o $OUTPUT" cleanup