Last active
July 15, 2025 10:49
-
-
Save Phlya/4b87f9ec1a459aafa2fac4464d697813 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Coverage analysis API for genomic pairs data. | |
| This module provides functions to calculate coverage from pairs files, | |
| supporting various filtering and binning options. | |
| """ | |
| import multiprocessing | |
| from functools import partial | |
| from typing import Iterator, Optional, Tuple, Union | |
| import bioframe | |
| import numpy as np | |
| import pandas as pd | |
| from pairtools.lib import fileio, headerops | |
| pd.set_option("mode.copy_on_write", True) | |
| def read_pairs( | |
| pairs: Union[str, object], chunksize: int, threads: int = 1 | |
| ) -> Tuple[pd.DataFrame, dict]: | |
| """ | |
| Read pairs file and extract chromosome sizes. | |
| Args: | |
| pairs: Path to pairs file or file-like object | |
| chunksize: Number of pairs to process at once | |
| threads: Number of threads for reading | |
| Returns: | |
| Tuple of (pairs DataFrame iterator, chromsizes dict) | |
| """ | |
| pairs_stream = ( | |
| fileio.auto_open( | |
| pairs, | |
| mode="r", | |
| nproc=threads, | |
| ) | |
| if isinstance(pairs, str) | |
| else pairs | |
| ) | |
| header, pairs_body = headerops.get_header(pairs_stream) | |
| cols = headerops.extract_column_names(header) | |
| chromsizes = headerops.extract_chromsizes(header) | |
| pairs_df = pd.read_csv( | |
| pairs_body, | |
| header=None, | |
| names=cols, | |
| chunksize=chunksize, | |
| sep="\t", | |
| dtype={"chrom1": str, "chrom2": str}, | |
| low_memory=False, | |
| ) | |
| return pairs_df, chromsizes | |
| def intervals_to_increments(df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Convert genomic intervals to increment/decrement events. | |
| Args: | |
| df: DataFrame with columns 'chrom', 'start', 'end' | |
| Returns: | |
| DataFrame with columns 'chrom', 'pos', 'inc' | |
| """ | |
| inc_df = pd.concat( | |
| [ | |
| df[["chrom", "start"]].rename(columns={"start": "pos"}).eval("inc=1"), | |
| df[["chrom", "end"]].rename(columns={"end": "pos"}).eval("inc=-1"), | |
| ] | |
| ) | |
| return inc_df | |
| def aggregate_increments(inc_df: pd.DataFrame) -> pd.DataFrame: | |
| """ | |
| Aggregate increment events by position. | |
| Args: | |
| inc_df: DataFrame with increment events | |
| Returns: | |
| Aggregated DataFrame sorted by chromosome and position | |
| """ | |
| inc_df = ( | |
| inc_df.groupby(["chrom", "pos"]) | |
| .sum() | |
| .reset_index() | |
| .sort_values(["chrom", "pos"], ignore_index=True) | |
| ) | |
| return inc_df | |
| def coverage_chunk(df_chunk: pd.DataFrame, binsize: int) -> pd.DataFrame: | |
| """ | |
| Calculate coverage for a chunk of intervals. | |
| Args: | |
| df_chunk: DataFrame with genomic intervals | |
| binsize: Size of bins for aggregation | |
| Returns: | |
| DataFrame with coverage counts per bin | |
| """ | |
| count_df = aggregate_increments(intervals_to_increments(df_chunk)) | |
| count_df.insert(2, "count", count_df["inc"].cumsum()) | |
| count_df["bin"] = count_df["pos"] // binsize | |
| if binsize > 1: | |
| count_df = count_df.groupby(["chrom", "bin"], as_index=False).agg( | |
| {"count": "sum"} | |
| ) | |
| count_df["bin"] = count_df["bin"].astype(int) | |
| return count_df | |
| def produce_chunks( | |
| pairs_stream: Iterator[pd.DataFrame], side: int, end: int | |
| ) -> Iterator[pd.DataFrame]: | |
| """ | |
| Generate chunks of genomic intervals from pairs stream. | |
| Args: | |
| pairs_stream: Iterator of pairs DataFrames | |
| side: Which side to use (0=both, 1=first, 2=second) | |
| end: Which end to use (0=default, 3=3', 5=5') | |
| Yields: | |
| DataFrames with genomic intervals | |
| """ | |
| if end == 0: | |
| end = "" | |
| for pairs_df in pairs_stream: | |
| if pairs_df.shape[0] == 0: | |
| continue | |
| if side != 0: | |
| pairs_df["start"] = pairs_df[f"pos{end}{side}"] - 1 | |
| pairs_df["end"] = pairs_df[f"pos{end}{side}"] | |
| pairs_df["chrom"] = pairs_df[f"chrom{side}"] | |
| pairs_df["strand"] = pairs_df[f"strand{side}"] | |
| yield pairs_df[["chrom", "start", "end", "strand"]] | |
| else: | |
| for side in [1, 2]: | |
| pairs_df["start"] = pairs_df[f"pos{end}{side}"] - 1 | |
| pairs_df["end"] = pairs_df[f"pos{end}{side}"] | |
| pairs_df["chrom"] = pairs_df[f"chrom{side}"] | |
| pairs_df["strand"] = pairs_df[f"strand{side}"] | |
| yield pairs_df[["chrom", "start", "end", "strand"]] | |
| def process_chunk( | |
| pairs_chunk: pd.DataFrame, shift: int, binsize: int, chromsizes: dict | |
| ) -> pd.DataFrame: | |
| """ | |
| Process a single chunk of pairs data. | |
| Args: | |
| pairs_chunk: DataFrame with pairs data | |
| shift: Shift value for strand-specific adjustment | |
| binsize: Bin size for coverage calculation | |
| chromsizes: Dictionary of chromosome sizes | |
| Returns: | |
| DataFrame with coverage data | |
| """ | |
| shift_values = np.where(pairs_chunk["strand"] == "+", shift, -shift) | |
| pairs_chunk[["start", "end"]] = pairs_chunk[["start", "end"]].add( | |
| shift_values, axis=0 | |
| ) | |
| pairs_chunk = pairs_chunk[ | |
| (pairs_chunk["start"] >= 0) | |
| & (pairs_chunk["end"] <= (pairs_chunk["chrom"].map(chromsizes))) | |
| ] | |
| pairs_chunk = pairs_chunk[["chrom", "start", "end"]] | |
| pairs_chunk.sort_values(["chrom", "start", "end"], inplace=True) | |
| pairs_chunk.reset_index(drop=True, inplace=True) | |
| return coverage_chunk(pairs_chunk, binsize) | |
| def calculate_coverage( | |
| pairs_chunks: Iterator[pd.DataFrame], | |
| chromsizes: Union[dict, pd.Series], | |
| side: int = 1, | |
| end: int = 0, | |
| shift: int = 0, | |
| window_size: int = 10, | |
| threads: int = 1, | |
| ) -> pd.DataFrame: | |
| """ | |
| Calculate coverage from pairs chunks. | |
| Args: | |
| pairs_chunks: Iterator of DataFrames with pairs data | |
| chromsizes: Dictionary or pandas Series with chromosome sizes | |
| side: Which side to use (0=both, 1=first, 2=second) | |
| end: Which end to use (0=default, 3=3', 5=5') | |
| shift: Shift value for strand-specific adjustment | |
| window_size: Window size for binning | |
| threads: Number of threads to use | |
| Returns: | |
| DataFrame with coverage data | |
| """ | |
| chunks = produce_chunks(pairs_chunks, side, end) | |
| func = partial( | |
| process_chunk, shift=shift, binsize=window_size, chromsizes=chromsizes | |
| ) | |
| if threads > 1: | |
| with multiprocessing.Pool(threads) as pool: | |
| coverage_chunks = pool.map(func, chunks) | |
| else: | |
| coverage_chunks = map(func, chunks) | |
| coverage_df = ( | |
| pd.concat(coverage_chunks, ignore_index=True) | |
| .groupby(["chrom", "bin"], as_index=False) | |
| .agg({"count": "sum"}) | |
| .reset_index(drop=True) | |
| ) | |
| coverage_df["start"] = (coverage_df["bin"] * window_size).astype(int) | |
| coverage_df["end"] = coverage_df["start"] + window_size | |
| coverage_df["end"] = np.where( | |
| coverage_df["end"] <= coverage_df["chrom"].map(chromsizes), | |
| coverage_df["end"], | |
| coverage_df["chrom"].map(chromsizes), | |
| ) | |
| # Filter out empty intervals | |
| coverage_df = coverage_df[coverage_df["end"] - coverage_df["start"] > 0] | |
| coverage_df = coverage_df[["chrom", "start", "end", "count"]] | |
| return coverage_df | |
| def save_coverage( | |
| coverage_df: pd.DataFrame, | |
| output_file: str, | |
| chromsizes: dict, | |
| output_bigwig: Optional[str] = None, | |
| ) -> None: | |
| """ | |
| Save coverage data to file(s). | |
| Args: | |
| coverage_df: DataFrame with coverage data | |
| output_file: Path to output tab-separated file | |
| chromsizes: Dictionary of chromosome sizes | |
| output_bigwig: Optional path to output BigWig file | |
| """ | |
| coverage_df.to_csv(output_file, sep="\t", index=False, header=False) | |
| if output_bigwig is not None: | |
| bioframe.to_bigwig( | |
| coverage_df[["chrom", "start", "end", "count"]], chromsizes, output_bigwig | |
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # -*- coding: utf-8 -*- | |
| """ | |
| Command-line interface for coverage analysis. | |
| This module provides a Click-based CLI that wraps the coverage_api module. | |
| """ | |
| import click | |
| import bioframe | |
| from coverage_api import calculate_coverage, read_pairs, save_coverage | |
| UTIL_NAME = "pairtools_coverage" | |
| @click.command() | |
| @click.argument("pairs_path", type=str, required=False) | |
| @click.option( | |
| "-o", | |
| "--output", | |
| type=str, | |
| default="", | |
| help="output file." | |
| " If the path ends with .gz or .lz4, the output is bgzip-/lz4c-compressed." | |
| " By default, the output is printed into stdout.", | |
| ) | |
| @click.option( | |
| "--side", | |
| "-s", | |
| type=click.Choice(["0", "1", "2"]), | |
| default="1", | |
| show_default=True, | |
| help="0: both sides, 1: first side, 2: second side", | |
| ) | |
| @click.option( | |
| "--end", | |
| type=click.Choice(["5", "3", "0"]), | |
| default="0", | |
| show_default=True, | |
| help="5' end, 3' end, or whatever is reported as pos1 and pos2 in pairs", | |
| ) | |
| @click.option( | |
| "--shift", | |
| type=int, | |
| default=0, | |
| show_default=True, | |
| help="Shift value for strand-specific adjustment", | |
| ) | |
| @click.option( | |
| "--window-size", | |
| type=int, | |
| default=100, | |
| show_default=True, | |
| help="Window size for binning", | |
| ) | |
| @click.option( | |
| "--chunksize", | |
| type=int, | |
| default=1000000, | |
| show_default=True, | |
| help="Number of pairs to process at once", | |
| ) | |
| @click.option( | |
| "--threads", | |
| "-t", | |
| type=int, | |
| default=1, | |
| show_default=True, | |
| help="Number of threads", | |
| ) | |
| @click.option( | |
| "--chromsizes", | |
| type=str, | |
| default=None, | |
| help="Chromosome order used to filter pairs: " | |
| "path to a chromosomes file (e.g. UCSC chrom.sizes or similar) whose " | |
| "first column lists scaffold names. If not provided, will be extracted " | |
| "from the header of the input pairs file.", | |
| ) | |
| @click.option( | |
| "--output-bigwig", | |
| type=str, | |
| default=None, | |
| help="Output BigWig file (optional)", | |
| ) | |
| def coverage_py( | |
| pairs_path, | |
| output, | |
| side, | |
| end, | |
| shift, | |
| window_size, | |
| chunksize, | |
| threads, | |
| chromsizes, | |
| output_bigwig, | |
| ): | |
| """Generate coverage from pairs file. | |
| PAIRS_PATH : input .pairs/.pairsam file. If the path ends with .gz or .lz4, the | |
| input is decompressed by bgzip/lz4c. By default, the input is read from stdin. | |
| """ | |
| # Handle chromsizes - either from file or extract from pairs header | |
| if chromsizes is not None: | |
| chromsizes_dict = bioframe.read_chromsizes( | |
| chromsizes, filter_chroms=False, natsort=False | |
| ) | |
| pairs_chunks, _ = read_pairs( | |
| pairs_path if pairs_path else "-", chunksize, threads | |
| ) | |
| else: | |
| # Read pairs and get chromsizes from header | |
| pairs_chunks, chromsizes_dict = read_pairs( | |
| pairs_path if pairs_path else "-", chunksize, threads | |
| ) | |
| # Calculate coverage using the API | |
| coverage_df = calculate_coverage( | |
| pairs_chunks=pairs_chunks, | |
| chromsizes=chromsizes_dict, | |
| side=int(side), | |
| end=int(end), | |
| shift=shift, | |
| window_size=window_size, | |
| threads=threads, | |
| ) | |
| # Save the results | |
| save_coverage( | |
| coverage_df, output if output else "-", chromsizes_dict, output_bigwig | |
| ) | |
| if __name__ == "__main__": | |
| coverage_py() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment