Source code for admix.simulate._geno

import numpy as np
from typing import List
import dask.array as da
import admix
import pandas as pd
from tqdm import tqdm
from ._lanc import hap_lanc as simulate_hap_lanc


[docs]def admix_geno( geno_list: List[da.Array], df_snp: pd.DataFrame, anc_props: List[float], mosaic_size: float, n_indiv: int, return_sparse_lanc=False, ) -> admix.Dataset: """Simulate admixed genotype The generative model is: - for each ancestry, the allele frequencies are drawn - for each individual, breakpoints are drawn from a Poisson process. and the ancestry will be filled based a multinomial distribution with `n_anc` components - for each SNP, given the ancestry and the allele frequencies, the haplotype is drawn. Haplotype are simulated under some frequencies Parameters ---------- geno_list : List[da.Array] List of ancestral data sets, each with (n_snp, n_indiv) df_snp : pd.DataFrame Dataframe of SNPs shared across ancestral data sets anc_props : list of float Proportion of ancestral populations mosaic_size : float Expected mosaic size in # of SNPs. use admix.lanc.calculate_mosaic_size() to calculate the mosaic size n_indiv : int Number of individuals to simulate Returns ------- admix.Dataset Simulated admixed dataset """ n_anc = len(geno_list) n_snp = geno_list[0].shape[0] assert all( n_snp == geno.shape[0] for geno in geno_list ), "all geno must have the same number of SNPs" assert n_snp == df_snp.shape[0], "df_snp must have the same number of SNPs" assert np.sum(anc_props) == 1, "anc_props must sum to 1" assert len(anc_props) == n_anc, "anc_props must have the same length as n_anc" anc_props = np.array(anc_props) dset_hap_list = [ da.hstack([geno[:, :, 0], geno[:, :, 1]]).compute() for geno in geno_list ] hap_lanc_breaks, hap_lanc_values = admix.simulate.hap_lanc( n_snp=n_snp, n_hap=n_indiv * 2, mosaic_size=mosaic_size, anc_props=anc_props ) geno = np.zeros((n_snp, n_indiv * 2), dtype=np.int8) for hap_i in tqdm(range(n_indiv * 2)): start = 0 for stop, val in zip(hap_lanc_breaks[hap_i], hap_lanc_values[hap_i]): geno[start:stop, hap_i] = dset_hap_list[val][ start:stop, np.random.randint(dset_hap_list[val].shape[1]) ] start = stop lanc_breaks, lanc_values = admix.data.haplo2diplo( breaks=hap_lanc_breaks, values=hap_lanc_values ) lanc = admix.data.Lanc(breaks=lanc_breaks, values=lanc_values) # a = np.random.randn(4, 5, 2) # b = np.zeros((4, 10)) # b[:, 0::2] = a[:, :, 0] # b[:, 1::2] = a[:, :, 1] # assert np.all(b == a.reshape(4, 10)) # assert np.all(b.reshape(4, 5, 2) == a) # equivalent: geno = np.dstack([geno[:, 0::2], geno[:, 1::2]]) geno = geno.reshape(n_snp, n_indiv, 2) dset = admix.Dataset( geno=da.from_array(geno, chunks=-1), lanc=lanc.dask(snp_chunk=n_snp), n_anc=n_anc, snp=df_snp, indiv=pd.DataFrame( {"indiv": ["indiv_" + str(i) for i in np.arange(n_indiv)]} ).set_index("indiv"), ) if return_sparse_lanc: return dset, lanc else: return dset
def admix_geno_simple( n_indiv: int, n_snp: int, n_anc: int, mosaic_size: float, anc_props: np.ndarray, allele_freqs: List[np.ndarray] = None, af_maf_low: float = 0.05, af_maf_high: float = 0.5, ) -> admix.Dataset: """Simulate admixed genotype The generative model is: - for each ancestry, the allele frequencies are drawn - for each individual, breakpoints are drawn from a Poisson process. and the ancestry will be filled based a multinomial distribution with `n_anc` components - for each SNP, given the ancestry and the allele frequencies, the haplotype is drawn. Haplotype are simulated under some frequencies Parameters ---------- n_indiv : int Number of individuals n_snp : int Number of SNPs n_anc : int Number of ancestries mosaic_size : float Expected mosaic size in # of SNPs anc_props : list of float Proportion of ancestral populations, if not specified, the proportion is uniform over the ancestral populations. allele_freqs: List[np.ndarray] Allele frequencies, if not specified, the frequencies are drawn from using `af_maf_low` and `af_maf_high` af_maf_low : float Lowest allowed minor allele frequency af_maf_low : float Highest allowed minor allele frequency Returns ------- admix.Dataset Simulated admixed dataset References ---------- 1. https://github.com/slowkoni/rfmix/blob/master/simulate.cpp 2. https://github.com/slowkoni/admixture-simulation 3. https://github.com/williamslab/admix-simu """ if anc_props is None: anc_props = np.ones(n_anc) / n_anc else: anc_props = np.array(anc_props) assert np.sum(anc_props) == 1, "anc_props must sum to 1" anc_props = np.array(anc_props) assert anc_props.size == n_anc, "anc_props must have the same length as n_anc" breaks, values = simulate_hap_lanc( n_hap=n_indiv * 2, n_snp=n_snp, mosaic_size=mosaic_size, anc_props=anc_props ) breaks, values = admix.data.haplo2diplo(breaks=breaks, values=values) rls_lanc = ( admix.data.Lanc(breaks=breaks, values=values).dask().swapaxes(0, 1).compute() ) # n_indiv x n_snp x 2 (2 for each haplotype) # rls_lanc = np.dstack([rls_lanc[0:n_indiv, :], rls_lanc[n_indiv:, :]]) rls_geno = np.zeros_like(rls_lanc) if allele_freqs is None: # allele frequencies for the two populations allele_freqs = [ np.random.uniform(low=af_maf_low, high=af_maf_high, size=n_snp) for _ in range(n_anc) ] else: assert ( len(allele_freqs) == n_anc ), "allele_freqs must have the same length as n_anc" assert np.all( [len(af) == n_snp for af in allele_freqs] ), "each element in allele_freqs must have the same length as n_snp" for i_anc in range(n_anc): rls_geno[rls_lanc == i_anc] = np.random.binomial( n=1, p=allele_freqs[i_anc][np.where(rls_lanc == i_anc)[1]] ) return admix.Dataset( geno=da.from_array(np.swapaxes(rls_geno, 0, 1), chunks=-1), lanc=da.from_array(np.swapaxes(rls_lanc, 0, 1), chunks=-1), n_anc=n_anc, ) def admix_geno2( n_indiv: int, n_snp: int, n_anc: int, mosaic_size: float, anc_props: np.ndarray = None, ancestral_af_range: List[float] = [0.1, 0.9], F_st: float = 0.3, ) -> admix.Dataset: """Simulate admixed genotype The generative model is: 1. for each ancestry, the allele frequencies are drawn 2. for each individual, breakpoints are drawn from a Poisson process. and the ancestry will be filled based a multinomial distribution with `n_anc` components 3. for each SNP, given the ancestry and the allele frequencies, the haplotype is drawn Haplotype are simulated under some frequencies Parameters ---------- n_indiv : int Number of individuals n_snp : int Number of SNPs n_anc : int Number of ancestries mosaic_size : float Expected mosaic size in # of SNPs anc_props : list of float Proportion of ancestral populations, if not specified, the proportion is uniform over the ancestral populations. ancestral_af_range: [float, float] allele frequencies range. F_st: float Distance for the two populations that form the admixed population. Returns ------- admix.Dataset Simulated admixed dataset References ---------- We follow Zaitlen et al. 2014, Nature Genetics See description in subsection "Simulations with simulated genotypes" in Methods """ assert n_anc == 2, "n_anc must be 2, only two-way admixture is currently supported" # global ancestry if anc_props is None: anc_props = np.ones(n_anc) / n_anc else: anc_props = np.array(anc_props) assert np.sum(anc_props) == 1, "anc_props must sum to 1" anc_props = np.array(anc_props) assert anc_props.size == n_anc, "anc_props must have the same length as n_anc" # local ancestry rls_lanc = ( simulate_hap_lanc( n_hap=n_indiv * 2, n_snp=n_snp, mosaic_size=mosaic_size, anc_props=anc_props ) .dask() .T ) # n_indiv x n_snp x 2 (2 for each haplotype) rls_lanc = np.dstack([rls_lanc[0:n_indiv, :], rls_lanc[n_indiv:, :]]) rls_geno = np.zeros_like(rls_lanc) ancestral_afs = np.random.uniform( low=ancestral_af_range[0], high=ancestral_af_range[1], size=n_snp ) # allele frequencies for the two populations allele_freqs = [ np.random.beta( a=ancestral_afs * (1 - F_st) / F_st, b=(1 - ancestral_afs) * (1 - F_st) / F_st, ) for _ in range(n_anc) ] for i_anc in range(n_anc): rls_geno[rls_lanc == i_anc] = np.random.binomial( n=1, p=allele_freqs[i_anc][np.where(rls_lanc == i_anc)[1]] ) return admix.Dataset( geno=da.from_array(np.swapaxes(rls_geno, 0, 1)), lanc=da.from_array(np.swapaxes(rls_lanc, 0, 1)), n_anc=n_anc, )