1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
gvcf/*
results/*
logs/*
logs/**
example_data/*
.snakemake/
slurm-*.out
44 changes: 36 additions & 8 deletions README.md
@@ -42,6 +42,7 @@ The pipeline can be run one of two ways, both from the repo root. **It is recomm
A default SLURM profile is provided under `profiles/slurm/`. Edit `profiles/slurm/config.yaml` to customize sbatch options if needed.
Defaults for account/partition and baseline resources are set in `config.yaml` (`slurm_account`, `slurm_partition`, `default_*`).
SLURM stdout/stderr logs are written to `logs/slurm/` by default.
The provided profile sets `jobs: 200` (Snakemake's max concurrent workflow jobs, equivalent to `-j 200`), which you can override on the command line if needed.

```bash
snakemake --profile profiles/slurm
@@ -55,7 +56,7 @@ snakemake -j 8

Common options:

- `-j <N>`: number of parallel jobs
- `-j <N>`: number of parallel Snakemake jobs (not per-job threads/CPUs)
- `--rerun-incomplete`: re-run jobs whose outputs are incomplete (cleans up partial outputs)
- `--printshellcmds`: show executed commands
- `--notemp`: keep temporary intermediates (see Notes)
@@ -71,21 +72,48 @@ By default the workflow uses these locations (override in `config.yaml`):
- `results/combined/combined.<contig>.gvcf.gz` : merged gVCF per contig
- `results/split/combined.<contig>.inv` : invariant sites
- `results/split/combined.<contig>.filtered` : filtered sites
- `results/split/combined.<contig>.clean` : clean sites
- `results/split/combined.<contig>.clean.vcf.gz` : clean sites (bgzip-compressed by default)
- `results/split/combined.<contig>.missing.bed` : missing positions
- `results/split/combined.<contig>.filtered.bed` : merged mask bed
- `results/split/combined.<contig>.clean.mask.bed` : merged mask bed
- `results/split/combined.<contig>.coverage.txt` : split coverage validation summary
- `results/split/combined.<contig>.accessible.npz` : boolean accessibility array (union of clean + invariant sites), for scikit-allel statistics
- `results/summary.html` : HTML summary of jobs run, outputs created, and warnings
- Contig remaps (e.g., configured `chr1` remapped to reference contig `1`) are reported under the remap warning and are not repeated in the generic MAF-vs-reference mismatch warnings.
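The accessibility `.npz` listed above can be reconstructed and consumed roughly like this (a sketch, not the pipeline's actual script; the contig length, positions, and file name are hypothetical):

```python
import numpy as np

# The pipeline stores a boolean array named "mask" in a compressed .npz,
# one entry per reference base (True = accessible). Toy reconstruction:
contig_length = 20                # would come from the reference .fai
clean_pos = [3, 4, 5]             # 1-based VCF POS values (hypothetical)
inv_pos = [5, 6, 7]               # invariant sites; overlap with clean_pos is fine (union)

mask = np.zeros(contig_length, dtype=bool)
mask[np.array(clean_pos + inv_pos) - 1] = True   # VCF POS is 1-based, array is 0-based

np.savez_compressed("combined.demo.accessible.npz", mask=mask)

loaded = np.load("combined.demo.accessible.npz")["mask"]
print(int(loaded.sum()))  # 5 accessible positions
```

The boolean layout is what scikit-allel expects for an accessibility mask in its windowed statistics.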

## Testing

Run from the repo root:

```bash
pytest -q
```

- Runs unit/fast tests by default.
- Some integration tests are gated and skipped unless enabled.

To run integration tests too:

```bash
RUN_INTEGRATION=1 pytest -q
```

- Requires the `argprep` conda environment and external tools used by the workflow (e.g., `snakemake`, `bcftools`, `tabix`, `bgzip`, `gatk`, `java`; some tests also need `samtools`).
- These tests take longer and may run parts of the Snakemake workflow.

For a quick SLURM-profile syntax/config sanity check without submitting jobs, use a dry run:

```bash
snakemake --profile profiles/slurm -n
```

## Notes

- If `bgzip_output: true`, the `.inv`, `.filtered`, `.clean`, and `.missing.bed` files will have a `.gz` suffix.
- `bgzip_output` defaults to `true`; by default the `.inv`, `.filtered`, `.clean.vcf`, and `.missing.bed` files are bgzip-compressed (`.gz` suffix).
- All gzipped outputs in this pipeline use bgzip (required for `tabix`).
- `scripts/dropSV.py` removes indels larger than `drop_cutoff` (if set in `config.yaml`).
- `scripts/split.py` supports `--filter-multiallelic` and `--bgzip-output` (toggle via `config.yaml`).
- `scripts/filt_to_bed.py` merges `<prefix>.filtered`, `<prefix>.missing.bed`, and `dropped_indels.bed` into a final mask bed.
- `make_accessibility` builds a per-contig accessibility array from the union of `combined.<contig>.clean` and `combined.<contig>.inv` using the reference `.fai` to size the array. The output is a compressed NumPy archive containing a boolean array named `mask`, intended for scikit-allel statistics.
- `make_accessibility` builds a per-contig accessibility array from the union of `combined.<contig>.clean.vcf.gz` and `combined.<contig>.inv.gz` (default names) using the reference `.fai` to size the array. The output is a compressed NumPy archive containing a boolean array named `mask`, intended for scikit-allel statistics.
- Ploidy is inferred from MAF block structure by default (max non-reference `s` lines per block, typically `1` for pairwise MAFs). You can override with `ploidy` in `config.yaml`.
- Optional: enable `vt_normalize: true` in `config.yaml` to normalize merged gVCFs with `vt normalize` after `SelectVariants`.
- If GenomicsDBImport fails with a buffer-size error, increase `genomicsdb_vcf_buffer_size` and `genomicsdb_segment_size` in `config.yaml` (set them above your longest gVCF line length).
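The ploidy-inference rule described above (max non-reference `s` lines per MAF block) can be sketched like this; the toy MAF text and sample names are hypothetical, and real blocks carry more fields:

```python
# Each MAF block starts with an "a" line; the first "s" line is the
# reference sequence, and the remaining "s" lines are sample haplotypes.
maf_text = """\
a score=100
s ref.chr1 0 5 + 100 ACGTA
s sampleA.hap1 0 5 + 90 ACGTA
s sampleA.hap2 0 5 + 90 ACGTA

a score=50
s ref.chr1 10 4 + 100 GGTT
s sampleA.hap1 2 4 + 90 GGTT
"""

max_non_ref = 0
seen_s = 0
for line in maf_text.splitlines() + [""]:  # trailing "" flushes the last block
    parts = line.split()
    if not parts or parts[0] == "a":       # blank line or new block ends the previous one
        if seen_s:
            max_non_ref = max(max_non_ref, seen_s - 1)  # subtract the reference s line
        seen_s = 0
        continue
    if parts[0] == "s":
        seen_s += 1

print(max_non_ref)  # 2 -> would be treated as diploid in this toy example
```

A typical pairwise MAF has one non-reference `s` line per block, giving ploidy 1.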
@@ -104,7 +132,7 @@ By default the workflow uses these locations (override in `config.yaml`):
- New/expanded validation and tests: split coverage checks, filtered‑bed tests, integration tests gated by `RUN_INTEGRATION=1`.
- Example data regenerated via msprime with indels and missing data and AnchorWave‑style MAF formatting.
- `check_split_coverage.py` now reports overlap intervals with file names to aid debugging.
- `filt_to_bed.py` filters masks to the target contig, preventing cross‑contig lines in `combined.<contig>.filtered.bed`.
- `filt_to_bed.py` filters masks to the target contig, preventing cross‑contig lines in `combined.<contig>.clean.mask.bed`.
- SLURM default resources now read `default_*` from `config.yaml` instead of hardcoded profile values.

## Changes since v0.2
@@ -123,11 +151,11 @@ By default the workflow uses these locations (override in `config.yaml`):

### ARG estimation

Use Nate Pope's SINGER Snakemake [pipeline](https://github.com/nspope/singer-snakemake) with `combined.<contig>.clean` and `combined.<contig>.filtered.bed` as inputs.
Use Nate Pope's SINGER Snakemake [pipeline](https://github.com/nspope/singer-snakemake) with `combined.<contig>.clean.vcf.gz` and `combined.<contig>.clean.mask.bed` as inputs.

### Population genetic statistics

If you use scikit-allel, you can use the `combined.<contig>.clean` VCF and load the accessibility mask like this:
If you use scikit-allel, you can use the `combined.<contig>.clean.vcf.gz` VCF and load the accessibility mask like this:

```python
import numpy as np
162 changes: 104 additions & 58 deletions Snakefile
@@ -24,7 +24,7 @@ FILL_GAPS = str(config.get("fill_gaps", "false")).lower()
OUTPUT_JUST_GT = bool(config.get("outputJustGT", False))
DROP_CUTOFF = config.get("drop_cutoff", "")
FILTER_MULTIALLELIC = bool(config.get("filter_multiallelic", False))
BGZIP_OUTPUT = bool(config.get("bgzip_output", False))
BGZIP_OUTPUT = bool(config.get("bgzip_output", True))
GENOMICSDB_VCF_BUFFER_SIZE = int(config.get("genomicsdb_vcf_buffer_size", 1048576))
GENOMICSDB_SEGMENT_SIZE = int(config.get("genomicsdb_segment_size", 1048576))
MAF_TO_GVCF_THREADS = int(config.get("maf_to_gvcf_threads", 2))
@@ -39,7 +39,6 @@ MERGE_CONTIG_TIME = str(config.get("merge_contig_time", config.get("default_time
DEFAULT_JAVA_MEM_MB = max(256, int(DEFAULT_MEM_MB * 0.9))
VT_NORMALIZE = bool(config.get("vt_normalize", False))
VT_PATH = str(config.get("vt_path", "vt"))
MERGED_GENOTYPER = "selectvariants"

workflow.global_resources["merge_gvcf_jobs"] = int(config.get("merge_gvcf_max_jobs", 4))

@@ -68,24 +67,49 @@ def _normalize_contig(name: str) -> str:
    return name if name else "0"


def _read_maf_contigs() -> set[str]:
def _maf_path_for_sample(sample: str) -> Path:
    maf = MAF_DIR / f"{sample}.maf"
    maf_gz = MAF_DIR / f"{sample}.maf.gz"
    if maf.exists():
        return maf
    if maf_gz.exists():
        return maf_gz
    return maf


def _read_maf_contigs(maf_path: Path) -> set[str]:
    contigs = set()
    maf_files = list(MAF_DIR.glob("*.maf")) + list(MAF_DIR.glob("*.maf.gz"))
    for maf in sorted(maf_files):
        try:
            if maf.name.endswith(".gz"):
                handle = gzip.open(maf, "rt", encoding="utf-8")
            else:
                handle = maf.open("r", encoding="utf-8")
            with handle:
                for line in handle:
                    if not line or line.startswith("#"):
                        continue
                    parts = line.split()
                    if parts and parts[0] == "s" and len(parts) >= 2:
                        contigs.add(parts[1])
        except OSError:
            continue
    try:
        if maf_path.name.endswith(".gz"):
            handle = gzip.open(maf_path, "rt", encoding="utf-8")
        else:
            handle = maf_path.open("r", encoding="utf-8")
        with handle:
            first_src = None
            for line in handle:
                if not line or line.startswith("#"):
                    continue
                stripped = line.strip()
                if not stripped:
                    if first_src is not None:
                        contigs.add(first_src)
                    first_src = None
                    continue
                parts = stripped.split()
                if not parts:
                    continue
                if parts[0] == "a":
                    if first_src is not None:
                        contigs.add(first_src)
                    first_src = None
                    continue
                if parts[0] == "s" and len(parts) >= 2 and first_src is None:
                    # Use first sequence src in each block as the reference-side contig.
                    first_src = parts[1]
        if first_src is not None:
            contigs.add(first_src)
    except OSError:
        return set()
    return contigs


Expand Down Expand Up @@ -172,6 +196,23 @@ def _discover_samples():
    return sorted(samples)


def _read_maf_contig_sets(samples: list[str]) -> dict[str, set[str]]:
    contigs_by_sample: dict[str, set[str]] = {}
    for sample in samples:
        maf_path = _maf_path_for_sample(sample)
        contigs_by_sample[sample] = _read_maf_contigs(maf_path)
    return contigs_by_sample


def _contigs_not_in_all_mafs(contigs_by_sample: dict[str, set[str]]) -> list[str]:
    if not contigs_by_sample:
        return []
    all_sets = list(contigs_by_sample.values())
    union = set().union(*all_sets) if all_sets else set()
    intersection = set.intersection(*all_sets) if all_sets else set()
    return sorted(union - intersection)
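The union-minus-intersection rule above can be sanity-checked in isolation (sample and contig names hypothetical):

```python
# Contigs present in some MAF but not in every MAF: union - intersection.
contigs_by_sample = {
    "sampleA": {"chr1", "chr2", "chr3"},
    "sampleB": {"chr1", "chr2"},
}
all_sets = list(contigs_by_sample.values())
union = set().union(*all_sets)
intersection = set.intersection(*all_sets)
print(sorted(union - intersection))  # ['chr3']
```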


def _infer_ploidy_from_maf(maf_path: Path, max_blocks: int = 10000) -> int:
    opener = gzip.open if maf_path.suffix == ".gz" else open
    max_non_ref = 0
@@ -282,34 +323,14 @@ def _resolve_requested_contigs(
    return kept, dropped, remapped


def _read_contigs():
    orig_fai = REF_FASTA.with_suffix(REF_FASTA.suffix + ".fai")
    if not orig_fai.exists():
        raise ValueError(
            "Reference FASTA index (.fai) not found. "
            "Either run 'samtools faidx' on the reference or set 'contigs' in config.yaml."
        )
    target_fai = Path(REF_FAI)
    if target_fai.exists():
        ref_contigs = _read_fai_contigs(target_fai)
    else:
        ref_contigs = _read_fai_contigs(orig_fai)

    if "contigs" in config:
        requested = [str(c) for c in config["contigs"]]
        kept, dropped, remapped = _resolve_requested_contigs(requested, ref_contigs)
        if not kept:
            raise ValueError(
                "None of the configured contigs are present in reference .fai: "
                + ", ".join(requested[:10])
            )
        return kept, dropped, requested, remapped
    return ref_contigs, [], list(ref_contigs), []


SAMPLES = _discover_samples()
PLOIDY, PLOIDY_SOURCE, PLOIDY_FILE_VALUES, PLOIDY_WARNINGS = _resolve_ploidy(SAMPLES)
CONTIGS, DROPPED_CONTIGS_NOT_IN_REF, REQUESTED_CONTIGS, REMAPPED_CONTIGS = _read_contigs()
MAF_CONTIGS_BY_SAMPLE = _read_maf_contig_sets(SAMPLES)
if MAF_CONTIGS_BY_SAMPLE:
    MAF_CONTIG_INTERSECTION = sorted(set.intersection(*MAF_CONTIGS_BY_SAMPLE.values()))
else:
    MAF_CONTIG_INTERSECTION = []
CONTIGS_NOT_IN_ALL_MAFS = _contigs_not_in_all_mafs(MAF_CONTIGS_BY_SAMPLE)

GVCF_BASES = [f"{sample}To{REF_BASE}" for sample in SAMPLES]

@@ -330,7 +351,20 @@ def _active_contig_resolution() -> tuple[list[str], list[str], list[str], list[t
                + ", ".join(requested[:10])
            )
        return kept, dropped, requested, remapped
    return available, [], list(available), []

    requested = list(MAF_CONTIG_INTERSECTION)
    if not requested:
        raise ValueError(
            "No contigs are shared across all MAF files. "
            "Set explicit 'contigs' in config.yaml to override."
        )
    kept, dropped, remapped = _resolve_requested_contigs(requested, available)
    if not kept:
        raise ValueError(
            "No shared MAF contigs are present in renamed reference .fai. "
            "Set explicit 'contigs' in config.yaml to override."
        )
    return kept, dropped, requested, remapped


def _active_contigs() -> list[str]:
@@ -341,7 +375,7 @@ def _all_targets(_wc):
    contigs = _active_contigs()
    return (
        [str(_combined_out(c)) for c in contigs]
        + [str(_split_prefix(c)) + ".filtered.bed" for c in contigs]
        + [str(_split_prefix(c)) + MASK_BED_SUFFIX for c in contigs]
        + [str(_split_prefix(c)) + ".coverage.txt" for c in contigs]
        + [str(_accessibility_out(c)) for c in contigs]
        + [str(RESULTS_DIR / "summary.html")]
@@ -388,6 +422,8 @@ def _accessibility_out(contig):


SPLIT_SUFFIX = ".gz" if BGZIP_OUTPUT else ""
CLEAN_VCF_SUFFIX = ".clean.vcf" + SPLIT_SUFFIX
MASK_BED_SUFFIX = ".clean.mask.bed"

rule all:
    # Final targets: merged gVCFs plus filtered bed masks per contig.
@@ -460,11 +496,17 @@ def _summary_jobs() -> list[tuple[str, list[str]]]:
            [
                str(_split_prefix(c)) + suffix
                for c in contigs
                for suffix in (".inv", ".filtered", ".clean", ".missing.bed")
                for suffix in (
                    ".inv" + SPLIT_SUFFIX,
                    ".filtered" + SPLIT_SUFFIX,
                    CLEAN_VCF_SUFFIX,
                    ".missing.bed" + SPLIT_SUFFIX,
                    ".missing_gt_snp_by_sample.tsv",
                )
            ],
        ),
        ("check_split_coverage", [str(_split_prefix(c)) + ".coverage.txt" for c in contigs]),
        ("mask_bed", [str(_split_prefix(c)) + ".filtered.bed" for c in contigs]),
        ("mask_bed", [str(_split_prefix(c)) + MASK_BED_SUFFIX for c in contigs]),
        ("make_accessibility", [str(_accessibility_out(c)) for c in contigs]),
        ]
    )
@@ -491,8 +533,8 @@ def _summary_temp_paths() -> set[str]:
def _summary_arg_outputs() -> list[str]:
    contigs = _active_contigs()
    return (
        [str(_split_prefix(c)) + ".clean" for c in contigs]
        + [str(_split_prefix(c)) + ".filtered.bed" for c in contigs]
        [str(_split_prefix(c)) + CLEAN_VCF_SUFFIX for c in contigs]
        + [str(_split_prefix(c)) + MASK_BED_SUFFIX for c in contigs]
        + [str(_accessibility_out(c)) for c in contigs]
    )

@@ -501,10 +543,11 @@ rule summary_report:
    # Write an HTML summary of jobs, outputs, and warnings.
    input:
        combined=lambda wc: [str(_combined_out(c)) for c in _active_contigs()],
        beds=lambda wc: [str(_split_prefix(c)) + ".filtered.bed" for c in _active_contigs()],
        beds=lambda wc: [str(_split_prefix(c)) + MASK_BED_SUFFIX for c in _active_contigs()],
        invs=lambda wc: [str(_split_prefix(c)) + ".inv" + SPLIT_SUFFIX for c in _active_contigs()],
        filts=lambda wc: [str(_split_prefix(c)) + ".filtered" + SPLIT_SUFFIX for c in _active_contigs()],
        cleans=lambda wc: [str(_split_prefix(c)) + ".clean" + SPLIT_SUFFIX for c in _active_contigs()],
        cleans=lambda wc: [str(_split_prefix(c)) + CLEAN_VCF_SUFFIX for c in _active_contigs()],
        missing_gt_stats=lambda wc: [str(_split_prefix(c)) + ".missing_gt_snp_by_sample.tsv" for c in _active_contigs()],
        dropped=str(GVCF_DIR / "cleangVCF" / "dropped_indels.bed"),
    output:
        report=str(RESULTS_DIR / "summary.html"),
@@ -526,6 +569,7 @@ rule summary_report:
        ploidy_source=PLOIDY_SOURCE,
        ploidy_file_values=PLOIDY_FILE_VALUES,
        ploidy_warnings=PLOIDY_WARNINGS,
        contigs_not_in_all_mafs=CONTIGS_NOT_IN_ALL_MAFS,
        dropped_contigs_not_in_ref=lambda wc: _active_contig_resolution()[1],
        requested_contigs=lambda wc: _active_contig_resolution()[2],
        remapped_contigs=lambda wc: _active_contig_resolution()[3],
@@ -790,8 +834,9 @@ rule split_gvcf:
    output:
        inv=str(RESULTS_DIR / "split" / ("combined.{contig}.inv" + SPLIT_SUFFIX)),
        filt=str(RESULTS_DIR / "split" / ("combined.{contig}.filtered" + SPLIT_SUFFIX)),
        clean=str(RESULTS_DIR / "split" / ("combined.{contig}.clean" + SPLIT_SUFFIX)),
        clean=str(RESULTS_DIR / "split" / ("combined.{contig}.clean.vcf" + SPLIT_SUFFIX)),
        missing=str(RESULTS_DIR / "split" / ("combined.{contig}.missing.bed" + SPLIT_SUFFIX)),
        missing_gt_stats=str(RESULTS_DIR / "split" / "combined.{contig}.missing_gt_snp_by_sample.tsv"),
    params:
        filter_multiallelic=FILTER_MULTIALLELIC,
        bgzip_output=BGZIP_OUTPUT,
@@ -807,6 +852,7 @@ rule split_gvcf:
        if [ "{params.bgzip_output}" = "True" ]; then
            cmd+=(--bgzip-output)
        fi
        cmd+=(--missing-gt-stats-out "{output.missing_gt_stats}")
        cmd+=("{input.gvcf}")
        "${{cmd[@]}}"
        """
@@ -815,9 +861,9 @@ rule split_gvcf:
rule check_split_coverage:
    # Validate that clean + inv + filtered bed sum to contig length.
    input:
        clean=lambda wc: str(_split_prefix(wc.contig)) + ".clean" + SPLIT_SUFFIX,
        clean=lambda wc: str(_split_prefix(wc.contig)) + CLEAN_VCF_SUFFIX,
        inv=lambda wc: str(_split_prefix(wc.contig)) + ".inv" + SPLIT_SUFFIX,
        bed=lambda wc: str(_split_prefix(wc.contig)) + ".filtered.bed",
        bed=lambda wc: str(_split_prefix(wc.contig)) + MASK_BED_SUFFIX,
        fai=REF_FAI,
    output:
        report=str(RESULTS_DIR / "split" / "combined.{contig}.coverage.txt"),
@@ -839,7 +885,7 @@ rule mask_bed:
        missing=lambda wc: str(_split_prefix(wc.contig)) + ".missing.bed" + SPLIT_SUFFIX,
        dropped=str(GVCF_DIR / "cleangVCF" / "dropped_indels.bed"),
    output:
        bed=str(RESULTS_DIR / "split" / "combined.{contig}.filtered.bed"),
        bed=str(RESULTS_DIR / "split" / "combined.{contig}.clean.mask.bed"),
    params:
        prefix=lambda wc: str(_split_prefix(wc.contig)),
    shell:
@@ -854,7 +900,7 @@ rule make_accessibility:
rule make_accessibility:
    # Build boolean accessibility array from clean + inv VCFs per contig.
    input:
        clean=lambda wc: str(_split_prefix(wc.contig)) + ".clean" + SPLIT_SUFFIX,
        clean=lambda wc: str(_split_prefix(wc.contig)) + CLEAN_VCF_SUFFIX,
        inv=lambda wc: str(_split_prefix(wc.contig)) + ".inv" + SPLIT_SUFFIX,
        fai=REF_FAI,
    output: