Source code for admix.cli._lanc

import admix
import subprocess
import dapgen
import os
import glob
import numpy as np
import pandas as pd
from ._utils import log_params


[docs]def lanc( pfile: str, ref_pfile: str, ref_pop_col: str, ref_pops: str, out: str, ): log_params("lanc", locals()) sample_dset = admix.io.read_dataset(pfile=pfile) ref_dset = admix.io.read_dataset(pfile=ref_pfile) assert set(sample_dset.snp.index) == set(ref_dset.snp.index), ( "`pfile` and `ref_pfile` must have the same snp index" "(snp match feature coming soon)." ) ref_dsets = [ ref_dset[:, (ref_dset.indiv[ref_pop_col] == pop).values] for pop in ref_pops ] est = admix.ancestry.lanc(sample_dset=sample_dset, ref_dsets=ref_dsets) admix.data.Lanc(array=est).write(out)
def lanc_count(lanc: str, out: str, n_anc: int = None): """Count the number / proportion of local ancestries for each individual Parameters ---------- lanc : str path to the lanc file, this can be a .lanc file, a wildcard of .lanc files, or a directory containing .lanc files. If the corresponding .psam file is present, the .psam file will be used as the individual list. out : str path to the output file n_anc : int number of ancestral populations in the data """ log_params("lanc-count", locals()) if lanc.endswith(".lanc"): lanc_path = [lanc] elif "*" in lanc: lanc_path = glob.glob(lanc) elif os.path.isdir(lanc): lanc_path = [p for p in glob.glob(lanc + "/*.lanc")] else: raise ValueError("Unable to parse lanc pathname") admix.logger.info(f"Found {len(lanc_path)} lanc files: {','.join(lanc_path)}") # read psam if available psam_path = [p.replace(".lanc", ".psam") for p in lanc_path] if all(os.path.exists(p) for p in psam_path): # check all psam files have the same individual ID psam_indiv = [dapgen.read_psam(p).index for p in psam_path] assert all( psam_indiv[0].equals(i) for i in psam_indiv[1:] ), "Individuals in psam files do not match" indiv_list = psam_indiv[0].values elif not any(os.path.exists(p) for p in psam_path): indiv_list = None else: raise ValueError("either .psam all exists or none exists") lanc_mat = admix.data.Lanc(lanc_path[0]) n_indiv = lanc_mat.n_indiv if indiv_list is not None: assert n_indiv == len( indiv_list ), "Number of individuals in lanc and psam files do not match" else: indiv_list = np.arange(n_indiv).astype(str) lanc_count = lanc_mat.lanc_count() if n_anc is not None: assert ( lanc_count.shape[1] == n_anc ), "Number of ancestral populations do not match" else: n_anc = lanc_count.shape[1] admix.logger.info(f"Inferred number of ancestral populations: {n_anc}") for p in lanc_path[1:]: lanc_count += admix.data.Lanc(p).lanc_count(n_anc=n_anc) lanc_prop = lanc_count / lanc_count.sum(axis=1, keepdims=True) admix.logger.info(f"Writing lanc count file: {out}") count_cols = [f"COUNT{i + 1}" for i in range(n_anc)] prop_cols = [f"PROP{i+1}" for i in range(n_anc)] df_res = pd.DataFrame( data=np.concatenate([lanc_count, lanc_prop], axis=1), index=indiv_list, columns=count_cols + prop_cols, ) df_res[count_cols] = df_res[count_cols].astype(int) df_res.index.name = "indiv" df_res.to_csv(out, sep="\t", float_format="%.4g") def lanc_convert(pfile: str, out: str, rfmix: str = None, raw: str = None): """Convert local ancestry inference results (e.g. RFmix .msp.tsv) to a .lanc file Parameters ---------- pfile : str Path to the pfile. The path is without the .pgen suffix out : str Path to the output file rfmix : str Path to the rfmix .msp.tsv file, raw : str Path to the raw file """ log_params("lanc-convert", locals()) # only one of rfmix and raw should be specified assert (rfmix is None) + ( raw is None ) == 1, "Only one of rfmix and raw should be specified" if rfmix is not None: geno, df_snp, df_indiv = dapgen.read_pfile(pfile, phase=True) admix.logger.info(f"Reading rfmix file: {rfmix}") lanc = admix.io.read_rfmix( path=rfmix, df_snp=df_snp, df_indiv=df_indiv, ) admix.logger.info(f"Obtaining local ancestry {lanc}") admix.logger.info(f"Writing lanc file: {out}") lanc.write(out) if raw is not None: assert False, "raw not implemented yet" def lanc_impute(pfile: str, ref_pfile: str, out: str = None): """Impute the local ancestry for `pfile` using `ref_pfile` Parameters ---------- pfile : str Path to the pfile ref_pfile : str Path to the reference pfile out : str Path to the output pfile (default to pfile + ".lanc") """ log_params("lanc-impute", locals()) # check <pfile>.lanc does not exist assert not os.path.exists(pfile + ".lanc"), "`pfile` already has a .lanc file" sample_dset = admix.io.read_dataset(pfile=pfile) ref_dset = admix.io.read_dataset(pfile=ref_pfile) ref_lanc = admix.data.Lanc(ref_pfile + ".lanc") sample_lanc = ref_lanc.impute( ref_dset.snp[["CHROM", "POS"]].values, sample_dset.snp[["CHROM", "POS"]].values ) if out is None: out = pfile + ".lanc" assert not os.path.exists(out), f"out={out} already exists" sample_lanc.write(out)