Source code for admix.tools.plink2

import dapgen
import pandas as pd
import admix
import numpy as np

import dapgen
import pandas as pd
import admix
import numpy as np
from typing import List
import os
import glob
import subprocess
from ._utils import get_dependency


def run(cmd: str, **kwargs):
    """Shortcut for running plink commands

    Parameters
    ----------
    cmd : str
        plink command
    """
    bin_path = get_dependency("plink2")
    add_cmds = [f" --{k.replace('_', '-')} {kwargs[k]}" for k in kwargs]
    cmd += " ".join(add_cmds)
    subprocess.check_call(f"{bin_path} {cmd}", shell=True)


def gwas(
    df_sample_info: pd.DataFrame,
    pheno_col: str,
    out_prefix: str,
    pfile: str = None,
    bfile: str = None,
    family: str = "linear",
    covar_cols: List[str] = None,
    cat_cols: List[str] = None,
    pheno_quantile_normalize=False,
    covar_quantile_normalize=False,
    clean_tmp_file=False,
    **kwargs,
):
    assert family in ["linear", "logistic"], "family must be linear or logistic"
    # only one of pfile or bfile must be provided
    assert (pfile is None) != (bfile is None), "pfile or bfile must be provided"

    # clean up phenotype
    non_nan_index = ~np.isnan(df_sample_info[pheno_col])
    df_pheno = df_sample_info.loc[non_nan_index, :].rename(columns={pheno_col: "trait"})
    pheno_vals = df_pheno.trait.values
    if family == "logistic":
        assert (
            pheno_quantile_normalize == False
        ), "quantile normalization can not be used for logistic regression"
        assert np.all(
            (pheno_vals == 0) | (pheno_vals == 1)
        ), "phenotype values must be 0 or 1 for logistic regression"
        df_pheno["trait"] = df_pheno.trait.astype(int) + 1
    elif family == "linear":
        df_pheno["trait"] = (pheno_vals - pheno_vals.mean()) / pheno_vals.std()
    else:
        raise ValueError("family must be linear or logistic")
    pheno_path = out_prefix + f".plink2_tmp_pheno"

    if bfile is not None:
        # PLINK1 format
        # FID, IID must be in the column
        assert "FID" in df_sample_info.columns
        assert "IID" in df_sample_info.columns
        df_pheno[["FID", "IID", "trait"]].to_csv(
            pheno_path, sep="\t", index=False, na_rep="NA"
        )
        cmds = [f"--bfile {bfile}"]
    elif pfile is not None:
        # PLINK2 format
        df_pheno.index.name = "#IID"
        df_pheno[["trait"]].to_csv(pheno_path, sep="\t", na_rep="NA")
        cmds = [f"--pfile {pfile}"]

    cmds.extend(
        [
            f"--pheno {pheno_path} --no-psam-pheno",
            f"--out {out_prefix}",
        ]
    )
    if family == "linear":
        cmds.append("--linear hide-covar omit-ref")
    elif family == "logistic":
        cmds.append("--logistic hide-covar omit-ref no-firth")
    else:
        raise ValueError("family must be linear or logistic")

    if covar_cols is not None:
        covar_path = out_prefix + f".plink2_tmp_covar"
        cmds.append(f"--covar {covar_path}")
        if bfile is not None:
            # PLINK1 format
            # FID, IID must be in the column
            assert "FID" in df_sample_info.columns
            assert "IID" in df_sample_info.columns
            df_covar = df_sample_info.loc[
                non_nan_index, ["FID", "IID"] + covar_cols
            ].copy()
            df_covar.to_csv(covar_path, sep="\t", index=False, na_rep="NA")
        elif pfile is not None:
            # PLINK2 format
            df_covar = df_sample_info.loc[non_nan_index, covar_cols].copy()
            df_covar.index.name = "#IID"
            df_covar.to_csv(covar_path, sep="\t", na_rep="NA")

    else:
        cmds[-1] += " allow-no-covars"

    if pheno_quantile_normalize:
        cmds.append("--pheno-quantile-normalize")
    if covar_quantile_normalize:
        cmds.append("--covar-quantile-normalize")
    if cat_cols:
        cmds.append(f"--split-cat-pheno omit-most {' '.join(cat_cols)}")

    print("\n".join(cmds))

    run(" ".join(cmds), **kwargs)

    if family == "linear":
        os.rename(out_prefix + ".trait.glm.linear", out_prefix + ".assoc")
    else:
        os.rename(out_prefix + ".trait.glm.logistic", out_prefix + ".assoc")

    if clean_tmp_file:
        for f in glob.glob(out_prefix + ".plink2_tmp_*"):
            os.remove(f)


def clump(
    pfile,
    assoc_path: str,
    out_prefix: str,
    p1: float = 5e-8,
    p2: float = 1e-4,
    r2: float = 0.1,
    kb=3000,
    **kwargs,
):
    """
    Wrapper for plink2 clump
    For now, first need to export to .bed format then perform the clump
    """
    tmp_prefix = out_prefix + ".admix_plink2_clump_tmp"
    # convert to bed
    admix.tools.plink2.run(f"--pfile {pfile} --make-bed --out {tmp_prefix}")

    # perform clump
    admix.tools.plink.clump(
        bfile=tmp_prefix,
        assoc_path=assoc_path,
        out_prefix=out_prefix,
        p1=p1,
        p2=p2,
        r2=r2,
        kb=kb,
        **kwargs,
    )
    # # convert plink2 association to plink1 format ID -> SNP
    # import shutil

    # from_file = open(assoc_path)
    # to_file = open(tmp_prefix + ".assoc", "w")
    # to_file.writelines(from_file.readline().replace("ID", "SNP"))
    # shutil.copyfileobj(from_file, to_file)
    # from_file.close()
    # to_file.close()
    # cmds = [
    #     f"--bfile {tmp_prefix} --clump {tmp_prefix + '.assoc'}",
    #     f"--clump-p1 {p1} --clump-p2 {p2} --clump-r2 {r2} --clump-kb {kb}",
    #     f"--out {tmp_prefix}",
    # ]

    # admix.tools.plink.run(" ".join(cmds))
    # if os.path.exists(tmp_prefix + ".clumped"):
    #     os.rename(tmp_prefix + ".clumped", out_prefix + ".clumped")
    # else:
    #     # no clumped region
    #     # write a comment to the output file
    #     with open(out_prefix + ".clumped", "w") as file:
    #         file.write("# No clumped region")

    for f in glob.glob(tmp_prefix + "*"):
        os.remove(f)


[docs]def lift_over(pfile: str, out_prefix: str, chain="hg19->hg38"): """Lift over a plink file to another genome. First, use liftover to convert to another genome, then use plink2 --sort-vars to sort the variants. Parameters ---------- pfile : str Path to plink file out_prefix : str output prefix, <out_prefix>.pgen, <out_prefix>.psam, <out_prefix>.pvar will be generated chain : str, optional lift over direction, by default "hg19->hg38" """ assert chain in ["hg19->hg38", "hg38->hg19"] # read the input pgen pgen, pvar, psam = dapgen.read_pfile(pfile) df_snp = pd.DataFrame( {"CHROM": pvar.CHROM.values, "POS": pvar.POS.values}, index=pvar.index.values ) # perform the liftover df_lifted = admix.tools.liftover.run(df_snp, chain=chain) n_snp1 = len(df_lifted) df_lifted = df_lifted[df_lifted.POS != -1] n_snp2 = len(df_lifted) print( f"remove {n_snp1 - n_snp2} / {n_snp1} SNPs" f" ({(n_snp1 - n_snp2) / n_snp1 * 100:.2g}%)" " for unmapped or ambiguous SNPs" ) # extract the lifted SNPs np.savetxt(f"{out_prefix}-tmp.snp", df_lifted.index.values, fmt="%s") admix.tools.plink2.run( f"--pfile {pfile} --extract {out_prefix}-tmp.snp --sort-vars --make-pgen --out {out_prefix}-tmp" ) # substitute the coordinates obtained from liftover pgen, pvar, psam = dapgen.read_pfile(out_prefix + "-tmp") assert np.all(pvar.index == df_lifted.index) pvar["POS"] = df_lifted["POS"] pvar.insert(2, "ID", pvar.index) pvar = pvar.reset_index(drop=True).rename(columns={"CHROM": "#CHROM"}) pvar.to_csv(f"{out_prefix}-tmp.pvar", sep="\t", index=False) admix.tools.plink2.run( f"--pfile {out_prefix}-tmp --sort-vars --make-pgen --out {out_prefix}" ) # remove the tmp files for f in glob.glob(out_prefix + "-tmp*"): os.remove(f)
def align_snp(pfile1: str, pfile2: str, out_prefix: str): """Given two plink files from different individuals, take the common set of SNPs. Matching by (1) chromosome (2) position (3) ref allele (4) alt allele This can be useful for software that requires an aligned set of SNPs Parameters ---------- pfile1 : str plink file 1 pfile2 : str plink file 2 out_prefix: str prefix to the output files """ # Step 1: read pvar and find the common SNPs df_snp1 = dapgen.read_pvar(pfile1 + ".pvar").drop_duplicates( subset=["CHROM", "POS"] ) df_snp2 = dapgen.read_pvar(pfile2 + ".pvar").drop_duplicates( subset=["CHROM", "POS"] ) idx1, idx2, flip = dapgen.align_snp(df_snp1, df_snp2) admix.logger.info( f"{len(idx1)} / {len(df_snp1)} ({len(idx1) / len(df_snp1) * 100:.3g}%) SNPs extracted in pfile1: {pfile1}" ) admix.logger.info( f"{len(idx2)} / {len(df_snp2)} ({len(idx2) / len(df_snp2) * 100:.3g}%) SNPs extracted in pfile2: {pfile2}" ) # Step 2: extract the common SNPs from the two files and fix the allele order allele1 = df_snp1.loc[idx1]["REF"] allele2 = df_snp2.loc[idx2]["REF"] allele1.to_csv(out_prefix + "-tmp.1.refallele", sep="\t", header=False) np.savetxt(out_prefix + "-tmp.1.snplist", allele1.index.values, fmt="%s") # use sample ref-allele values, but use ref-allele index pd.Series(allele1.values, index=allele2.index).to_csv( out_prefix + "-tmp.2.refallele", sep="\t", header=False ) np.savetxt(out_prefix + "-tmp.2.snplist", allele2.index.values, fmt="%s") # extract common SNPs and align REF ALT admix.tools.plink2.run( f"--pfile {pfile1} --extract {out_prefix + '-tmp.1.snplist'}" f" --ref-allele {out_prefix + '-tmp.1.refallele'} 2 1 --make-pgen --out {out_prefix}.1" ) admix.tools.plink2.run( f"--pfile {pfile2} --extract {out_prefix + '-tmp.2.snplist'} " f" --ref-allele force {out_prefix + '-tmp.2.refallele'} 2 1 --make-pgen --out {out_prefix}.2" ) # remove the tmp files for f in glob.glob(out_prefix + "-tmp*"): os.remove(f) def merge_indiv(pfile1: str, pfile2: str, out_prefix: str): """Given two plink files from different individuals, take the common set of SNPs and merge them into one data set. This is a wrapper for various PLINK functions. Since PLINK2's pmerge function is not complete now. We use PLINK1 merge functionality instead. A PLINK1 bfile is produced (rather than PLINK2 pfile). This can be useful for example when we want to perform a joint PCA for the two data sets. Parameters ---------- pfile1 : str plink file 1 pfile2 : str plink file 2 out_prefix: str prefix to the output files """ admix.tools.plink2.align_snp( pfile1=pfile1, pfile2=pfile2, out_prefix=out_prefix + "-tmp" ) # Step 3: unify SNP ID admix.tools.plink2.run( f"--pfile {out_prefix}-tmp.1 --set-all-var-ids @:#:\$r:\$a --make-bed --out {out_prefix}-tmp.1.fixid" ) admix.tools.plink2.run( f"--pfile {out_prefix}-tmp.2 --set-all-var-ids @:#:\$r:\$a --make-bed --out {out_prefix}-tmp.2.fixid" ) # Step 4: make final merged bfile admix.tools.plink.run( f"--bfile {out_prefix}-tmp.1.fixid --bmerge {out_prefix}-tmp.2.fixid --make-bed --out {out_prefix}" ) # remove the tmp files for f in glob.glob(out_prefix + "-tmp*"): os.remove(f) def subset( pfile: str, out_prefix: str, snp_list: List = None, indiv_list: List = None, ): cmds = [f"--pfile {pfile}", f"--make-pgen --out {out_prefix}", "--memory 16000"] if snp_list is not None: snplist_path = out_prefix + ".admix-plink2-subset.snplist" np.savetxt(snplist_path, snp_list, fmt="%s") cmds.append(f"--extract {snplist_path}") if indiv_list is not None: indivlist_path = out_prefix + ".admix-plink2-subset.indivlist" np.savetxt(indivlist_path, indiv_list, fmt="%s") cmds.append(f"--keep {indivlist_path}") run(" ".join(cmds)) # clean up for f in glob.glob(out_prefix + ".admix-plink2-subset.*"): os.remove(f) # TODO: check difference with plink2.gwas, consider merging these two functions def assoc( bfile: str, pheno: pd.DataFrame, out_prefix: str, covar: pd.DataFrame = None, indiv: pd.DataFrame = None, snp: pd.DataFrame = None, ): """Run plink association test Parameters ---------- bfile : str plink binary file pheno : pd.DataFrame phenotype data out_prefix : str prefix for output files covar : pd.DataFrame, optional covariate data indiv : pd.DataFrame, optional individual to subset, should contain columns FID and IID snp : pd.DataFrame, optional snp to subset, should contain column SNP """ assert indiv is None and snp is None, "indiv and snp are not supported" pheno_path = out_prefix + ".tmp_pheno" covar_path = out_prefix + ".tmp_covar" pheno.to_csv(pheno_path, sep="\t", index=False) if covar is not None: covar.to_csv(covar_path, sep="\t", index=False) cmd = [ f"--bfile {bfile}", f"--out {out_prefix}", f"--pheno {pheno_path}", ] if covar is not None: cmd.append("--ci 0.95 --glm omit-ref hide-covar") cmd.append(f"--covar {covar_path}") else: cmd.append("--ci 0.95 --glm omit-ref hide-covar allow-no-covars") run(" ".join(cmd)) def prune( pfile: str, out_prefix: str, indep_params: List = None, indep_pairwise_params: List = None, ): """Run plink2 prune For example, indep_params = [200, 5, 1.15], indep-pairwise = [100 5 0.1], then two-step pruning will be performed. Parameters ---------- pfile : str plink binary file indep_params : List list of parameters for indep indep_pairwise_params : List list of parameters for indep-pairwise out_prefix : str prefix for output files """ assert indep_params is None, "indep_params is not supported yet" tmp_prefix = out_prefix + ".admix_plink2_prune_tmp" assert (indep_params is None) != ( indep_pairwise_params is None ), "only one of indep_params and indep_pairwise_params can be specified" # step 1: indep pruning cmd = [ f"--pfile {pfile}", f"--out {tmp_prefix}", ] if indep_params is not None: cmd.append(f"--indep {' '.join([str(p) for p in indep_params])}") if indep_pairwise_params is not None: cmd.append( f"--indep-pairwise {' '.join([str(p) for p in indep_pairwise_params])}" ) run(" ".join(cmd)) cmd = [ f"--pfile {pfile}", f"--extract {tmp_prefix}.prune.in", f"--make-pgen --out {out_prefix}", ] run(" ".join(cmd)) # clean up for f in glob.glob(tmp_prefix + ".*"): os.remove(f) def pca(pfile: str, out_prefix: str, approx: bool = False, **kwargs): """Run plink2 pca TODO: include more options Parameters ---------- pfile : str plink binary file out_prefix : str prefix for output files approx: bool """ cmd = [ f"--pfile {pfile}", f"--out {out_prefix}", ] if approx: cmd.append("--pca approx") else: cmd.append("--pca") run(" ".join(cmd), **kwargs) def subset_hapmap3(pfile: str, build: str, out_prefix: str = None, chrom: int = None): """Overlap a pfile with HapMap3 SNPs Parameters ---------- pfile : str input pfile build: str hg19 or hg38 out_prefix: str output pfile prefix Returns ------- <out_prefix>.{pgen|pvar|psam} will be written to disk if out_prefix is specified otherwise, snp_list will be returned """ assert build in ["hg19", "hg38"], "build must be hg19 or hg38" try: import pyreadr except ImportError: raise ImportError("pyreadr is not installed. Use pip install pyreadr") hm3_snps = pyreadr.read_r(admix.tools.get_cache_data("hapmap3_snps"))[None] if build == "hg19": hm3_snps = hm3_snps[["chr", "pos"]].rename( columns={"chr": "CHROM", "pos": "POS"} ) elif build == "hg38": hm3_snps = hm3_snps[["chr", "pos_hg38"]].rename( columns={"chr": "CHROM", "pos_hg38": "POS"} ) else: raise NotImplementedError TOTAL_HM3_SNPS = hm3_snps.shape[0] if chrom is not None: hm3_snps = hm3_snps[hm3_snps.CHROM == chrom] df_snp = dapgen.read_pvar(pfile + ".pvar") snp_list = df_snp.reset_index().merge(hm3_snps, on=["CHROM", "POS"])["snp"].values admix.logger.info(f"{len(snp_list)}/{TOTAL_HM3_SNPS} SNPs are retained") if out_prefix is None: return snp_list else: subset(pfile, out_prefix, snp_list=snp_list)