Source code for admix.cli._geno

import os
from typing import List, Dict
import admix
import pandas as pd
import dapgen
import numpy as np
import dask.array as da
from tqdm import tqdm
from ._utils import log_params


def _append_to_file(path: str, df: pd.DataFrame):
    """
    Append a new data frame to an existing .pvar file. The index of the new data frame
    is assumed to be exactly the same as the index of the existing data frame.

    Parameters
    ----------
    pvar_path : str
        Path to the .pvar file.
    df : pd.DataFrame
        Data frame to append.

    """
    # if path does not exist, directly write to it
    if not os.path.exists(path):
        df.to_csv(path, sep="\t", float_format="%.8g")
    else:
        df_snp_info = pd.read_csv(path, sep="\t", index_col=0)
        assert df_snp_info.index.equals(
            df.index
        ), f"SNPs in {path} and the provided data frame do not match"
        # no overlap of columns
        overlap_cols = set(df_snp_info.columns) & set(df.columns)
        assert len(overlap_cols) == 0, f"Overlap of columns: {overlap_cols}"
        df_snp_info = pd.merge(df_snp_info, df, left_index=True, right_index=True)
        df_snp_info.to_csv(path, sep="\t", float_format="%.8g")


def grm(
    plink_file: str,
    out_prefix: str,
    subpopu: str = None,
    std_method: str = "std",
    snp_chunk_size: int = 256,
    snp_list: str = None,
) -> None:
    """
    Calculate the GRM for a given PLINK file

    Parameters
    ----------
    plink_file : str
        Path to the pfile
    out_prefix : str
        Prefix of the output files
    subpopu : str
        Path to the subpopulation file
    std_method : str
        Method to standardize the GRM. Currently supported:
        "std" (standardize to have mean 0 and variance 1),
        "allele" (standardize to have mean 0 but no scaling)
    snp_chunk_size : int, optional
        Number of SNPs to read at a time, by default 256
        This can be tuned to reduce memory usage
    snp_list : str, optional
        Path to a file containing a list of SNPs to use. Each line should be a SNP ID.
        Only SNPs in the list will be used for the analysis. By default None

    Returns
    -------
    GRM files: {out_prefix}.[grm.bin | grm.id | grm.n] will be generated
    Weight file: {out_prefix}.weight.tsv will be generated
    """
    assert std_method in ["std", "allele"], f"Unknown std_method: {std_method}"
    log_params("grm", locals())

    geno, df_snp, df_indiv = dapgen.read_plink(plink_file, snp_chunk=snp_chunk_size)
    n_indiv = len(df_indiv)

    if subpopu is not None:
        df_subpopu = pd.read_csv(
            "data/subpopu.txt", delim_whitespace=True, header=None, dtype=str
        )
        df_subpopu.columns = ["FID", "IID", "POPU"]
        df_indiv = pd.merge(df_indiv, df_subpopu, on=["FID", "IID"])
        assert (
            len(df_indiv) == n_indiv
        ), "Individuals in the subpopulation file do not match the PLINK file"

    # filter for SNPs
    snp_subset = np.ones(len(df_snp)).astype(bool)
    if snp_list is not None:
        with open(snp_list, "r") as f:
            filter_snp_list = [line.strip() for line in f]
        n_filter_snp = len(filter_snp_list)
        snp_subset = snp_subset & df_snp.index.isin(filter_snp_list)
        if sum(snp_subset) < n_filter_snp:
            admix.logger.warning(
                f"{n_filter_snp - sum(snp_subset)} SNPs in {snp_list} are not in the dataset"
            )

    admix.logger.info(f"{sum(snp_subset)} SNPs are used for GRM calculation")

    # subset
    df_snp = df_snp.loc[snp_subset, :]
    geno = geno[snp_subset, :]

    # calculate GRM
    if subpopu is None:
        grm = admix.data.grm(geno, subpopu=None, std_method=std_method)
    else:
        grm = admix.data.grm(
            geno, subpopu=df_indiv["POPU"].values, std_method=std_method
        )

    df_id = pd.DataFrame({"0": df_indiv.FID.values, "1": df_indiv.IID.values})

    # write GRM
    admix.tools.gcta.write_grm(
        out_prefix,
        K=grm,
        df_id=df_id,
        n_snps=np.repeat(len(df_snp), len(df_id)),
    )


def append_snp_info(
    pfile: str,
    out: str = None,
    info: List[str] = ["LANC_FREQ", "FREQ"],
):
    """
    Append information to .pvar file. Currently, 3 statistics are supported:
    (1) ancestry-specific frequency (2) ancestry-specific haplotype count (3) total
    allele frequency is supported. Please raise an issue if you need other statistics.

    Parameters
    ----------
    pfile : str
        Path to the .pvar file.
    out : str
        Path to the output file. If specified, the output file will be WRITTEN to
        this path. Otherwise, the output file will be appended to the <pfile>.snp_info
        file.
    info : List[str]
        List of information to append. Currently supported:

        * "LANC_FREQ": ancestry-specific allele frequency of each SNP. For example, for a two-way admixture population, `LANC_FREQ1` indicates the frequency of alternate allele in the first ancestry. `LANC_NHAPLO1` will also be added, indicating the number of haplotypes in the first ancestry.
        * "FREQ": allele frequency of each SNP regardless of ancestry.

    Examples
    --------
    .. code-block:: bash
        
        # toy-admix.snp_info will be created containing LANC_FREQ[1-n_anc], LANC_NHAPLO[1-n_anc], FREQ
        admix append-snp-info \\
            --pfile toy-admix \\
            --out toy-admix.snp_info
         
    """
    log_params("append-snp-info", locals())

    dset = admix.io.read_dataset(pfile)

    df_info: pd.DataFrame = []
    if "LANC_FREQ" in info:
        af = dset.af_per_anc()
        nhaplo = dset.nhaplo_per_anc()

        df_af = pd.DataFrame(
            af,
            columns=[f"LANC_FREQ{i + 1}" for i in range(af.shape[1])],
            index=dset.snp.index,
        )
        df_info.append(df_af)

        df_nhaplo = pd.DataFrame(
            nhaplo,
            columns=[f"LANC_NHAPLO{i + 1}" for i in range(nhaplo.shape[1])],
            index=dset.snp.index,
        )
        df_info.append(df_nhaplo)

    if "FREQ" in info:
        df_freq = dapgen.freq(pfile + ".pgen", memory=8)
        assert np.all(df_freq.ID.values == dset.snp.index)
        df_freq = pd.DataFrame(
            df_freq["ALT_FREQS"].values,
            columns=["FREQ"],
            index=dset.snp.index,
        )
        df_info.append(df_freq)

    df_info = pd.concat(df_info, axis=1)

    if out is None:
        out = pfile + ".snp_info"

    _append_to_file(out, df_info)


def da_sum(mat: da.Array):
    """Summation over a dask array to get individual-level sum

    Parameters
    ----------
    mat : da.Array
        dask array to sum over
    """
    chunks = mat.chunks[0]
    indices = np.insert(np.cumsum(chunks), 0, 0)
    mat_sum = np.zeros(mat.shape[1])
    for i in range(len(indices) - 1):
        start, stop = indices[i], indices[i + 1]
        mat_sum += mat[start:stop, :].sum(axis=0).compute()
    return mat_sum


def calc_pgs(
    plink_path: str,
    weights_path: str,
    out: str,
    weight_col: str = "WEIGHT",
):
    """Calculate PGS from a weight file and a PLINK file.

    Parameters
    ----------
    plink_path : str
        Path to plink files. Format examples:
          * /path/to/chr21.pgen
          * /path/to/genotype/directory
          * /path/to/file_list.txt # file_list.txt contains rows of file names
    weights_path : str
        path to PGS weights, containing CHROM, SNP, REF, ALT, WEIGHT columns
    out : str
        prefix of the output files.
    weight_col : str, optional
        column in 'weights_path' representing the weight, by default "WEIGHT"
    """
    log_params("calc-pgs", locals())
    df_weights = pd.read_csv(weights_path, sep="\t")
    pgen_files = dapgen.parse_plink_path(plink_path)
    if not isinstance(pgen_files, list):
        pgen_files = [pgen_files]
    plink_prefix_list = [pgen.rsplit(".", 1)[0] for pgen in pgen_files]

    df_snp_info: Dict = {"PLINK_SNP": [], "WEIGHTS_SNP": [], "WEIGHTS": []}
    all_partial_pgs = 0.0
    all_lanc = 0.0
    indiv_list = None
    all_n_snp = 0
    for plink_prefix in plink_prefix_list:
        dset = admix.io.read_dataset(plink_prefix)
        assert dset.n_anc == 2, "Only 2-way admixture is currently supported"
        if indiv_list is None:
            indiv_list = dset.indiv.index
        else:
            assert indiv_list.equals(
                dset.indiv.index
            ), f"Indiv list in {plink_prefix} does not match with previous ones."
        dset_idx, wgt_idx, flip = dapgen.align_snp(
            df1=dset.snp[["CHROM", "POS", "REF", "ALT"]], df2=df_weights
        )
        dset_subset = dset[dset_idx]
        admix.logger.info(f"# matched SNPs={len(wgt_idx)} for dset={plink_prefix}.")
        tmp_df_weights = df_weights[[weight_col]].loc[wgt_idx, :] * flip.reshape(-1, 1)
        tmp_df_weights.index = dset_idx

        partial_pgs = admix.data.calc_pgs(
            dset=dset_subset, df_weights=tmp_df_weights, method="partial"
        )

        df_snp_info["PLINK_SNP"].extend(dset_idx)
        df_snp_info["WEIGHTS_SNP"].extend(wgt_idx)
        df_snp_info["WEIGHTS"].extend(df_weights.loc[wgt_idx, weight_col].values * flip)
        all_partial_pgs += partial_pgs
        all_lanc += da_sum(dset_subset.lanc.sum(axis=2))
        all_n_snp += dset_subset.n_snp

    pd.DataFrame(df_snp_info).to_csv(out + ".snp_info.tsv", sep="\t", index=False)
    pd.DataFrame(
        {
            "indiv": dset.indiv.index,
            "PGS1": all_partial_pgs[:, 0],
            "PGS2": all_partial_pgs[:, 1],
            "PROP1": 1 - all_lanc / (all_n_snp * 2),
            "PROP2": all_lanc / (all_n_snp * 2),
        }
    ).to_csv(out + ".pgs.tsv", sep="\t", index=False)
    admix.logger.info(f"SNP information saved to {out}.snp_info.tsv")
    admix.logger.info(f"PGS saved to {out}.pgs.tsv")


[docs]def calc_partial_pgs( plink_path: str, weights_path: str, out: str, ref_plink_path: str = None, ref_pops: List[str] = None, weight_col: str = "WEIGHT", ref_pop_col: str = "Population", dset_build: str = None, weights_build: str = None, ): """Calculate PGS from a weight file and a PLINK file. Parameters ---------- plink_path : str Path to plink files. Some examples are: * :code:`/path/to/chr21.pgen` * :code:`/path/to/genotype/directory` * :code:`/path/to/file_list.txt` containing rows of file names weights_path : str path to PGS weights, containing :code:`CHROM`, :code:`SNP`, :code:`REF`, :code:`ALT`, :code:`WEIGHT` columns out : str prefix of the output files. :code:`{out}.sample_pgs.tsv` and :code:`{out}.ref_pgs.tsv` will be written to disk. ref_plink_path : str path to reference plink files. The :code:`ref_plink_path` should be a single plink2 file. ref_pops: list of str list of populations in reference plink files. For example, "CEU,YRI" weight_col : str, optional column in 'weights_path' representing the weight, by default "WEIGHT" dset_build: str, optional genome build transform for the dataset ("hg19->hg38" or "hg38->hg19"), by default None weights_build: str, optional genome build transform for the weights ("hg19->hg38" or "hg38->hg19"), by default None """ log_params("calc-partial-pgs", locals()) CHECK_COLS = ["CHROM", "POS", "REF", "ALT"] # read input data & basic checks pgen_files = dapgen.parse_plink_path(plink_path) if not isinstance(pgen_files, list): pgen_files = [pgen_files] plink_prefix_list = [pgen.rsplit(".", 1)[0] for pgen in pgen_files] # scoring weights df_weights = pd.read_csv(weights_path, sep="\t") df_weights = df_weights[CHECK_COLS + [weight_col]].copy() if weights_build is not None: df_weights["POS"] = admix.tools.liftover.run( df_weights[["CHROM", "POS"]], chain=weights_build ) df_weights = df_weights[df_weights.POS != -1].copy() assert (ref_plink_path is None) == ( ref_pops is None ), "Either both or none of ref_plink_path and ref_pops should be provided." # reference data CALC_REF = ref_plink_path is not None if CALC_REF: assert ref_plink_path.endswith(".pgen") or ref_plink_path.endswith( ".bed" ), "Reference plink file should be a single .pgen or .bed file." # remove .pgen or .bed from the file name ref_plink_path = ref_plink_path.rsplit(".", 1)[0] admix.logger.info(f"Reading reference plink file from {ref_plink_path}") dset_ref = admix.io.read_dataset(ref_plink_path) ref_pop_indiv: Dict = { pop: dset_ref.indiv.index[dset_ref.indiv[ref_pop_col] == pop].values for pop in ref_pops } admix.logger.info( f"Reading reference data with " + ", ".join( [f"{pop} #indiv={len(ref_pop_indiv[pop])}" for pop in ref_pop_indiv] ) ) total_sample_pgs: pd.DataFrame = 0 if CALC_REF: total_ref_pgs: Dict[str, pd.DataFrame] = {pop: 0 for pop in ref_pops} # iterate through each plink file for i, plink_prefix in enumerate(plink_prefix_list): admix.logger.info( f"Scoring for dataset {i + 1}/{len(plink_prefix_list)}: {plink_prefix}" ) _dset = admix.io.read_dataset(plink_prefix) if dset_build is not None: _dset.snp["POS"] = admix.tools.liftover.run( _dset.snp[["CHROM", "POS"]], chain=dset_build ) _n_total_snp = _dset.n_snp assert _dset.n_anc == 2, "Only 2-way admixture is currently supported" # align sample dset and weights idx1, idx2, flip = dapgen.align_snp( df1=_dset.snp[CHECK_COLS], df2=df_weights[CHECK_COLS] ) _dset = _dset[idx1] _df_weights = df_weights.loc[idx2, :].copy() _df_weights.index = idx1 if CALC_REF: # align sample dset and reference dset idx1, idx2, flip = dapgen.align_snp( df1=_dset.snp[CHECK_COLS], df2=dset_ref.snp[CHECK_COLS], ) _dset = _dset[idx1] _df_weights = _df_weights.loc[idx1, :] # original code: _dset_ref = dset_ref[idx2] # directly call admix.Dataset for potential unsorted scenarios: _dset_ref = admix.Dataset( dset_ref=dset_ref, snp_idx=dset_ref.snp.index.get_indexer(idx2), indiv_idx=slice(None), enforce_order=False, ) admix.logger.info(f"matched #SNPs={_dset.n_snp}/{_n_total_snp}") if CALC_REF: sample_pgs, ref_pgs = admix.data.calc_partial_pgs( dset=_dset, df_weights=_df_weights, dset_ref=_dset_ref, ref_pop_indiv=[ref_pop_indiv[pop] for pop in ref_pops], ) total_sample_pgs += sample_pgs for i, pop in enumerate(ref_pops): total_ref_pgs[pop] += ref_pgs[i] else: sample_pgs = admix.data.calc_partial_pgs(dset=_dset, df_weights=_df_weights) total_sample_pgs += sample_pgs total_sample_pgs.to_csv(out + ".sample_pgs.tsv", sep="\t") admix.logger.info(f"Sample PGS saved to {out}.sample_pgs.tsv") if CALC_REF: for pop in ref_pops: total_ref_pgs[pop].to_csv(out + f".ref_pgs_{pop}.tsv", sep="\t") admix.logger.info(f"Reference PGS saved to {out}.ref_pgs_{pop}.tsv")