Source code for admix.assoc

import statsmodels.api as sm
import numpy as np
import pandas as pd
from scipy import stats
from tqdm import tqdm
import admix
from typing import Any, Dict, List, Optional
import dask.array as da
import admix
from statsmodels.tools import sm_exceptions
import warnings
import itertools

warnings.filterwarnings(action="error", category=sm_exceptions.ValueWarning)

__all__ = ["marginal"]


def _format_block_test(
    var: np.ndarray,
    cov: np.ndarray,
    pheno: np.ndarray,
    var_size: int,
    test_vars: np.ndarray,
):
    n_indiv = var.shape[0]
    assert (
        cov.shape[0] == n_indiv
    ), "Number of individuals in genotype and covariate do not match"
    assert pheno.ndim == 1, "Phenotype must be a vector"
    assert (
        pheno.shape[0] == n_indiv
    ), "Number of individuals in genotype and phenotype do not match"
    assert var_size > 0, "Variable size must be greater than 0"
    assert (
        var.shape[1] % var_size == 0
    ), f"Number of variables in var ({var.shape[1]}) must be a multiple of var_size ({var_size})"
    n_var = var.shape[1] // var_size

    if isinstance(test_vars, List):
        test_vars = np.array(test_vars)

    assert np.all(test_vars < var_size), "test_vars must be less than var_size"

    # fill covariates
    n_cov = cov.shape[1]

    if isinstance(var, da.Array):
        var = var.compute()

    assert isinstance(var, np.ndarray), "var must be a numpy array"
    return var, n_var, n_cov, test_vars


def _block_het_test(
    var: np.ndarray,
    cov: np.ndarray,
    pheno: np.ndarray,
    var_size: int,
    test_vars: np.ndarray,
    fast: bool,
    family: str,
    logistic_kwargs: Dict[str, Any] = dict(),
):
    admix.logger.info(
        "Currently HET test is implemented through statsmodels, which can be slow. "
        "Pass in small amount of data whenever possible."
    )

    ## Format data
    var, n_var, n_cov, test_vars = _format_block_test(
        var, cov, pheno, var_size, test_vars
    )

    # format reduced variable matrix
    n_indiv = var.shape[0]
    reduced_var_size = var_size - len(test_vars) + 1
    design_full = np.zeros((n_indiv, var_size + n_cov))
    design_full[:, var_size : var_size + n_cov] = cov
    design_reduced = np.zeros((n_indiv, reduced_var_size + n_cov))
    design_reduced[:, reduced_var_size : reduced_var_size + n_cov] = cov
    other_vars = np.array([i for i in range(var_size) if i not in test_vars])
    shared_param_index = np.concatenate(
        [other_vars, np.arange(var_size, var_size + n_cov)]
    ).astype(int)

    # statsmodels implementation
    if family == "linear":
        reg_method = lambda pheno, design, start_params=None: sm.OLS(
            pheno, design, missing="drop"
        ).fit(disp=0, start_params=start_params)

    elif family == "logistic":
        reg_method = lambda pheno, design, start_params=None: sm.Logit(
            pheno, design, missing="drop"
        ).fit(disp=0, start_params=start_params)
    else:
        raise NotImplementedError

    # legacy implementation of using F-test
    # ftest_mat = np.zeros([len(test_vars) - 1, design_full.shape[1]])
    # for i in range(len(test_vars) - 1):
    #     ftest_mat[i, test_vars[i]] = 1
    #     ftest_mat[i, test_vars[i + 1]] = -1

    res = np.zeros((n_var, var_size * 2 + 2))

    for i in range(n_var):
        design_full[:, 0:var_size] = var[:, i * var_size : (i + 1) * var_size]
        model_full = reg_method(pheno, design_full)

        # coefficients
        res[i, 0 : var_size * 2 : 2] = model_full.params[0:var_size]
        # standard errors
        res[i, 1 : var_size * 2 : 2] = model_full.bse[0:var_size]
        res[i, -2] = model_full.nobs

        design_reduced[:, 0] = var[:, test_vars + i * var_size].sum(axis=1)
        if len(other_vars) > 0:
            design_reduced[:, 1:reduced_var_size] = var[:, other_vars + i * var_size]

        model_reduced = reg_method(
            pheno,
            design_reduced,
            start_params=np.concatenate([[0.0], model_full.params[shared_param_index]]),
        )

        if family == "linear" or family == "logistic":
            p = stats.chi2.sf(
                -2 * (model_reduced.llf - model_full.llf),
                (model_full.df_model - model_reduced.df_model),
            )
            res[i, -1] = p
        else:
            raise NotImplementedError
    return res


def _block_test(
    var: np.ndarray,
    cov: np.ndarray,
    pheno: np.ndarray,
    var_size: int,
    test_vars: np.ndarray,
    fast: bool,
    family: str,
    logistic_kwargs: Dict[str, Any] = dict(),
) -> np.ndarray:
    """
    Perform association testing for a block of variables

    Parameters
    ----------
    var : np.ndarray
        (n_indiv, n_var x var_size) variable matrix
    cov : np.ndarray
        (n_indiv, n_cov) covariate matrix
    pheno : np.ndarray
        (n_snp) phenotype matrix
    var_size : int
        Number of variables for each test
    test_vars : List[int]
        Index of variables to test

    Returns
    -------
    np.ndarray
        (n_snp, n_test_var * 2 + 2) association testing information.
        - The first n_test_var * 2 columns are the effect size (odd columns)
        and standard error (even columns) for each variable.
        - The last two columns are the number of individuals and p-value.
    """
    var, n_var, n_cov, test_vars = _format_block_test(
        var, cov, pheno, var_size, test_vars
    )

    if fast:
        try:
            import tinygwas
        except ImportError:
            raise ImportError(
                "\nplease install tinygwas:\n\n"
                "\tpip install git+https://github.com/bogdanlab/tinygwas.git#egg=tinygwas"
            )

        res = np.zeros((n_var, var_size * 2 + 2))
        if family == "linear":
            tinygwas.linear_f_test(var, cov, pheno, var_size, test_vars, res)
            f_stats = res[:, -1]
            n_indiv_test = res[:, -2]

            res[:, -1] = stats.f.sf(
                f_stats, len(test_vars), n_indiv_test - n_cov - var_size
            )

        elif family == "logistic":
            if "max_iter" not in logistic_kwargs:
                logistic_kwargs["max_iter"] = 100
            if "tol" not in logistic_kwargs:
                logistic_kwargs["tol"] = 1e-6

            tinygwas.logistic_lrt(
                var,
                cov,
                pheno,
                var_size,
                test_vars,
                res,
                logistic_kwargs["max_iter"],
                logistic_kwargs["tol"],
            )
            # convert lrt diff to pvalue
            res[:, -1] = stats.chi2.sf(2 * res[:, -1], len(test_vars))
        else:
            raise ValueError(f"Unknown family: {family}")
    else:
        # statsmodels implementation
        n_indiv = var.shape[0]
        design = np.zeros((n_indiv, var_size + n_cov))
        design[:, var_size : var_size + n_cov] = cov

        if family == "linear":
            reg_method = lambda pheno, design, start_params=None: sm.OLS(
                pheno, design, missing="drop"
            ).fit(disp=0, start_params=start_params)
        elif family == "logistic":
            reg_method = lambda pheno, design, start_params=None: sm.Logit(
                pheno, design, missing="drop"
            ).fit(disp=0, start_params=start_params)
        else:
            raise NotImplementedError

        reduced_index = np.concatenate(
            [
                [i for i in range(var_size) if i not in test_vars],
                np.arange(var_size, var_size + n_cov),
            ]
        ).astype(int)

        f_test_r_matrix = np.zeros((len(test_vars), design.shape[1]))
        for i, v in enumerate(test_vars):
            f_test_r_matrix[i, v] = 1

        res = np.zeros((n_var, var_size * 2 + 2))

        for i in range(n_var):
            design[:, 0:var_size] = var[:, i * var_size : (i + 1) * var_size]
            model = reg_method(pheno, design)
            # coefficients
            res[i, 0 : var_size * 2 : 2] = model.params[0:var_size]
            # standard errors
            res[i, 1 : var_size * 2 : 2] = model.bse[0:var_size]
            res[i, -2] = model.nobs
            # sample size
            if family == "linear":
                # f-test using statsmodels
                try:
                    p = model.f_test(f_test_r_matrix).pvalue.item()
                except sm_exceptions.ValueWarning:
                    p = np.nan
                res[i, -1] = p

            elif family == "logistic":
                # more than one test variables
                model_reduced = reg_method(
                    pheno,
                    design[:, reduced_index],
                    start_params=model.params[reduced_index],
                )
                # determine p-values using difference in log-likelihood
                # and difference in degrees of freedom
                p = stats.chi2.sf(
                    -2 * (model_reduced.llf - model.llf),
                    (model.df_model - model_reduced.df_model),
                )
                res[i, -1] = p
    return res


[docs]def marginal( dset: admix.Dataset = None, geno: da.Array = None, lanc: da.Array = None, pheno: np.ndarray = None, n_anc: Optional[int] = None, cov: Optional[np.ndarray] = None, method: str = "ATT", family: str = "linear", fast: bool = True, ): """Marginal association testing for one SNP at a time Parameters ---------- dset : admix.Dataset data set geno : da.Array genotype (n_snp, n_indiv, 2) matrix lanc : da.Array local ancestry (n_snp, n_indiv, 2) matrix pheno : np.ndarray phenotype (n_snp, ) n_anc : int number of ancestral populations, if not specified, inferred from lanc cov : np.ndarray, optional Covariate matrix, by default None. Do NOT include `1` intercept method : str, optional methods used for association testing, by default "ATT", one of ["ATT", "TRACTOR", "JOINT", "ADM", "SNP1", "ASE", "HET"] family : str, optional family of phenotype, by default "linear" fast : bool, optional use fast implementation, by default True Returns ------- np.ndarray Association p-values for each SNP being tested """ assert family in ["linear", "logistic"], "Unknown family" # check phenotype # nan values are not allowed assert pheno is not None, "Must specify `pheno`" assert np.all(np.isfinite(pheno)), "pheno must not contain NaN values" if family == "logistic": assert np.all( (pheno == 0) | (pheno == 1) ), "When family='logistic', pheno must be 0 or 1" if family == "logistic": if len(pheno) < 1000: admix.logger.warn( "logistic family is known to be unstable with small sample size (N < 1,000)" + "NaN values in the returned p-values may be caused by this." ) # format data assert method in ["ATT", "TRACTOR", "JOINT", "ADM", "SNP1", "ASE", "HET"] if dset is not None: assert (geno is None) and ( lanc is None ), "Cannot specify both `dset` and `geno`, `lanc`" geno = dset.geno lanc = dset.lanc n_anc = dset.n_anc else: assert ( (geno is not None) and (lanc is not None) and (n_anc is not None) ), "Must specify `dset` or (`geno`, `lanc`, `n_anc`)" # convert geno and lanc to da.Array when necessary if not isinstance(geno, da.Array): geno = da.from_array(geno, chunks=-1) if not isinstance(lanc, da.Array): lanc = da.from_array(lanc, chunks=-1) assert np.all(geno.shape == lanc.shape), "geno and lanc must have same shape" n_snp, n_indiv = geno.shape[0:2] # process covariates if cov is None: cov = np.ones((n_indiv, 1)) else: assert cov.shape[0] == n_indiv, "cov must have same number of rows as pheno" # prepend a column of ones to the covariates cov = np.hstack((np.ones((n_indiv, 1)), cov)) assert cov is not None # impute missing values when needed if np.isnan(cov).any(): admix.logger.info( "NaN found in covariates, impute with column mean for each covariate." ) # fill nan with column mean debug_old_mean = np.nanmean(cov, axis=0) cov = admix.data.impute_with_mean(cov, axis=0) assert np.allclose( np.nanmean(cov, axis=0), debug_old_mean, equal_nan=True ), "NaN imputation failed" # check covariates must be full rank assert np.linalg.matrix_rank(cov) == cov.shape[1], "Covariates must be of full rank" if method == "ATT": # test genotype dosage var = geno.sum(axis=2).swapaxes(0, 1) var_size = 1 var_names = ["G"] test_vars = [0] elif method == "SNP1": # test genotype dosage, condition on local ancestry var = da.empty((n_indiv, n_snp * n_anc)) var[:, 0::n_anc] = geno.sum(axis=2).swapaxes(0, 1) for i in range(n_anc - 1): var[:, (1 + i) :: n_anc] = (lanc == i).sum(axis=2).swapaxes(0, 1) var_size = n_anc var_names = ["G"] + [f"L{i + 1}" for i in range(n_anc - 1)] test_vars = [0] elif method == "TRACTOR": # test ancestry-specfic genotype dosage, condition on local ancestry allele_per_anc = admix.data.allele_per_anc( geno, lanc, n_anc=n_anc, ).swapaxes(0, 1) var = da.empty((n_indiv, n_snp * (2 * n_anc - 1))) # allele per ancestor per SNP for i in range(n_anc): var[:, i :: (2 * n_anc - 1)] = allele_per_anc[:, :, i] # number of ancestries per SNP for i in range(n_anc - 1): var[:, (i + n_anc) :: (2 * n_anc - 1)] = ( (lanc == i).sum(axis=2).swapaxes(0, 1) ) var_size = 2 * n_anc - 1 var_names = [f"G{i + 1}" for i in range(n_anc)] + [ f"L{i + 1}" for i in range(n_anc - 1) ] test_vars = [i for i in range(n_anc)] elif method == "JOINT": # joint test of ancestry-specfic genotype dosage AND local ancestry allele_per_anc = admix.data.allele_per_anc( geno, lanc, n_anc=n_anc, ).swapaxes(0, 1) var = da.empty((n_indiv, n_snp * (2 * n_anc - 1))) # allele per ancestor per SNP for i in range(n_anc): var[:, i :: (2 * n_anc - 1)] = allele_per_anc[:, :, i] # number of ancestries per SNP for i in range(n_anc - 1): var[:, (i + n_anc) :: (2 * n_anc - 1)] = ( (lanc == i).sum(axis=2).swapaxes(0, 1) ) var_size = 2 * n_anc - 1 var_names = [f"G{i + 1}" for i in range(n_anc)] + [ f"L{i + 1}" for i in range(n_anc - 1) ] # test both genotype and local ancestry test_vars = [i for i in range(var_size)] elif method == "ASE": # test ancestry-specfic genotype dosage, without conditioning on local ancestry allele_per_anc = admix.data.allele_per_anc(geno, lanc, n_anc=n_anc).swapaxes( 0, 1 ) var = da.empty((n_indiv, n_snp * n_anc)) for i in range(n_anc): var[:, i::n_anc] = allele_per_anc[:, :, i] var_size = n_anc var_names = [f"G{i + 1}" for i in range(n_anc)] test_vars = [i for i in range(n_anc)] elif method == "ADM": # test local ancestry var = da.empty((n_indiv, n_snp * (n_anc - 1))) for i in range(n_anc - 1): var[:, i :: (n_anc - 1)] = (lanc == i).sum(axis=2).swapaxes(0, 1) var_size = n_anc - 1 var_names = [f"L{i + 1}" for i in range(n_anc - 1)] test_vars = [i for i in range(n_anc - 1)] elif method == "HET": allele_per_anc = admix.data.allele_per_anc(geno, lanc, n_anc=n_anc).swapaxes( 0, 1 ) var = da.empty((n_indiv, n_snp * n_anc)) for i in range(n_anc): var[:, i::n_anc] = allele_per_anc[:, :, i] var_size = n_anc var_names = [f"G{i + 1}" for i in range(n_anc)] test_vars = [i for i in range(n_anc)] else: raise NotImplementedError # iterate over block of SNPs assert var.shape[1] % var_size == 0, "var must have multiple of `var_size` columns" assert var.shape[1] / var_size == n_snp res = [] # block-by-block computation based on the chunk size of the `geno` array if geno is not None: snp_chunks = geno.chunks[0] else: assert lanc is not None snp_chunks = lanc.chunks[0] for snp_start, snp_stop in tqdm( admix.data.index_over_chunks(snp_chunks), desc="admix.assoc.marginal", total=len(snp_chunks), ): if method == "HET": test_func = _block_het_test else: test_func = _block_test # test each SNP in block res.append( test_func( var=var[:, snp_start * var_size : snp_stop * var_size], cov=cov, pheno=pheno, var_size=var_size, test_vars=test_vars, family=family, fast=fast, ) ) # columns will be BETA1, SE1, BETA2, SE2, ... with n_anc columns = list( itertools.chain.from_iterable([[f"{v}_BETA", f"{v}_SE"] for v in var_names]) ) + ["N", "P"] df_res = pd.DataFrame( np.concatenate(res), columns=columns, index=dset.snp.index if dset is not None else None, ).astype({"N": "int"}) df_res.loc[df_res.P.isna(), df_res.columns != "N"] = np.nan return df_res
def marginal_simple(dset: admix.Dataset, pheno: np.ndarray) -> np.ndarray: """Simple marginal association testing for one SNP at a time Useful in simulation study because this will be very fast Parameters ---------- dset : admix.Dataset Dataset containing the (n_indiv, n_snp) genotype matrix, dset.geno pheno : np.ndarray (n_snp, n_sim) phenotype matrix Returns ------- coef : np.ndarray (n_snp, n_sim) marginal association coefficient coef_se: np.ndarray (n_snp, n_sim) marginal association coefficient standard error zscores : np.ndarray (n_snp, n_sim) association z-scores for each SNP being tested Examples -------- To check the consistency of results of standard methods >>> n_indiv = dset_admix.dims["indiv"] >>> n_cov = 1 >>> geno = _impute_with_mean(dset_admix.geno.values) >>> geno = (geno - geno.mean(axis=0)) / geno.std(axis=0) >>> f_stats = tinygwas.linear_f_test(geno, np.ones((n_indiv, 1)), sim["pheno"][:, 0], 1, [0]) >>> p_vals = stats.f.sf(f_stats, 1, n_indiv - n_cov - 1) >>> zscores2 = stats.norm.ppf(p_vals / 2) * np.sign(zscores[:, 0]) >>> dset = admix.Dataset({"geno": (["indiv", "snp"], geno), "pheno": (["snp", "sim"], pheno)}) >>> zscores = marginal_simple(dset, pheno) """ geno = dset.geno n_indiv, n_snp = geno.shape assert ( n_indiv == pheno.shape[0] ), "Number of individuals in genotype and phenotype do not match" n_sim = pheno.shape[1] # center phenotype for each simulation Y = pheno - pheno.mean(axis=0) X = geno - da.nanmean(geno, axis=0) XtY, snp_var = admix.data.geno_mult_mat( X, Y, transpose_geno=True, return_snp_var=True ) XtX = snp_var * n_indiv coef = XtY / XtX[:, np.newaxis] coef_var = np.var(Y, axis=0) / XtX[:, np.newaxis] coef_se = np.sqrt(coef_var) zscores = coef / coef_se return coef, coef_se, zscores # def mixscore_wrapper(pheno, anc, geno, theta, # scores=["ADM", "ATT", "MIX", "SNP1", "SUM"], # mixscore_path="/u/project/pasaniuc/kangchen/tractor/software/mixscore-1.3/bin/mixscore", # verbose=False): # """ # A python wrapper for mixscore # # Args # ---- # pheno: phenotypes # anc: ancestry # geno: genotype # theta: global ancestry component # """ # # tmp = tempfile.TemporaryDirectory() # tmp_dir = tmp.name # # n_sample = len(pheno) # n_snp = anc.shape[1] # # write_int_mat(join(tmp_dir, "pheno"), pheno.reshape((1, -1))) # write_int_mat(join(tmp_dir, "anc"), anc.T) # write_int_mat(join(tmp_dir, "geno"), geno.T) # np.savetxt(join(tmp_dir, "theta"), theta, fmt='%.6f') # # param = {"nsamples": str(n_sample), # "nsnps": str(n_snp), # "phenofile": join(tmp_dir, "pheno"), # "ancfile": join(tmp_dir, "anc"), # "genofile": join(tmp_dir, "geno"), # "thetafile": join(tmp_dir, "theta"), # "outfile": join(tmp_dir, "out")} # # with open(join(tmp_dir, "param"), 'w') as f: # f.writelines([k + ':' + param[k] + '\n' for k in param]) # # rls_dict = {} # for name in scores: # if verbose: # print(f"Calculating {name}...") # # cmd = ' '.join([mixscore_path, name, f"{tmp_dir}/param"]) # subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT) # with open(param["outfile"]) as f: # out = [line.strip() for line in f.readlines()] # rls_dict[name] = out # tmp.cleanup() # score_df = pd.DataFrame(rls_dict).apply(pd.to_numeric, errors='coerce') # # convert to p-value # for name in score_df.columns: # score_df[name] = stats.chi2.sf(score_df[name], df=(2 if name == "SUM" else 1)) # return score_df