Source code for admix.data._geno

import numpy as np
import pandas as pd
from tqdm import tqdm
import dask.array as da
import admix
import dask
from typing import Union, Tuple, List
import dapgen


def calc_snp_prior_var(df_snp_info, her_model):
    """
    Calculate the SNP prior variance from SNP information
    """
    assert her_model in ["uniform", "gcta", "ldak", "mafukb"]
    if her_model == "uniform":
        return np.ones(len(df_snp_info))
    elif her_model == "gcta":
        freq = df_snp_info["FREQ"].values
        assert np.all(freq > 0), "frequencies should be larger than zero"
        return np.float_power(freq * (1 - freq), -1)
    elif her_model == "mafukb":
        # MAF-dependent genetic architecture, \alpha = -0.38 estimated from meta-analysis in UKB traits
        freq = df_snp_info["FREQ"].values
        assert np.all(freq > 0), "frequencies should be larger than zero"
        return np.float_power(freq * (1 - freq), -0.38)
    elif her_model == "ldak":
        freq, weight = df_snp_info["FREQ"].values, df_snp_info["LDAK_WEIGHT"].values
        return np.float_power(freq * (1 - freq), -0.25) * weight
    else:
        raise NotImplementedError


def impute_with_mean(mat, inplace=False, axis=1):
    """impute the each entry using the mean of the input matrix np.mean(mat, axis=axis)
    axis = 1 corresponds to row-wise imputation
    axis = 0 corresponds to column-wise imputation

    Parameters
    ----------
    mat : np.ndarray
        input matrix. For reminder, the genotype matrix is with shape (n_snp, n_indiv)
    inplace : bool
        whether to return a new dataset or modify the input dataset
    axis : int
        axis to impute along

    Returns
    -------
    if inplace:
        mat : np.ndarray
            (n_snp, n_indiv) matrix
    else:
        None
    """
    assert axis in [0, 1], "axis should be 0 or 1"
    if not inplace:
        mat = mat.copy()

    # impute the missing genotypes with the mean of each row
    mean = np.nanmean(mat, axis=axis)
    nanidx = np.where(np.isnan(mat))

    # index the mean using the nanidx[1 - axis]
    # axis = 1, row-wise imputation, index the mean using the nanidx[0]
    # axis = 0, columnw-ise imputation, index the mean using the nanidx[1]
    mat[nanidx] = mean[nanidx[1 - axis]]

    if not inplace:
        return mat
    else:
        return None


def geno_mult_mat(
    geno: da.Array,
    mat: np.ndarray,
    impute_geno: bool = True,
    mat_dim: str = "snp",
    return_snp_var: bool = False,
) -> np.ndarray:
    """Multiply genotype matrix with another matrix

    Chunk of genotype matrix will be read sequentially along the SNP dimension,
    and multiplied with the `mat`.

    Without transpose, result will be (n_snp, n_rep)
    With transpose, result will be (n_indiv, n_rep)

    Missing values in geno will be imputed with the mean of the genotype matrix.

    Parameters
    ----------
    geno : da.Array
        Genotype matrix with shape (n_snp, n_indiv)
        geno.chunk contains the chunk of genotype matrix to be multiplied
    mat : np.ndarray
        Matrix to be multiplied with the genotype matrix. If the passed variable
        is a vector, it will be transformed to be a 1-column matrix.
    impute_geno : bool
        Whether to impute missing values with the mean of the genotype matrix
    mat_dim : str
        First dimension of the `mat`, either "snp" or "indiv"
        Whether to transpose the genotype matrix and calulate geno.T @ mat
    return_snp_var : bool
        Whether to return the variance of each SNP, useful in simple linear
        regression

    Returns
    -------
    np.ndarray
        Result of the multiplication
    """
    assert mat_dim in ["snp", "indiv"], "mat_dim should be `snp` or `indiv`"

    if mat.ndim == 1:
        mat = mat[:, np.newaxis]
    # chunks over SNPs
    chunks = geno.chunks[0]
    indices = np.insert(np.cumsum(chunks), 0, 0)
    n_snp, n_indiv = geno.shape
    n_rep = mat.shape[1]

    snp_var = np.zeros(n_snp)
    if mat_dim == "indiv":
        # geno: (n_snp, n_indiv)
        # mat: (n_indiv, n_rep)
        assert (
            mat.shape[0] == n_indiv
        ), "when mat_dim is 'indiv', matrix should be of shape (n_indiv, n_rep)"
        ret = np.zeros((n_snp, n_rep))
        for i in tqdm(range(len(indices) - 1), desc="admix.data.geno_mult_mat"):
            start, stop = indices[i], indices[i + 1]
            geno_chunk = geno[start:stop, :].compute()
            # impute missing genotype
            if impute_geno:
                impute_with_mean(geno_chunk, inplace=True)
            ret[start:stop, :] = np.dot(geno_chunk, mat)

            if return_snp_var:
                snp_var[start:stop] = np.var(geno_chunk, axis=0)
    elif mat_dim == "snp":
        # geno: (n_indiv, n_snp)
        # mat: (n_snp, n_rep)
        assert (
            mat.shape[0] == n_snp
        ), "when mat_dim is 'snp', matrix should be of shape (n_snp, n_rep)"
        ret = np.zeros((n_indiv, n_rep))
        for i in tqdm(range(len(indices) - 1), desc="admix.data.geno_mult_mat"):
            start, stop = indices[i], indices[i + 1]
            geno_chunk = geno[start:stop, :].compute()
            # impute missing genotype
            if impute_geno:
                impute_with_mean(geno_chunk, inplace=True)
            ret += np.dot(geno_chunk.T, mat[start:stop, :])

            if return_snp_var:
                snp_var[start:stop] = np.var(geno_chunk, axis=0)
    else:
        raise ValueError("mat_dim should be `snp` or `indiv`")
    if return_snp_var:
        return ret, snp_var
    else:
        return ret


def grm(geno: da.Array, subpopu: np.ndarray = None, std_method: str = "std"):
    """Calculate the GRM matrix
    This function is to serve as an alternative of GCTA --make-grm

    Parameters
    ----------
    geno: admix.Dataset
        genotype (n_snp, n_indiv) matrix
    subpopu : np.ndarray
        subpopulation labels, with shape (n_indiv,). The allele frequencies and
        normalization are performed separately within each subpopulation.
    std_method : str
        Method to standardize the GRM. Currently supported:
        "std" (standardize to have mean 0 and variance 1),
        "allele" (standardize to have mean 0 but no scaling)

    Returns
    -------
    np.ndarray
        GRM matrix (n_indiv, n_indiv)
    """

    def normalize_geno(g):
        """Normalize the genotype matrix"""
        # impute missing genotypes
        g = impute_with_mean(g, inplace=False, axis=1)
        # normalize
        if std_method == "std":
            g = (g - np.mean(g, axis=1)[:, None]) / np.std(g, axis=1)[:, None]
        elif std_method == "allele":
            g = g - np.mean(g, axis=1)[:, None]
        else:
            raise ValueError("std_method should be either `std` or `allele`")
        return g

    assert std_method in ["std", "allele"], "std_method should be `std` or `allele`"
    n_snp = geno.shape[0]
    n_indiv = geno.shape[1]

    if subpopu is not None:
        assert (
            n_indiv == subpopu.shape[0]
        ), "subpopu should have the same length as the number of individuals"
        unique_subpopu = np.unique(subpopu)
        admix.logger.info(
            f"{len(unique_subpopu)} subpopulations found: {unique_subpopu}"
        )

    admix.logger.info(
        f"Calculating GRM matrix with {n_snp} SNPs and {n_indiv} individuals"
    )
    mat = 0
    snp_chunks = geno.chunks[0]
    indices = np.insert(np.cumsum(snp_chunks), 0, 0)
    for i in tqdm(range(len(indices) - 1), desc="admix.data.grm"):
        start, stop = indices[i], indices[i + 1]
        geno_chunk = geno[start:stop, :].compute()
        if subpopu is not None:
            for popu in np.unique(subpopu):
                geno_chunk[:, subpopu == popu] = normalize_geno(
                    geno_chunk[:, subpopu == popu]
                )
        else:
            geno_chunk = normalize_geno(geno_chunk)

        mat += np.dot(geno_chunk.T, geno_chunk) / n_snp
    return mat


def admix_grm(
    geno: da.Array, lanc: da.Array, n_anc: int = 2, snp_prior_var: np.ndarray = None
):
    """Calculate ancestry specific GRM matrix

    Parameters
    ----------
    geno : da.Array
        Genotype matrix with shape (n_snp, n_indiv, 2)
    lanc : np.ndarray
        Local ancestry matrix with shape (n_snp, n_indiv, 2)
    n_anc : int
        Number of ancestral populations
    snp_prior_var : np.ndarray
        Prior variance of each SNP, shape (n_snp,)

    Returns
    -------
    G1: np.ndarray
        ancestry specific GRM matrix for the 1st ancestry
    G2: np.ndarray
        ancestry specific GRM matrix for the 2nd ancestry
    G12: np.ndarray
        ancestry specific GRM matrix for cross term of the 1st and 2nd ancestry
    """

    assert n_anc == 2, "only two-way admixture is implemented"
    assert np.all(geno.shape == lanc.shape)

    apa = admix.data.allele_per_anc(geno, lanc, n_anc=n_anc)
    n_snp, n_indiv = apa.shape[0:2]

    if snp_prior_var is None:
        snp_prior_var = np.ones(n_snp)
    snp_prior_var_sum = snp_prior_var.sum()
    G1 = np.zeros([n_indiv, n_indiv])
    G2 = np.zeros([n_indiv, n_indiv])
    G12 = np.zeros([n_indiv, n_indiv])

    snp_chunks = apa.chunks[0]
    indices = np.insert(np.cumsum(snp_chunks), 0, 0)

    for i in tqdm(range(len(indices) - 1), desc="admix.data.admix_grm"):
        start, stop = indices[i], indices[i + 1]
        apa_chunk = apa[start:stop, :, :].compute()

        # multiply by the prior variance on each SNP
        apa_chunk *= np.sqrt(snp_prior_var[start:stop])[:, None, None]
        a1_chunk, a2_chunk = apa_chunk[:, :, 0], apa_chunk[:, :, 1]

        G1 += np.dot(a1_chunk.T, a1_chunk) / snp_prior_var_sum
        G2 += np.dot(a2_chunk.T, a2_chunk) / snp_prior_var_sum
        G12 += np.dot(a1_chunk.T, a2_chunk) / snp_prior_var_sum

    return G1, G2, G12


def admix_grm_equal_var(
    geno: da.Array, lanc: da.Array, n_anc: int, snp_prior_var: np.ndarray = None
):
    """Calculate ancestry specific GRM matrix K1, K2 (assuming equal variances for ancestries)

    Parameters
    ----------
    geno : da.Array
        Genotype matrix with shape (n_snp, n_indiv, 2)
    lanc : np.ndarray
        Local ancestry matrix with shape (n_snp, n_indiv, 2)
    n_anc : int
        Number of ancestral populations
    snp_prior_var : np.ndarray
        Prior variance of each SNP, shape (n_snp,)

    Returns
    -------
    K1: np.ndarray
        sum of diagonal terms
    K2: np.ndarray
        off-diagonal terms
    """
    assert np.all(geno.shape == lanc.shape)

    apa = admix.data.allele_per_anc(geno, lanc, n_anc=n_anc)
    n_snp, n_indiv = apa.shape[0:2]

    if snp_prior_var is None:
        snp_prior_var = np.ones(n_snp)
    snp_prior_var_sum = snp_prior_var.sum()

    K1 = np.zeros([n_indiv, n_indiv])
    K2 = np.zeros([n_indiv, n_indiv])

    snp_chunks = apa.chunks[0]
    indices = np.insert(np.cumsum(snp_chunks), 0, 0)

    for i in tqdm(range(len(indices) - 1), desc="admix.data.admix_grm_equal_var"):
        start, stop = indices[i], indices[i + 1]
        apa_chunk = apa[start:stop, :, :].compute()

        # multiply by the prior variance on each SNP
        apa_chunk *= np.sqrt(snp_prior_var[start:stop])[:, None, None]

        # diagonal terms
        for i_anc in range(n_anc):
            a_chunk = apa_chunk[:, :, i_anc]
            K1 += np.dot(a_chunk.T, a_chunk) / snp_prior_var_sum

        # off-diagonal terms
        for i_anc in range(n_anc):
            for j_anc in range(i_anc + 1, n_anc):
                a1_chunk, a2_chunk = apa_chunk[:, :, i_anc], apa_chunk[:, :, j_anc]
                K2 += np.dot(a1_chunk.T, a2_chunk) / snp_prior_var_sum

    K2 = K2 + K2.T
    return K1, K2


def admix_ld(dset: admix.Dataset, cov: np.ndarray = None):
    """Calculate ancestry specific LD matrices

    Parameters
    ----------
    dset: admix.Dataset
        dataset containing geno, lanc
    cov : Optional[np.ndarray]
        (n_indiv, n_cov) covariates of the genotypes, an all `1` intercept covariate will always be added
        so there is no need to add the intercept in covariates.
    Returns
    -------
    K1: np.ndarray
        ancestry specific LD matrix for the 1st ancestry
    K2: np.ndarray
        ancestry specific LD matrix for the 2nd ancestry
    K12: np.ndarray
        ancestry specific LD matrix for cross term of the 1st and 2nd ancestry
    """
    assert dset.n_anc == 2, "admix_ld only works for 2 ancestries for now"
    apa = dset.allele_per_anc()

    n_snp, n_indiv = apa.shape[0:2]

    a1, a2 = apa[:, :, 0], apa[:, :, 1]
    if cov is None:
        cov = np.ones((n_indiv, 1))
    else:
        cov = np.hstack([np.ones((n_indiv, 1)), cov])
    # projection = I - X * (X'X)^-1 * X'
    cov_proj_mat = np.eye(n_indiv) - np.linalg.multi_dot(
        [cov, np.linalg.inv(np.dot(cov.T, cov)), cov.T]
    )
    a1 = np.dot(a1, cov_proj_mat)
    a2 = np.dot(a2, cov_proj_mat)
    # center with row mean
    # a1 -= a1.mean(axis=1, keepdims=True)
    # a2 -= a2.mean(axis=1, keepdims=True)
    ld1 = np.dot(a1, a1.T) / n_indiv
    ld2 = np.dot(a2, a2.T) / n_indiv
    ld12 = np.dot(a1, a2.T) / n_indiv
    ld1, ld2, ld12 = dask.compute(ld1, ld2, ld12)
    return {"11": ld1, "22": ld2, "12": ld12}


[docs]def af_per_anc( geno, lanc, n_anc=2, return_nhaplo=False ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: """ Calculate allele frequency per ancestry If at one particular SNP locus, no SNP from one particular ancestry can be found the corresponding entries will be filled with np.NaN. Parameters ---------- geno: np.ndarray genotype matrix lanc: np.ndarray local ancestry matrix n_anc: int number of ancestries return_nhaplo: bool whether to return the number of haplotypes per ancestry Returns ------- np.ndarray (n_snp, n_anc) length list of allele frequencies. """ assert np.all(geno.shape == lanc.shape) n_snp = geno.shape[0] af = np.zeros((n_snp, n_anc)) lanc_nhaplo = np.zeros((n_snp, n_anc)) snp_chunks = geno.chunks[0] indices = np.insert(np.cumsum(snp_chunks), 0, 0) for i in tqdm(range(len(indices) - 1), desc="admix.data.af_per_anc"): start, stop = indices[i], indices[i + 1] geno_chunk = geno[start:stop, :, :].compute() lanc_chunk = lanc[start:stop, :, :].compute() for anc_i in range(n_anc): lanc_mask = lanc_chunk == anc_i lanc_nhaplo[start:stop, anc_i] = np.sum(lanc_mask, axis=(1, 2)) # mask SNPs with local ancestry not `i_anc` af[start:stop, anc_i] = ( np.ma.masked_where(np.logical_not(lanc_mask), geno_chunk) .sum(axis=(1, 2)) .data ) / lanc_nhaplo[start:stop, anc_i] if return_nhaplo: return af, lanc_nhaplo else: return af
[docs]def allele_per_anc( geno: da.Array, lanc: da.Array, n_anc: int, center=False, ): """Get allele count per ancestry Parameters ---------- geno: da.Array genotype data lanc: da.Array local ancestry data n_anc: int number of ancestries Returns ------- Return allele counts per ancestries """ assert center is False, "center=True should not be used" assert np.all(geno.shape == lanc.shape), "shape of `hap` and `lanc` are not equal" assert geno.ndim == 3, "`hap` and `lanc` should have three dimension" n_snp, n_indiv, n_haplo = geno.shape assert n_haplo == 2, "`n_haplo` should equal to 2, check your data" assert isinstance(geno, da.Array) & isinstance( lanc, da.Array ), "`geno` and `lanc` should be dask array" # make sure the chunk size along the ploidy axis to be 2 geno = geno.rechunk({2: 2}) lanc = lanc.rechunk({2: 2}) assert ( geno.chunks == lanc.chunks ), "`geno` and `lanc` should have the same chunk size" assert len(geno.chunks[1]) == 1, ( "geno / lanc should not be chunked across the second dimension" "(individual dimension)" ) def helper(geno_chunk, lanc_chunk, n_anc): n_snp, n_indiv, n_haplo = geno_chunk.shape apa = np.zeros((n_snp, n_indiv, n_anc), dtype=np.float64) for i_haplo in range(n_haplo): haplo_hap = geno_chunk[:, :, i_haplo] haplo_lanc = lanc_chunk[:, :, i_haplo] for i_anc in range(n_anc): apa[:, :, i_anc][haplo_lanc == i_anc] += haplo_hap[haplo_lanc == i_anc] return apa # the resulting chunk sizes will be the same as the input for snp, indiv # while the third dimension will be (n_anc, ) output_chunks = (geno.chunks[0], geno.chunks[1], (n_anc,)) res = da.map_blocks( lambda geno_chunk, lanc_chunk: helper( geno_chunk=geno_chunk, lanc_chunk=lanc_chunk, n_anc=n_anc ), geno, lanc, dtype=np.float64, chunks=output_chunks, ) return res
def calc_pgs(dset: admix.Dataset, df_weights: pd.DataFrame, method: str): """Calculate PGS for each individual Parameters ---------- dset: admix.Dataset dataset object df_weights: pd.DataFrame weights for each individual method: str method to calculate PGS. Options are: - "total": vanilla PGS - "partial": partial PGS, calculate partial PGS for each local ancestry Returns ------- np.ndarray PGS for each individual - method = "total": (n_indiv, ) - method = "partial": (n_indiv, n_anc) """ assert method in [ "total", "partial", ], "method should be either 'total' or 'partial'" assert np.all( dset.snp.index == df_weights.index ), "`dset` and `df_weights` should have exactly the same index" assert len(df_weights.columns) == 1, "`df_weights` should have only one column" if method == "total": pgs = admix.data.geno_mult_mat( dset.geno.sum(axis=2), df_weights.values ).flatten() elif method == "partial": n_anc = dset.n_anc pgs = np.zeros((dset.n_indiv, n_anc)) apa = dset.allele_per_anc() for i_anc in range(n_anc): pgs[:, i_anc] = admix.data.geno_mult_mat( apa[:, :, i_anc], df_weights.values ).flatten() else: raise ValueError("method should be either 'total' or 'partial'") return pgs
[docs]def calc_partial_pgs( dset: admix.Dataset, df_weights: pd.DataFrame, dset_ref: admix.Dataset = None, ref_pop_indiv: List[List[str]] = None, weight_col="WEIGHT", ) -> pd.DataFrame: """Given a vector of polygenic score weights, calculate polygenic scores with regard to every ancestry backgrounds for each individual. Parameters ---------- dset : admix.Dataset The admix.Dataset object. df_weights : pd.DataFrame The dataframe for polygenic score weights for each SNP, containing CHROM, POS, REF, ALT, WEIGHT, with index being SNP ID. CHROM, POS, REF, ALT will be aligned with `dset.snp`. dset_ref : admix.Dataset, optional The reference dataset object. Use `dapgen.align_snp` to align the SNPs between `dset` and `dset_ref`. `CHROM` and `POS` must match, with potential flips of `REF` and `ALT` allele coding. ref_pop_indiv : List[List[str]], optional The list of reference individual ID in `dset_ref`. weight_col : str, optional The column name for the weight in `df_weights`. Returns ------- pd.DataFrame The polygenic scores (PGS) for each individual with shape of (n_indiv, n_anc). Raises ------ AssertionError If both `dset_ref` and `ref_pop_indiv` are None or not None. Notes ----- - The `dset` and `df_weights` should align, with potential allele flip. - If `dset_ref` is provided, `dset` and `dset_ref` should align, with potential allele flip. - The length of `ref_pop_indiv` should match with `dset.n_anc`. """ assert (dset_ref is None) == ( ref_pop_indiv is None ), "both `dset_ref` and `ref_pop_indiv` should be None or not None" CALC_REF = dset_ref is not None CHECK_COLS = ["CHROM", "POS", "REF", "ALT"] ## check input idx1, idx2, sample_wgt_flip = dapgen.align_snp( df1=dset.snp[CHECK_COLS], df2=df_weights[CHECK_COLS] ) assert np.all(idx1 == dset.snp.index) & np.all( idx2 == df_weights.index ), "`dset` and `df_weights` should align, with potential allele flip" if CALC_REF: idx1, idx2, ref_wgt_flip = dapgen.align_snp( df1=dset.snp[CHECK_COLS], df2=dset_ref.snp[CHECK_COLS] ) assert np.all(idx1 == dset.snp.index) & np.all( idx2 == dset_ref.snp.index ), "`dset` and `dset_ref` should align, with potential allele flip" weights = df_weights[weight_col].values sample_weights = weights * sample_wgt_flip if CALC_REF: ref_weights = weights * ref_wgt_flip * sample_wgt_flip assert ( len(ref_pop_indiv) == dset.n_anc ), "`len(ref_pops)` should match with `dset.n_anc`" ## scoring dset_geno, dset_lanc = dset.geno.compute(), dset.lanc.compute() sample_pgs = np.zeros((dset.n_indiv, dset.n_anc)) if CALC_REF: ref_geno_list = [dset_ref[:, pop].geno.compute() for pop in ref_pop_indiv] ref_pgs = [[] for pop in ref_pop_indiv] # iterate over each individuals for indiv_i in tqdm(range(dset.n_indiv), desc="admix.data.calc_partial_pgs"): indiv_ref_pgs = [0, 0] # pgs for sample individuals for haplo_i in range(2): geno = dset_geno[:, indiv_i, haplo_i] lanc = dset_lanc[:, indiv_i, haplo_i] for lanc_i in range(dset.n_anc): # sample sample_pgs[indiv_i, lanc_i] += np.dot( geno[lanc == lanc_i], sample_weights[lanc == lanc_i] ) # pgs for reference individuals if CALC_REF: ref_geno = ref_geno_list[lanc_i][lanc == lanc_i, :, :] if ref_geno.shape[0] > 0: ref_geno = ref_geno.reshape(ref_geno.shape[0], -1) s = np.dot(ref_weights[lanc == lanc_i], ref_geno) else: s = np.zeros(ref_geno.shape[1] * 2) indiv_ref_pgs[lanc_i] += s if CALC_REF: for lanc_i in range(dset.n_anc): ref_pgs[lanc_i].append(indiv_ref_pgs[lanc_i]) # format ref_pgs: for each ancestry, we have n_indiv x (n_ref_indiv x 2) # each reference has 2 haplotypes if CALC_REF: ref_pgs = [ pd.DataFrame( data=np.vstack(ref_pgs[i]), index=dset.indiv.index, columns=np.concatenate( [[str(i) + "_1", str(i) + "_2"] for i in ref_pop_indiv[i]] ), ) for i in range(dset.n_anc) ] sample_pgs = pd.DataFrame( data=sample_pgs, index=dset.indiv.index, columns=[f"ANC{i}" for i in range(1, dset.n_anc + 1)], ) if CALC_REF: return sample_pgs, ref_pgs else: return sample_pgs