From 6cd081931e0e124e98a5f9a8f89192bf94af8e60 Mon Sep 17 00:00:00 2001 From: Ukyeon Date: Fri, 28 Apr 2023 20:19:30 -0400 Subject: [PATCH 1/4] test --- dynamo/estimation/csc/velocity.py | 485 +++++++++++------------------- dynamo/tools/dynamics.py | 2 +- 2 files changed, 180 insertions(+), 307 deletions(-) diff --git a/dynamo/estimation/csc/velocity.py b/dynamo/estimation/csc/velocity.py index 8588836fa..355200103 100755 --- a/dynamo/estimation/csc/velocity.py +++ b/dynamo/estimation/csc/velocity.py @@ -2,7 +2,7 @@ from multiprocessing.dummy import Pool as ThreadPool from warnings import warn -from scipy.sparse import csr_matrix +import numpy as np from tqdm import tqdm from ...tools.moments import calc_2nd_moment, calc_12_mom_labeling @@ -496,287 +496,73 @@ def fit( if self.extyp.lower() in ["conventional", "kin"]: if self.model.lower() == "deterministic": if np.all(self._exist_data("uu", "su")): - self.parameters["beta"] = np.ones(n_genes) - gamma, gamma_intercept, gamma_r2, gamma_logLL = ( - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - ) + # gamma, gamma_intercept, gamma_r2, gamma_logLL = np.zeros((n_genes, 4)) U = self.data["uu"] if self.data["ul"] is None else self.data["uu"] + self.data["ul"] S = self.data["su"] if self.data["sl"] is None else self.data["su"] + self.data["sl"] - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating gamma"): - ( - gamma[i], - gamma_intercept[i], - _, - gamma_r2[i], - _, - gamma_logLL[i], - ) = self.fit_gamma_steady_state(U[i], S[i], intercept, perc_left, perc_right) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_steady_state, - zip( - U, - S, - itertools.repeat(intercept), - itertools.repeat(perc_left), - itertools.repeat(perc_right), - ), - ) - pool.close() - pool.join() - ( - gamma, - gamma_intercept, - _, - gamma_r2, - _, - gamma_logLL, - ) = zip(*res) - (gamma, gamma_intercept, gamma_r2, gamma_logLL) = ( - np.array(gamma), - np.array(gamma_intercept), - np.array(gamma_r2), - np.array(gamma_logLL), - ) - ( - self.parameters["gamma"], - self.aux_param["gamma_intercept"], - self.aux_param["gamma_r2"], - self.aux_param["gamma_logLL"], - ) = (gamma, gamma_intercept, gamma_r2, gamma_logLL) elif np.all(self._exist_data("uu", "ul")): - self.parameters["beta"] = np.ones(n_genes) - gamma, gamma_intercept, gamma_r2, gamma_logLL = ( - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - ) U = self.data["ul"] S = self.data["uu"] + self.data["ul"] - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating gamma"): - ( - gamma[i], - gamma_intercept[i], - _, - gamma_r2[i], - _, - gamma_logLL[i], - ) = self.fit_gamma_steady_state(U[i], S[i], intercept, perc_left, perc_right) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_steady_state, - zip( - U, - S, - itertools.repeat(intercept), - itertools.repeat(perc_left), - itertools.repeat(perc_right), - ), - ) - pool.close() - pool.join() - ( - gamma, - gamma_intercept, - _, - gamma_r2, - _, - gamma_logLL, - ) = zip(*res) - (gamma, gamma_intercept, gamma_r2, gamma_logLL) = ( - np.array(gamma), - np.array(gamma_intercept), - np.array(gamma_r2), - np.array(gamma_logLL), - ) - ( - self.parameters["gamma"], - self.aux_param["gamma_intercept"], - self.aux_param["gamma_r2"], - self.aux_param["gamma_logLL"], - ) = (gamma, gamma_intercept, gamma_r2, gamma_logLL) + + res = _parallel_wrapper( + fit_gamma_stochastic_helper, + zip( + U, + S, + itertools.repeat(intercept), + itertools.repeat(perc_left), + itertools.repeat(perc_right), + ), + ) + gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL = res + + self.parameters["beta"] = np.ones(n_genes) + self.parameters["gamma"] = np.array(gamma) + self.aux_param["gamma_intercept"] = np.array(gamma_intercept) + self.aux_param["gamma_r2"] = np.array(gamma_r2) + self.aux_param["gamma_logLL"] = np.array(gamma_logLL) + elif self.model.lower() == "stochastic": if np.all(self._exist_data("uu", "su")): - self.parameters["beta"] = np.ones(n_genes) - gamma, gamma_intercept, gamma_r2, gamma_logLL, bs, bf = ( - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - ) U = self.data["uu"] if self.data["ul"] is None else self.data["uu"] + self.data["ul"] S = self.data["su"] if self.data["sl"] is None else self.data["su"] + self.data["sl"] - US = ( - self.data["us"] - if self.data["us"] is not None - else calc_2nd_moment(U.T, S.T, self.conn, mX=U.T, mY=S.T).T - ) - S2 = ( - self.data["s2"] - if self.data["s2"] is not None - else calc_2nd_moment(S.T, S.T, self.conn, mX=S.T, mY=S.T).T - ) - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating gamma"): - ( - gamma[i], - gamma_intercept[i], - _, - gamma_r2[i], - _, - gamma_logLL[i], - bs[i], - bf[i], - ) = self.fit_gamma_stochastic( - self.est_method, - U[i], - S[i], - US[i], - S2[i], - perc_left=perc_left, - perc_right=perc_right, - normalize=True, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_stochastic, - zip( - itertools.repeat(self.est_method), - U, - S, - US, - S2, - itertools.repeat(perc_left), - itertools.repeat(perc_right), - itertools.repeat(True), - ), - ) - pool.close() - pool.join() - ( - gamma, - gamma_intercept, - _, - gamma_r2, - _, - gamma_logLL, - bs, - bf, - ) = zip(*res) - (gamma, gamma_intercept, gamma_r2, gamma_logLL, bs, bf,) = ( - np.array(gamma), - np.array(gamma_intercept), - np.array(gamma_r2), - np.array(gamma_logLL), - np.array(bs), - np.array(bf), - ) - ( - self.parameters["gamma"], - self.aux_param["gamma_intercept"], - self.aux_param["gamma_r2"], - self.aux_param["gamma_logLL"], - self.aux_param["bs"], - self.aux_param["bf"], - ) = (gamma, gamma_intercept, gamma_r2, gamma_logLL, bs, bf) elif np.all(self._exist_data("uu", "ul")): - self.parameters["beta"] = np.ones(n_genes) - gamma, gamma_intercept, gamma_r2, gamma_logLL, bs, bf = ( - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - ) U = self.data["ul"] S = self.data["uu"] + self.data["ul"] - US = ( - self.data["us"] - if self.data["us"] is not None - else calc_2nd_moment(U.T, S.T, self.conn, mX=U.T, mY=S.T).T - ) - S2 = ( - self.data["s2"] - if self.data["s2"] is not None - else calc_2nd_moment(S.T, S.T, self.conn, mX=S.T, mY=S.T).T - ) - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating gamma"): - ( - gamma[i], - gamma_intercept[i], - _, - gamma_r2[i], - _, - gamma_logLL[i], - bs[i], - bf[i], - ) = self.fit_gamma_stochastic( - self.est_method, - U[i], - S[i], - US[i], - S2[i], - perc_left=perc_left, - perc_right=perc_right, - normalize=True, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_stochastic, - zip( - itertools.repeat(self.est_method), - U, - S, - US, - S2, - itertools.repeat(perc_left), - itertools.repeat(perc_right), - itertools.repeat(True), - ), - ) - pool.close() - pool.join() - ( - gamma, - gamma_intercept, - _, - gamma_r2, - _, - gamma_logLL, - bs, - bf, - ) = zip(*res) - (gamma, gamma_intercept, gamma_r2, gamma_logLL, bs, bf,) = ( - np.array(gamma), - np.array(gamma_intercept), - np.array(gamma_r2), - np.array(gamma_logLL), - np.array(bs), - np.array(bf), - ) - ( - self.parameters["gamma"], - self.aux_param["gamma_intercept"], - self.aux_param["gamma_r2"], - self.aux_param["gamma_logLL"], - self.aux_param["bs"], - self.aux_param["bf"], - ) = (gamma, gamma_intercept, gamma_r2, gamma_logLL, bs, bf) + US = ( + self.data["us"] + if self.data["us"] is not None + else calc_2nd_moment(U.T, S.T, self.conn, mX=U.T, mY=S.T).T + ) + S2 = ( + self.data["s2"] + if self.data["s2"] is not None + else calc_2nd_moment(S.T, S.T, self.conn, mX=S.T, mY=S.T).T + ) + + res = _parallel_wrapper( + fit_gamma_stochastic_helper, + zip( + itertools.repeat(self.est_method), + U, + S, + US, + S2, + itertools.repeat(perc_left), + itertools.repeat(perc_right), + itertools.repeat(True), + ), + ) + gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL, bs, bf = res + + self.parameters["beta"] = np.ones(n_genes) + self.parameters["gamma"] = np.array(gamma) + self.aux_param["gamma_intercept"] = np.array(gamma_intercept) + self.aux_param["gamma_r2"] = np.array(gamma_r2) + self.aux_param["gamma_logLL"] = np.array(gamma_logLL) + self.aux_param["bs"] = np.array(bs) + self.aux_param["bf"] = np.array(bf) + else: if self.extyp.lower() == "deg": if np.all(self._exist_data("ul", "sl")): @@ -792,44 +578,23 @@ def fit( if self._exist_data("uu"): # alpha estimation uu_m, uu_v, _ = calc_12_mom_labeling(self.data["uu"], self.t) - alpha, uu0, r2 = ( - np.zeros((n_genes, 1)), - np.zeros(n_genes), - np.zeros(n_genes), + + res = _parallel_wrapper( + fit_alpha_degradation, + zip( + itertools.repeat(t_uniq), + uu_m, + self.parameters["beta"], + itertools.repeat(True), + ), ) - if cores == 1: - for i in range(n_genes): - alpha[i], uu0[i], r2[i] = fit_alpha_degradation( - t_uniq, - uu_m[i], - self.parameters["beta"][i], - intercept=True, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - fit_alpha_degradation, - zip( - itertools.repeat(t_uniq), - uu_m, - self.parameters["beta"], - itertools.repeat(True), - ), - ) - pool.close() - pool.join() - (alpha, uu0, r2) = zip(*res) - (alpha, uu0, r2) = ( - np.array(alpha), - np.array(uu0), - np.array(r2), - ) - ( - self.parameters["alpha"], - self.aux_param["alpha_intercept"], - self.aux_param["uu0"], - self.aux_param["alpha_r2"], - ) = (alpha, uu0, uu0, r2) + alpha, uu0, r2 = res + + self.parameters["alpha"] = np.array(alpha) + # self.aux_param["alpha_intercept"] = np.array(uu0) + self.aux_param["uu0"] = np.array(uu0) + self.aux_param["alpha_r2"] = np.array(r2) + elif self._exist_data("ul"): # gamma estimation # use mean + var for fitting degradation parameter k @@ -1924,3 +1689,111 @@ def get_exist_data_names(self): if v is not None: ret.append(k) return ret + + +def fit_gamma_stochastic_helper(args): + est_method, U, S, US, S2, perc_left, perc_right, boolean = args + return _fit_gamma_stochastic(est_method, U, S, US, S2, perc_left, perc_right, boolean) + + +def _fit_gamma_stochastic( + est_method, + u, + s, + us, + ss, + perc_left=None, + perc_right=5, + normalize=True, +): + """Estimate gamma using GMM (generalized method of moments) or negbin distrubtion based on the steady state assumption. + + Arguments + --------- + est_method: `str` {`gmm`, `negbin`} The estimation method to be used when using the `stochastic` model. + * Available options when the `model` is 'ss' include: + (2) 'gmm': The new generalized methods of moments from us that is based on master equations, similar to the + "moment" model in the excellent scVelo package; + (3) 'negbin': The new method from us that models steady state RNA expression as a negative binomial distribution, + also built upon on master equations. + Note that all those methods require using extreme data points (except negbin, which use all data points) for + estimation. Extreme data points are defined as the data from cells whose expression of unspliced / spliced + or new / total RNA, etc. are in the top or bottom, 5%, for example. `linear_regression` only considers the mean of + RNA species (based on the `deterministic` ordinary different equations) while moment based methods (`gmm`, `negbin`) + considers both first moment (mean) and second moment (uncentered variance) of RNA species (based on the `stochastic` + master equations). + The above method are all (generalized) linear regression based method. In order to return estimated parameters + (including RNA half-life), it additionally returns R-squared (either just for extreme data points or all data points) + as well as the log-likelihood of the fitting, which will be used for transition matrix and velocity embedding. + All `est_method` uses least square to estimate optimal parameters with latin cubic sampler for initial sampling. + u: :class:`~numpy.ndarray` or sparse `csr_matrix` + A matrix of unspliced mRNA counts. Dimension: genes x cells. + s: :class:`~numpy.ndarray` or sparse `csr_matrix` + A matrix of spliced mRNA counts. Dimension: genes x cells. + us: :class:`~numpy.ndarray` or sparse `csr_matrix` + A matrix of unspliced mRNA counts. Dimension: genes x cells. + ss: :class:`~numpy.ndarray` or sparse `csr_matrix` + A matrix of spliced mRNA counts. Dimension: genes x cells. + perc_left: float + The percentage of samples included in the linear regression in the left tail. If set to None, then all the left samples are excluded. + perc_right: float + The percentage of samples included in the linear regression in the right tail. If set to None, then all the samples are included. + normalize: bool + Whether to first normalize the + + Returns + ------- + k: float + The slope of the linear regression model, which is gamma under the steady state assumption. + b: float + The intercept of the linear regression model. + r2: float + Coefficient of determination or r square for the extreme data points. + r2: float + Coefficient of determination or r square for the extreme data points. + all_r2: float + Coefficient of determination or r square for all data points. + """ + u = u.A.flatten() if issparse(u) else u.flatten() + s = s.A.flatten() if issparse(s) else s.flatten() + us = us.A.flatten() if issparse(us) else us.flatten() + ss = ss.A.flatten() if issparse(ss) else ss.flatten() + + mask = find_extreme( + s, + u, + normalize=normalize, + perc_left=perc_left, + perc_right=perc_right, + ) + + bs, bf = None, None + if est_method.lower() == "gmm": + k = fit_stochastic_linreg(u[mask], s[mask], us[mask], ss[mask]) + elif est_method.lower() == "negbin": + phi = compute_dispersion(s, ss) + k = fit_k_negative_binomial(u[mask], s[mask], ss[mask], phi) + bs, bf = compute_bursting_properties(np.mean(u[mask] + s[mask]), phi, k) + + r2, all_r2 = calc_R2(s[mask], u[mask], k), calc_R2(s, u, k) + logLL, all_logLL = ( + calc_norm_loglikelihood(s[mask], u[mask], k), + calc_norm_loglikelihood(s, u, k), + ) + + return (k, 0, r2, all_r2, logLL, all_logLL, bs, bf) + + +def _parallel_wrapper(func, args_list): + import multiprocessing as mp + import os + + cores = os.cpu_count() + ctx = mp.get_context("fork") + + with ctx.Pool(cores) as pool: + results = pool.map(func, args_list) + pool.close() + pool.join() + + return [np.array(result) for result in zip(*results)] diff --git a/dynamo/tools/dynamics.py b/dynamo/tools/dynamics.py index 977c623ca..f6dd72874 100755 --- a/dynamo/tools/dynamics.py +++ b/dynamo/tools/dynamics.py @@ -454,7 +454,7 @@ def dynamics( assumption_mRNA = "ss" elif experiment_type.lower() in ["mix_pulse_chase", "deg", "kin"]: assumption_mRNA = "kinetic" - + assumption_mRNA = "ss" ####### if model.lower() == "stochastic" and experiment_type.lower() not in [ "conventional", "kinetics", From 9bb1b0f92c1069e2cc0b0181ce34c321ef9db29f Mon Sep 17 00:00:00 2001 From: Ukyeon Date: Thu, 4 May 2023 18:54:31 -0400 Subject: [PATCH 2/4] more optimization --- dynamo/estimation/csc/velocity.py | 460 ++++++++++-------------------- 1 file changed, 146 insertions(+), 314 deletions(-) diff --git a/dynamo/estimation/csc/velocity.py b/dynamo/estimation/csc/velocity.py index 355200103..4c9a1a207 100755 --- a/dynamo/estimation/csc/velocity.py +++ b/dynamo/estimation/csc/velocity.py @@ -494,17 +494,16 @@ def fit( cores = max(1, int(self.cores)) # fit mRNA if self.extyp.lower() in ["conventional", "kin"]: + if np.all(self._exist_data("uu", "su")): + U = self.data["uu"] if self.data["ul"] is None else self.data["uu"] + self.data["ul"] + S = self.data["su"] if self.data["sl"] is None else self.data["su"] + self.data["sl"] + elif np.all(self._exist_data("uu", "ul")): + U = self.data["ul"] + S = self.data["uu"] + self.data["ul"] + if self.model.lower() == "deterministic": - if np.all(self._exist_data("uu", "su")): - # gamma, gamma_intercept, gamma_r2, gamma_logLL = np.zeros((n_genes, 4)) - U = self.data["uu"] if self.data["ul"] is None else self.data["uu"] + self.data["ul"] - S = self.data["su"] if self.data["sl"] is None else self.data["su"] + self.data["sl"] - elif np.all(self._exist_data("uu", "ul")): - U = self.data["ul"] - S = self.data["uu"] + self.data["ul"] - - res = _parallel_wrapper( - fit_gamma_stochastic_helper, + gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL = _parallel_wrapper( + _fit_gamma_steady_state_helper, zip( U, S, @@ -513,8 +512,6 @@ def fit( itertools.repeat(perc_right), ), ) - gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL = res - self.parameters["beta"] = np.ones(n_genes) self.parameters["gamma"] = np.array(gamma) self.aux_param["gamma_intercept"] = np.array(gamma_intercept) @@ -522,13 +519,6 @@ def fit( self.aux_param["gamma_logLL"] = np.array(gamma_logLL) elif self.model.lower() == "stochastic": - if np.all(self._exist_data("uu", "su")): - U = self.data["uu"] if self.data["ul"] is None else self.data["uu"] + self.data["ul"] - S = self.data["su"] if self.data["sl"] is None else self.data["su"] + self.data["sl"] - elif np.all(self._exist_data("uu", "ul")): - U = self.data["ul"] - S = self.data["uu"] + self.data["ul"] - US = ( self.data["us"] if self.data["us"] is not None @@ -540,8 +530,8 @@ def fit( else calc_2nd_moment(S.T, S.T, self.conn, mX=S.T, mY=S.T).T ) - res = _parallel_wrapper( - fit_gamma_stochastic_helper, + gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL, bs, bf = _parallel_wrapper( + _fit_gamma_stochastic_helper, zip( itertools.repeat(self.est_method), U, @@ -553,8 +543,6 @@ def fit( itertools.repeat(True), ), ) - gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL, bs, bf = res - self.parameters["beta"] = np.ones(n_genes) self.parameters["gamma"] = np.array(gamma) self.aux_param["gamma_intercept"] = np.array(gamma_intercept) @@ -575,11 +563,12 @@ def fit( self.aux_param["ul0"], self.aux_param["sl0"], ) = self.fit_beta_gamma_lsq(t_uniq, ul_m, sl_m) + if self._exist_data("uu"): # alpha estimation uu_m, uu_v, _ = calc_12_mom_labeling(self.data["uu"], self.t) - res = _parallel_wrapper( + alpha, uu0, r2 = _parallel_wrapper( fit_alpha_degradation, zip( itertools.repeat(t_uniq), @@ -588,10 +577,8 @@ def fit( itertools.repeat(True), ), ) - alpha, uu0, r2 = res - self.parameters["alpha"] = np.array(alpha) - # self.aux_param["alpha_intercept"] = np.array(uu0) + self.aux_param["alpha_intercept"] = np.array(uu0) self.aux_param["uu0"] = np.array(uu0) self.aux_param["alpha_r2"] = np.array(r2) @@ -603,47 +590,25 @@ def fit( self.parameters["gamma"], self.aux_param["ul0"], ) = self.fit_gamma_nosplicing_lsq(t_uniq, ul_m) + if self._exist_data("uu"): # alpha estimation - alpha, alpha_b, alpha_r2 = ( - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - ) uu_m, uu_v, _ = calc_12_mom_labeling(self.data["uu"], self.t) - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating alpha"): - (alpha[i], alpha_b[i], alpha_r2[i],) = fit_alpha_degradation( - t_uniq, - uu_m[i], - self.parameters["gamma"][i], - intercept=True, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - fit_alpha_degradation, - zip( - itertools.repeat(t_uniq), - uu_m, - self.parameters["gamma"], - itertools.repeat(True), - ), - ) - pool.close() - pool.join() - (alpha, alpha_b, alpha_r2) = zip(*res) - (alpha, alpha_b, alpha_r2) = ( - np.array(alpha), - np.array(alpha_b), - np.array(alpha_r2), - ) - ( - self.parameters["alpha"], - self.aux_param["alpha_intercept"], - self.aux_param["uu0"], - self.aux_param["alpha_r2"], - ) = (alpha, alpha_b, alpha_b, alpha_r2) + + alpha, alpha_b, alpha_r2 = _parallel_wrapper( + fit_alpha_degradation, + zip( + itertools.repeat(t_uniq), + uu_m, + self.parameters["gamma"], + itertools.repeat(True), + ), + ) + self.parameters["alpha"] = np.array(alpha) + self.aux_param["alpha_intercept"] = np.array(alpha_b) + self.aux_param["uu0"] = np.array(alpha_b) + self.aux_param["alpha_r2"] = np.array(alpha_r2) + elif (self.extyp.lower() == "kin" or self.extyp.lower() == "one-shot") and len(np.unique(self.t)) > 1: if np.all(self._exist_data("ul", "uu", "su")): if not self._exist_parameter("beta"): @@ -659,26 +624,18 @@ def fit( ) = self.fit_beta_gamma_lsq(t_uniq, uu_m, su_m) # alpha estimation ul_m, ul_v, t_uniq = calc_12_mom_labeling(self.data["ul"], self.t) - alpha = np.zeros(n_genes) + # let us only assume one alpha for each gene in all cells - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating alpha"): - # for j in range(len(self.data['ul'][i])): - alpha[i] = fit_alpha_synthesis(t_uniq, ul_m[i], self.parameters["beta"][i]) - else: - pool = ThreadPool(cores) - alpha = pool.starmap( - fit_alpha_synthesis, - zip( - itertools.repeat(t_uniq), - ul_m, - self.parameters["beta"], - ), - ) - pool.close() - pool.join() - alpha = np.array(alpha) - self.parameters["alpha"] = alpha + alpha = _parallel_wrapper( + fit_alpha_synthesis, + zip( + itertools.repeat(t_uniq), + ul_m, + self.parameters["beta"], + ), + ) + self.parameters["alpha"] = np.array(alpha) + elif np.all(self._exist_data("ul", "uu")): n_genes = self.data["uu"].shape[0] # self.get_n_genes(data=U) u0, gamma = np.zeros(n_genes), np.zeros(n_genes) @@ -692,26 +649,17 @@ def fit( alpha = np.zeros(n_genes) # let us only assume one alpha for each gene in all cells ul_m, ul_v, _ = calc_12_mom_labeling(self.data["ul"], self.t) - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating gamma"): - # for j in range(len(self.data['ul'][i])): - alpha[i] = fit_alpha_synthesis(t_uniq, ul_m[i], self.parameters["gamma"][i]) - else: - pool = ThreadPool(cores) - alpha = pool.starmap( - fit_alpha_synthesis, - zip( - itertools.repeat(t_uniq), - ul_m, - self.parameters["gamma"], - ), - ) - pool.close() - pool.join() - alpha = np.array(alpha) - self.parameters["alpha"] = alpha - # alpha: one-shot - # 'one_shot' + + alpha = _parallel_wrapper( + fit_alpha_synthesis, + zip( + itertools.repeat(t_uniq), + ul_m, + self.parameters["gamma"], + ), + ) + self.parameters["alpha"] = np.array(alpha) + elif self.extyp.lower() == "one-shot": t_uniq = np.unique(self.t) if len(t_uniq) > 1: @@ -761,29 +709,16 @@ def fit( ul_m, ul_v, t_uniq = calc_12_mom_labeling(self.data["ul"], self.t) alpha = np.zeros(n_genes) # let us only assume one alpha for each gene in all cells - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating alpha"): - # for j in range(len(self.data['ul'][i])): - alpha[i] = fit_alpha_synthesis( - t_uniq, - ul_m[i], - self.parameters["beta"][i], - ) - else: - pool = ThreadPool(cores) - alpha = pool.starmap( - fit_alpha_synthesis, - zip( - itertools.repeat(t_uniq), - ul_m, - self.parameters["beta"], - ), - ) - pool.close() - pool.join() - alpha = np.array(alpha) - self.parameters["alpha"] = alpha - # self.parameters['alpha'] = self.fit_alpha_oneshot(self.t, self.data['ul'], self.parameters['beta'], clusters) + alpha = _parallel_wrapper( + fit_alpha_synthesis, + zip( + itertools.repeat(t_uniq), + ul_m, + self.parameters["beta"], + ), + ) + self.parameters["alpha"] = np.array(alpha) + else: if self._exist_data("ul") and self._exist_parameter("gamma"): self.parameters["alpha"] = self.fit_alpha_oneshot( @@ -797,45 +732,24 @@ def fit( gamma, total0 = np.zeros(n_genes), np.zeros(n_genes) for i in tqdm(range(n_genes), desc="estimating gamma"): total = self.data["uu"][i] + self.data["ul"][i] - total0[i], gamma[i] = ( - np.mean(total), - solve_gamma( - np.max(self.t), - self.data["uu"][i], - total, - ), - ) - (self.aux_param["total0"], self.parameters["gamma"],) = ( - total0, - gamma, - ) + total0[i] = np.mean(total) + gamma[i] = solve_gamma(np.max(self.t), self.data["uu"][i], total) + + self.aux_param["total0"] = total0 + self.parameters["gamma"] = gamma ul_m, ul_v, t_uniq = calc_12_mom_labeling(self.data["ul"], self.t) # let us only assume one alpha for each gene in all cells - alpha = np.zeros(n_genes) - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating alpha"): - # for j in range(len(self.data['ul'][i])): - alpha[i] = fit_alpha_synthesis( - t_uniq, - ul_m[i], - self.parameters["gamma"][i], - ) # ul_m[i] / t_uniq - else: - pool = ThreadPool(cores) - alpha = pool.starmap( - fit_alpha_synthesis, - zip( - itertools.repeat(t_uniq), - ul_m, - self.parameters["gamma"], - ), - ) - pool.close() - pool.join() - alpha = np.array(alpha) - self.parameters["alpha"] = alpha - # self.parameters['alpha'] = self.fit_alpha_oneshot(self.t, self.data['ul'], self.parameters['gamma'], clusters) + alpha = _parallel_wrapper( + fit_alpha_synthesis, + zip( + itertools.repeat(t_uniq), + ul_m, + self.parameters["gamma"], + ), + ) + self.parameters["alpha"] = np.array(alpha) + elif one_shot_method == "combined": self.parameters["alpha"] = ( csr_matrix(self.data["ul"].shape) @@ -1317,154 +1231,6 @@ def fit( _, # self.aux_param["delta_logLL"], ) = (delta, delta_intercept, delta_r2, delta_logLL) - def fit_gamma_steady_state(self, u, s, intercept=True, perc_left=None, perc_right=5, normalize=True): - """Estimate gamma using linear regression based on the steady state assumption. - - Arguments - --------- - u: :class:`~numpy.ndarray` or sparse `csr_matrix` - A matrix of unspliced mRNA counts. Dimension: genes x cells. - s: :class:`~numpy.ndarray` or sparse `csr_matrix` - A matrix of spliced mRNA counts. Dimension: genes x cells. - intercept: bool - If using steady state assumption for fitting, then: - True -- the linear regression is performed with an unfixed intercept; - False -- the linear regresssion is performed with a fixed zero intercept. - perc_left: float - The percentage of samples included in the linear regression in the left tail. If set to None, then all the - left samples are excluded. - perc_right: float - The percentage of samples included in the linear regression in the right tail. If set to None, then all the - samples are included. - normalize: bool - Whether to first normalize the data. - - Returns - ------- - k: float - The slope of the linear regression model, which is gamma under the steady state assumption. - b: float - The intercept of the linear regression model. - r2: float - Coefficient of determination or r square for the extreme data points. - r2: float - Coefficient of determination or r square for the extreme data points. - all_r2: float - Coefficient of determination or r square for all data points. - """ - if intercept and perc_left is None: - perc_left = perc_right - u = u.A.flatten() if issparse(u) else u.flatten() - s = s.A.flatten() if issparse(s) else s.flatten() - - mask = find_extreme( - s, - u, - normalize=normalize, - perc_left=perc_left, - perc_right=perc_right, - ) - - if self.est_method.lower() == "ols": - k, b, r2, all_r2 = fit_linreg(s, u, mask, intercept) - else: - k, b, r2, all_r2 = fit_linreg_robust(s, u, mask, intercept, self.est_method) - - logLL, all_logLL = ( - calc_norm_loglikelihood(s[mask], u[mask], k), - calc_norm_loglikelihood(s, u, k), - ) - - return k, b, r2, all_r2, logLL, all_logLL - - def fit_gamma_stochastic( - self, - est_method, - u, - s, - us, - ss, - perc_left=None, - perc_right=5, - normalize=True, - ): - """Estimate gamma using GMM (generalized method of moments) or negbin distrubtion based on the steady state assumption. - - Arguments - --------- - est_method: `str` {`gmm`, `negbin`} The estimation method to be used when using the `stochastic` model. - * Available options when the `model` is 'ss' include: - (2) 'gmm': The new generalized methods of moments from us that is based on master equations, similar to the - "moment" model in the excellent scVelo package; - (3) 'negbin': The new method from us that models steady state RNA expression as a negative binomial distribution, - also built upon on master equations. - Note that all those methods require using extreme data points (except negbin, which use all data points) for - estimation. Extreme data points are defined as the data from cells whose expression of unspliced / spliced - or new / total RNA, etc. are in the top or bottom, 5%, for example. `linear_regression` only considers the mean of - RNA species (based on the `deterministic` ordinary different equations) while moment based methods (`gmm`, `negbin`) - considers both first moment (mean) and second moment (uncentered variance) of RNA species (based on the `stochastic` - master equations). - The above method are all (generalized) linear regression based method. In order to return estimated parameters - (including RNA half-life), it additionally returns R-squared (either just for extreme data points or all data points) - as well as the log-likelihood of the fitting, which will be used for transition matrix and velocity embedding. - All `est_method` uses least square to estimate optimal parameters with latin cubic sampler for initial sampling. - u: :class:`~numpy.ndarray` or sparse `csr_matrix` - A matrix of unspliced mRNA counts. Dimension: genes x cells. - s: :class:`~numpy.ndarray` or sparse `csr_matrix` - A matrix of spliced mRNA counts. Dimension: genes x cells. - us: :class:`~numpy.ndarray` or sparse `csr_matrix` - A matrix of unspliced mRNA counts. Dimension: genes x cells. - ss: :class:`~numpy.ndarray` or sparse `csr_matrix` - A matrix of spliced mRNA counts. Dimension: genes x cells. - perc_left: float - The percentage of samples included in the linear regression in the left tail. If set to None, then all the left samples are excluded. - perc_right: float - The percentage of samples included in the linear regression in the right tail. If set to None, then all the samples are included. - normalize: bool - Whether to first normalize the - - Returns - ------- - k: float - The slope of the linear regression model, which is gamma under the steady state assumption. - b: float - The intercept of the linear regression model. - r2: float - Coefficient of determination or r square for the extreme data points. - r2: float - Coefficient of determination or r square for the extreme data points. - all_r2: float - Coefficient of determination or r square for all data points. - """ - u = u.A.flatten() if issparse(u) else u.flatten() - s = s.A.flatten() if issparse(s) else s.flatten() - us = us.A.flatten() if issparse(us) else us.flatten() - ss = ss.A.flatten() if issparse(ss) else ss.flatten() - - mask = find_extreme( - s, - u, - normalize=normalize, - perc_left=perc_left, - perc_right=perc_right, - ) - - bs, bf = None, None - if est_method.lower() == "gmm": - k = fit_stochastic_linreg(u[mask], s[mask], us[mask], ss[mask]) - elif est_method.lower() == "negbin": - phi = compute_dispersion(s, ss) - k = fit_k_negative_binomial(u[mask], s[mask], ss[mask], phi) - bs, bf = compute_bursting_properties(np.mean(u[mask] + s[mask]), phi, k) - - r2, all_r2 = calc_R2(s[mask], u[mask], k), calc_R2(s, u, k) - logLL, all_logLL = ( - calc_norm_loglikelihood(s[mask], u[mask], k), - calc_norm_loglikelihood(s, u, k), - ) - - return (k, 0, r2, all_r2, logLL, all_logLL, bs, bf) - def fit_beta_gamma_lsq(self, t, U, S): """Estimate beta and gamma with the degradation data using the least squares method. @@ -1691,9 +1457,75 @@ def get_exist_data_names(self): return ret -def fit_gamma_stochastic_helper(args): - est_method, U, S, US, S2, perc_left, perc_right, boolean = args - return _fit_gamma_stochastic(est_method, U, S, US, S2, perc_left, perc_right, boolean) +def _fit_gamma_steady_state_helper(args): + U, S, intercept, perc_left, perc_right = args + return _fit_gamma_steady_state(U, S, intercept, perc_left, perc_right) + + +def _fit_gamma_steady_state(self, u, s, intercept=True, perc_left=None, perc_right=5, normalize=True): + """Estimate gamma using linear regression based on the steady state assumption. + + Arguments + --------- + u: :class:`~numpy.ndarray` or sparse `csr_matrix` + A matrix of unspliced mRNA counts. Dimension: genes x cells. + s: :class:`~numpy.ndarray` or sparse `csr_matrix` + A matrix of spliced mRNA counts. Dimension: genes x cells. + intercept: bool + If using steady state assumption for fitting, then: + True -- the linear regression is performed with an unfixed intercept; + False -- the linear regresssion is performed with a fixed zero intercept. + perc_left: float + The percentage of samples included in the linear regression in the left tail. If set to None, then all the + left samples are excluded. + perc_right: float + The percentage of samples included in the linear regression in the right tail. If set to None, then all the + samples are included. + normalize: bool + Whether to first normalize the data. + + Returns + ------- + k: float + The slope of the linear regression model, which is gamma under the steady state assumption. + b: float + The intercept of the linear regression model. + r2: float + Coefficient of determination or r square for the extreme data points. + r2: float + Coefficient of determination or r square for the extreme data points. + all_r2: float + Coefficient of determination or r square for all data points. + """ + if intercept and perc_left is None: + perc_left = perc_right + u = u.A.flatten() if issparse(u) else u.flatten() + s = s.A.flatten() if issparse(s) else s.flatten() + + mask = find_extreme( + s, + u, + normalize=normalize, + perc_left=perc_left, + perc_right=perc_right, + ) + + if self.est_method.lower() == "ols": + k, b, r2, all_r2 = fit_linreg(s, u, mask, intercept) + else: + k, b, r2, all_r2 = fit_linreg_robust(s, u, mask, intercept, self.est_method) + + logLL, all_logLL = ( + calc_norm_loglikelihood(s[mask], u[mask], k), + calc_norm_loglikelihood(s, u, k), + ) + + return k, b, r2, all_r2, logLL, all_logLL + + +def _fit_gamma_stochastic_helper(args): + est_method, U, S, US, S2, perc_left, perc_right, normalize = args + return _fit_gamma_stochastic(est_method, U, S, US, S2, perc_left, perc_right, normalize) def _fit_gamma_stochastic( From 8cd00b0f1d6688ae896ac1a468bfea93bcd828c2 Mon Sep 17 00:00:00 2001 From: Ukyeon Date: Tue, 9 May 2023 17:40:22 -0400 Subject: [PATCH 3/4] cancel duplicated code --- dynamo/estimation/csc/velocity.py | 466 +++++++++--------------------- 1 file changed, 136 insertions(+), 330 deletions(-) diff --git a/dynamo/estimation/csc/velocity.py b/dynamo/estimation/csc/velocity.py index 4c9a1a207..155dbfa89 100755 --- a/dynamo/estimation/csc/velocity.py +++ b/dynamo/estimation/csc/velocity.py @@ -646,7 +646,7 @@ def fit( except: gamma[i], u0[i] = 0, 0 self.parameters["gamma"], self.aux_param["uu0"] = gamma, u0 - alpha = np.zeros(n_genes) + # let us only assume one alpha for each gene in all cells ul_m, ul_v, _ = calc_12_mom_labeling(self.data["ul"], self.t) @@ -699,15 +699,14 @@ def fit( np.mean(U), solve_gamma(np.max(self.t), self.data["uu"][i], U), ) - ( - self.aux_param["U0"], - self.aux_param["S0"], - self.parameters["beta"], - self.parameters["gamma"], - ) = (U0, S0, beta, gamma) + + self.aux_param["U0"] = U0 + self.aux_param["S0"] = S0 + self.parameters["beta"] = beta + self.parameters["gamma"] = gamma ul_m, ul_v, t_uniq = calc_12_mom_labeling(self.data["ul"], self.t) - alpha = np.zeros(n_genes) + # let us only assume one alpha for each gene in all cells alpha = _parallel_wrapper( fit_alpha_synthesis, @@ -764,81 +763,31 @@ def fit( np.zeros(n_genes), np.zeros(n_genes), ) - U, S = ( - self.data["ul"], - self.data["uu"] + self.data["ul"], - ) + U = self.data["ul"] + S = self.data["uu"] + self.data["ul"] - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating gamma"): - ( - gamma_k[i], - gamma_intercept[i], - _, - gamma_r2[i], - _, - gamma_logLL[i], - ) = self.fit_gamma_steady_state(U[i], S[i], False, None, perc_right) - ( - gamma[i], - self.parameters["alpha"][i], - ) = one_shot_gamma_alpha(gamma_k[i], t_uniq, U[i]) - else: - pool = ThreadPool(cores) - res1 = pool.starmap( - self.fit_gamma_steady_state, - zip( - U, - S, - itertools.repeat(False), - itertools.repeat(None), - itertools.repeat(perc_right), - ), - ) - - ( - gamma_k, - gamma_intercept, - _, - gamma_r2, - _, - gamma_logLL, - ) = zip(*res1) - (gamma_k, gamma_intercept, gamma_r2, gamma_logLL,) = ( - np.array(gamma_k), - np.array(gamma_intercept), - np.array(gamma_r2), - np.array(gamma_logLL), - ) - - res2 = pool.starmap( - one_shot_gamma_alpha, - zip(gamma_k, itertools.repeat(t_uniq), U), - ) - - (gamma, alpha) = zip(*res2) - (gamma, self.parameters["alpha"]) = ( - np.array(gamma), - np.array(alpha), - ) - - pool.close() - pool.join() - ( - self.parameters["gamma"], - self.aux_param["gamma_k"], - self.aux_param["gamma_intercept"], - self.aux_param["gamma_r2"], - self.aux_param["gamma_logLL"], - self.aux_param["alpha_r2"], - ) = ( - gamma, - gamma_k, - gamma_intercept, - gamma_r2, - gamma_logLL, - gamma_r2, + gamma_k, gamma_intercept, _, gamma_r2, _, gamma_logLL = _parallel_wrapper( + _fit_gamma_steady_state_helper, + zip( + U, + S, + itertools.repeat(False), + itertools.repeat(None), + itertools.repeat(perc_right), + ), + ) + gamma, alpha = _parallel_wrapper( + one_shot_gamma_alpha, + zip(gamma_k, itertools.repeat(t_uniq), U), ) + + self.parameters["gamma"] = np.array(gamma) + self.aux_param["gamma_k"] = np.array(gamma_k) + self.aux_param["gamma_intercept"] = np.array(gamma_intercept) + self.aux_param["gamma_r2"] = np.array(gamma_r2) + self.aux_param["gamma_logLL"] = np.array(gamma_logLL) + self.aux_param["alpha_r2"] = np.array(gamma_r2) + elif self.model.lower() == "stochastic": if np.all(self._exist_data("uu", "ul", "su", "sl")): self.parameters["beta"] = np.ones(n_genes) @@ -862,72 +811,33 @@ def fit( if self.data["s2"] is not None else calc_2nd_moment(S.T, S.T, self.conn, mX=S.T, mY=S.T).T ) - if cores == 1: - for i in tqdm( - range(n_genes), - desc="estimating beta and alpha for one-shot experiment", - ): - ( - k[i], - k_intercept[i], - _, - k_r2[i], - _, - k_logLL[i], - bs[i], - bf[i], - ) = self.fit_gamma_stochastic( - self.est_method, - U[i], - S[i], - US[i], - S2[i], - perc_left=perc_left, - perc_right=perc_right, - normalize=True, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_stochastic, - zip( - itertools.repeat(self.est_method), - U, - S, - US, - S2, - itertools.repeat(perc_left), - itertools.repeat(perc_right), - itertools.repeat(True), - ), - ) - pool.close() - pool.join() - ( - k, - k_intercept, - _, - k_r2, - _, - k_logLL, - bs, - bf, - ) = zip(*res) - (k, k_intercept, k_r2, k_logLL, bs, bf) = ( - np.array(k), - np.array(k_intercept), - np.array(k_r2), - np.array(k_logLL), - np.array(bs), - np.array(bf), - ) - beta, alpha0 = one_shot_gamma_alpha_matrix(k, t_uniq, U) - self.parameters["beta"], self.aux_param["beta_k"] = ( - beta, - k, + k, k_intercept, _, k_r2, _, k_logLL, bs, bf = _parallel_wrapper( + _fit_gamma_stochastic_helper, + zip( + itertools.repeat(self.est_method), + U, + S, + US, + S2, + itertools.repeat(perc_left), + itertools.repeat(perc_right), + itertools.repeat(True), + ), ) + (k, k_intercept, k_r2, k_logLL, bs, bf) = ( + np.array(k), + np.array(k_intercept), + np.array(k_r2), + np.array(k_logLL), + np.array(bs), + np.array(bf), + ) + beta, alpha0 = one_shot_gamma_alpha_matrix(k, t_uniq, U) + self.parameters["beta"] = beta + self.aux_param["beta_k"] = k + U = self.data["uu"] + self.data["ul"] S = U + self.data["su"] + self.data["sl"] US = ( @@ -940,86 +850,42 @@ def fit( if self.data["s2"] is not None else calc_2nd_moment(S.T, S.T, self.conn, mX=S.T, mY=S.T).T ) - if cores == 1: - for i in tqdm( - range(n_genes), - desc="estimating gamma and alpha for one-shot experiment", - ): - ( - k[i], - k_intercept[i], - _, - k_r2[i], - _, - k_logLL[i], - bs[i], - bf[i], - ) = self.fit_gamma_stochastic( - self.est_method, - U[i], - S[i], - US[i], - S2[i], - perc_left=perc_left, - perc_right=perc_right, - normalize=True, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_stochastic, - zip( - itertools.repeat(self.est_method), - U, - S, - US, - S2, - itertools.repeat(perc_left), - itertools.repeat(perc_right), - itertools.repeat(True), - ), - ) - pool.close() - pool.join() - (k, k_intercept, _, k_r2, _, k_logLL, bs, bf) = zip(*res) - (k, k_intercept, k_r2, k_logLL, bs, bf) = ( - np.array(k), - np.array(k_intercept), - np.array(k_r2), - np.array(k_logLL), - np.array(bs), - np.array(bf), - ) - gamma, alpha = one_shot_gamma_alpha_matrix(k, t_uniq, U) - ( - self.parameters["alpha"], - self.parameters["gamma"], - self.aux_param["gamma_k"], - self.aux_param["gamma_intercept"], - self.aux_param["gamma_r2"], - self.aux_param["gamma_logLL"], - self.aux_param["bs"], - self.aux_param["bf"], - ) = ( - (alpha + alpha0) / 2, - gamma, - k, - k_intercept, - k_r2, - k_logLL, - bs, - bf, + k, k_intercept, _, k_r2, _, k_logLL, bs, bf = _parallel_wrapper( + _fit_gamma_stochastic_helper, + zip( + itertools.repeat(self.est_method), + U, + S, + US, + S2, + itertools.repeat(perc_left), + itertools.repeat(perc_right), + itertools.repeat(True), + ), ) - elif np.all(self._exist_data("uu", "ul")): - k, k_intercept, k_r2, k_logLL, bs, bf = ( - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), - np.zeros(n_genes), + + (k, k_intercept, k_r2, k_logLL, bs, bf) = ( + np.array(k), + np.array(k_intercept), + np.array(k_r2), + np.array(k_logLL), + np.array(bs), + np.array(bf), ) + + gamma, alpha = one_shot_gamma_alpha_matrix(k, t_uniq, U) + + self.parameters["alpha"] = (alpha + alpha0) / 2 + self.parameters["gamma"] = gamma + self.aux_param["gamma_k"] = k + self.aux_param["gamma_intercept"] = k_intercept + self.aux_param["gamma_r2"] = k_r2 + self.aux_param["gamma_logLL"] = k_logLL + self.aux_param["bs"] = bs + self.aux_param["bf"] = bf + + elif np.all(self._exist_data("uu", "ul")): U = self.data["ul"] S = self.data["ul"] + self.data["uu"] US = ( @@ -1032,74 +898,41 @@ def fit( if self.data["s2"] is not None else calc_2nd_moment(S.T, S.T, self.conn, mX=S.T, mY=S.T).T ) - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating gamma"): - ( - k[i], - k_intercept[i], - _, - k_r2[i], - _, - k_logLL[i], - bs[i], - bf[i], - ) = self.fit_gamma_stochastic( - self.est_method, - U[i], - S[i], - US[i], - S2[i], - perc_left=perc_left, - perc_right=perc_right, - normalize=True, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_stochastic, - zip( - itertools.repeat(self.est_method), - U, - S, - US, - S2, - itertools.repeat(perc_left), - itertools.repeat(perc_right), - itertools.repeat(True), - ), - ) - pool.close() - pool.join() - (k, k_intercept, _, k_r2, _, k_logLL, bs, bf) = zip(*res) - (k, k_intercept, k_r2, k_logLL, bs, bf) = ( - np.array(k), - np.array(k_intercept), - np.array(k_r2), - np.array(k_logLL), - np.array(bs), - np.array(bf), - ) - gamma, alpha = one_shot_gamma_alpha_matrix(k, t_uniq, U) - ( - self.parameters["alpha"], - self.parameters["gamma"], - self.aux_param["gamma_k"], - self.aux_param["gamma_intercept"], - self.aux_param["gamma_r2"], - self.aux_param["gamma_logLL"], - self.aux_param["bs"], - self.aux_param["bf"], - ) = ( - alpha, - gamma, - k, - k_intercept, - k_r2, - k_logLL, - bs, - bf, + k, k_intercept, _, k_r2, _, k_logLL, bs, bf = _parallel_wrapper( + _fit_gamma_stochastic_helper, + zip( + itertools.repeat(self.est_method), + U, + S, + US, + S2, + itertools.repeat(perc_left), + itertools.repeat(perc_right), + itertools.repeat(True), + ), ) + + (k, k_intercept, k_r2, k_logLL, bs, bf) = ( + np.array(k), + np.array(k_intercept), + np.array(k_r2), + np.array(k_logLL), + np.array(bs), + np.array(bf), + ) + + gamma, alpha = one_shot_gamma_alpha_matrix(k, t_uniq, U) + + self.parameters["alpha"] = alpha + self.parameters["gamma"] = gamma + self.aux_param["gamma_k"] = k + self.aux_param["gamma_intercept"] = k_intercept + self.aux_param["gamma_r2"] = k_r2 + self.aux_param["gamma_logLL"] = k_logLL + self.aux_param["bs"] = bs + self.aux_param["bf"] = bf + elif self.extyp.lower() == "mix_std_stm": t_min, t_max = np.min(self.t), np.max(self.t) if np.all(self._exist_data("ul", "uu", "su")): @@ -1187,49 +1020,22 @@ def fit( if self._exist_data("sl") else self.data["su"][ind_for_proteins] ) - if cores == 1: - for i in tqdm(range(n_genes), desc="estimating delta"): - ( - delta[i], - delta_intercept[i], - _, - delta_r2[i], - _, - delta_logLL[i], - ) = self.fit_gamma_steady_state( - s[i], - self.data["p"][i], - intercept, - perc_left, - perc_right, - ) - else: - pool = ThreadPool(cores) - res = pool.starmap( - self.fit_gamma_steady_state, - zip( - s, - self.data["p"], - itertools.repeat(intercept), - itertools.repeat(perc_left), - itertools.repeat(perc_right), - ), - ) - pool.close() - pool.join() - (delta, delta_intercept, _, delta_r2, _, delta_logLL) = zip(*res) - (delta, delta_intercept, delta_r2, delta_logLL) = ( - np.array(delta), - np.array(delta_intercept), - np.array(delta_r2), - np.array(delta_logLL), - ) - ( - self.parameters["delta"], - self.aux_param["delta_intercept"], - self.aux_param["delta_r2"], - _, # self.aux_param["delta_logLL"], - ) = (delta, delta_intercept, delta_r2, delta_logLL) + + delta, delta_intercept, _, delta_r2, _, delta_logLL = _parallel_wrapper( + _fit_gamma_steady_state_helper, + zip( + s, + self.data["p"], + itertools.repeat(intercept), + itertools.repeat(perc_left), + itertools.repeat(perc_right), + ), + ) + + self.parameters["delta"] = np.array(delta) + self.aux_param["delta_intercept"] = np.array(delta_intercept) + self.aux_param["delta_r2"] = np.array(delta_r2) + # self.aux_param["delta_logLL"] = np.array(delta_logLL) def fit_beta_gamma_lsq(self, t, U, S): """Estimate beta and gamma with the degradation data using the least squares method. From 0a645b8aa08774d162eaeab2d2a73ac0e2387142 Mon Sep 17 00:00:00 2001 From: Ukyeon Date: Tue, 9 May 2023 19:42:16 -0400 Subject: [PATCH 4/4] add cores --- dynamo/estimation/csc/velocity.py | 18 +++++++++++++++--- dynamo/tools/dynamics.py | 2 +- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/dynamo/estimation/csc/velocity.py b/dynamo/estimation/csc/velocity.py index 155dbfa89..1a663328b 100755 --- a/dynamo/estimation/csc/velocity.py +++ b/dynamo/estimation/csc/velocity.py @@ -503,6 +503,7 @@ def fit( if self.model.lower() == "deterministic": gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL = _parallel_wrapper( + cores, _fit_gamma_steady_state_helper, zip( U, @@ -531,6 +532,7 @@ def fit( ) gamma, gamma_intercept, _, gamma_r2, _, gamma_logLL, bs, bf = _parallel_wrapper( + cores, _fit_gamma_stochastic_helper, zip( itertools.repeat(self.est_method), @@ -569,6 +571,7 @@ def fit( uu_m, uu_v, _ = calc_12_mom_labeling(self.data["uu"], self.t) alpha, uu0, r2 = _parallel_wrapper( + cores, fit_alpha_degradation, zip( itertools.repeat(t_uniq), @@ -596,6 +599,7 @@ def fit( uu_m, uu_v, _ = calc_12_mom_labeling(self.data["uu"], self.t) alpha, alpha_b, alpha_r2 = _parallel_wrapper( + cores, fit_alpha_degradation, zip( itertools.repeat(t_uniq), @@ -627,6 +631,7 @@ def fit( # let us only assume one alpha for each gene in all cells alpha = _parallel_wrapper( + cores, fit_alpha_synthesis, zip( itertools.repeat(t_uniq), @@ -651,6 +656,7 @@ def fit( ul_m, ul_v, _ = calc_12_mom_labeling(self.data["ul"], self.t) alpha = _parallel_wrapper( + cores, fit_alpha_synthesis, zip( itertools.repeat(t_uniq), @@ -709,6 +715,7 @@ def fit( # let us only assume one alpha for each gene in all cells alpha = _parallel_wrapper( + cores, fit_alpha_synthesis, zip( itertools.repeat(t_uniq), @@ -740,6 +747,7 @@ def fit( ul_m, ul_v, t_uniq = calc_12_mom_labeling(self.data["ul"], self.t) # let us only assume one alpha for each gene in all cells alpha = _parallel_wrapper( + cores, fit_alpha_synthesis, zip( itertools.repeat(t_uniq), @@ -767,6 +775,7 @@ def fit( S = self.data["uu"] + self.data["ul"] gamma_k, gamma_intercept, _, gamma_r2, _, gamma_logLL = _parallel_wrapper( + cores, _fit_gamma_steady_state_helper, zip( U, @@ -777,6 +786,7 @@ def fit( ), ) gamma, alpha = _parallel_wrapper( + cores, one_shot_gamma_alpha, zip(gamma_k, itertools.repeat(t_uniq), U), ) @@ -813,6 +823,7 @@ def fit( ) k, k_intercept, _, k_r2, _, k_logLL, bs, bf = _parallel_wrapper( + cores, _fit_gamma_stochastic_helper, zip( itertools.repeat(self.est_method), @@ -852,6 +863,7 @@ def fit( ) k, k_intercept, _, k_r2, _, k_logLL, bs, bf = _parallel_wrapper( + cores, _fit_gamma_stochastic_helper, zip( itertools.repeat(self.est_method), @@ -900,6 +912,7 @@ def fit( ) k, k_intercept, _, k_r2, _, k_logLL, bs, bf = _parallel_wrapper( + cores, _fit_gamma_stochastic_helper, zip( itertools.repeat(self.est_method), @@ -1022,6 +1035,7 @@ def fit( ) delta, delta_intercept, _, delta_r2, _, delta_logLL = _parallel_wrapper( + cores, _fit_gamma_steady_state_helper, zip( s, @@ -1422,11 +1436,9 @@ def _fit_gamma_stochastic( return (k, 0, r2, all_r2, logLL, all_logLL, bs, bf) -def _parallel_wrapper(func, args_list): +def _parallel_wrapper(cores, func, args_list): import multiprocessing as mp - import os - cores = os.cpu_count() ctx = mp.get_context("fork") with ctx.Pool(cores) as pool: diff --git a/dynamo/tools/dynamics.py b/dynamo/tools/dynamics.py index f6dd72874..977c623ca 100755 --- a/dynamo/tools/dynamics.py +++ b/dynamo/tools/dynamics.py @@ -454,7 +454,7 @@ def dynamics( assumption_mRNA = "ss" elif experiment_type.lower() in ["mix_pulse_chase", "deg", "kin"]: assumption_mRNA = "kinetic" - assumption_mRNA = "ss" ####### + if model.lower() == "stochastic" and experiment_type.lower() not in [ "conventional", "kinetics",