1. 介绍
这个代码实现了一个名为Combat的类,该类用于数据的批次效应校正。
Combat (ComBat) 是一种广泛应用于基因表达数据和其他高通量数据的批次效应校正方法。
以下是代码的主要流程和作用解释:
主要流程
初始化与参数设置:
- Combat类的构造函数中设置了模式(p或np)和收敛阈值(conv)。
拟合方法 (fit):
- 该方法接受输入数据Y(样本-特征矩阵),批次向量b,以及可选的协变量矩阵X和C。
- 检查输入数据的一致性,并将批次向量b转换为一维向量。
- 构建批次的独热编码矩阵B。
- 计算固定效应的参数估计值(包括截距alpha_hat,协变量效应系数beta_x和效应移除系数beta_c)。
- 对数据进行标准化,得到标准化后的数据矩阵Z。
- 估计标准化数据的批次效应参数gam_hat。
- 使用贝叶斯估计方法(_it_eb_param)迭代计算批次效应的后验分布参数(gam_star和del_sq_star)。
转换方法 (transform):
- 该方法接受输入数据Y,批次向量b,以及可选的协变量矩阵X和C。
- 对数据进行标准化。
- 根据拟合的批次效应参数gamma_和delta_sq_对标准化后的数据进行批次效应校正。
- 将数据重新转换到原始尺度上,并加上截距和协变量效应。
拟合转换方法 (fit_transform):
- 结合拟合和转换方法,先拟合参数再进行数据转换。
3. 代码分析
代码主要函数与方法解释
_compute_lambda 和 _compute_theta, 这些函数用于计算超参数 lambda 和 theta,基于输入的方差估计值 del_hat_sq。
def _compute_lambda(del_hat_sq):
v = np.mean(del_hat_sq)
s2 = np.var(del_hat_sq, ddof=1)
return (2 * s2 + v ** 2) / float(s2)
def _compute_theta(del_hat_sq):
v = del_hat_sq.mean()
s2 = np.var(del_hat_sq, ddof=1)
return (v * s2 + v ** 3) / s2
_post_gamma 和 _post_delta函数计算后验分布参数 gamma 和 delta。
def _post_gamma(x, gam_hat, gam_bar, tau_bar_sq, n):
num = tau_bar_sq * n * gam_hat + x * gam_bar
den = tau_bar_sq * n + x
return num / den
def _post_delta(x, Z, lam_bar, the_bar, n):
num = the_bar + 0.5 * np.sum((Z - x[np.newaxis, :]) ** 2, axis=0)
den = n / 2.0 + lam_bar - 1
return num / den
_inverse_gamma_moments
计算逆伽马分布的参数 lambda 和 theta。
def _inverse_gamma_moments(del_hat_sq):
lam_bar = np.apply_along_axis(_compute_lambda, arr=del_hat_sq, axis=1)
the_bar = np.apply_along_axis(_compute_theta, arr=del_hat_sq, axis=1)
return (lam_bar, the_bar)
_it_eb_param执行迭代的经验贝叶斯参数估计。
def _it_eb_param(Z_batch, gam_hat_batch, del_hat_sq_batch, gam_bar_batch, tau_sq_batch, lam_bar_batch, the_bar_batch, conv):
n = np.sum(1 - np.isnan(Z_batch), axis=0)
gam_prior = gam_hat_batch.copy()
del_sq_prior = del_hat_sq_batch.copy()
change = 1
while change > conv:
gam_post = _post_gamma(del_sq_prior, gam_hat_batch, gam_bar_batch, tau_sq_batch, n)
del_sq_post = _post_delta(gam_post, Z_batch, lam_bar_batch, the_bar_batch, n)
change = max((abs(gam_post - gam_prior) / gam_prior).max(),
(abs(del_sq_post - del_sq_prior) / del_sq_prior).max())
gam_prior = gam_post
del_sq_prior = del_sq_post
return (gam_post, del_sq_post)
Combat 类的核心方法
包括 fit、transform 和 fit_transform 方法,用于数据的批次效应校正。
class Combat(object):
def __init__(self, mode='p', conv=0.0001):
if (mode == 'p') | (mode == 'np'):
self.mode = mode
else:
raise ValueError("mode can only be 'p' o 'np'")
self.conv = conv
def fit(self, Y, b, X=None, C=None):
check_consistent_length(Y, b, X, C)
b = column_or_1d(b)
batches = np.unique(b)
self.batches_ = batches
B = np.column_stack([(b == b_name).astype(int) for b_name in self.batches_])
n_samples, n_features = Y.shape
n_batch = B.shape[1]
if n_batch == 1:
raise ValueError('The number of batches should be at least 2')
sample_per_batch = B.sum(axis=0)
if np.any(sample_per_batch == 1):
raise ValueError('Each batch should have at least 2 observations')
M = B.copy()
if isinstance(X, np.ndarray):
M = np.column_stack((M, X))
end_x = n_batch + X.shape[1]
else:
end_x = n_batch
if isinstance(C, np.ndarray):
M = np.column_stack((M, C))
end_c = end_x + C.shape[1]
else:
end_c = end_x
beta_hat = np.matmul(np.linalg.inv(np.matmul(M.T, M)), np.matmul(M.T, Y))
alpha_hat = np.matmul(sample_per_batch / float(n_samples), beta_hat[:n_batch, :])
self.intercept_ = alpha_hat
beta_x = beta_hat[n_batch:end_x, :]
self.coefs_x_ = beta_x
beta_c = beta_hat[end_x:end_c, :]
self.coefs_c_ = beta_c
Y_hat = np.matmul(M, beta_hat)
sigma = np.mean(((Y - Y_hat) ** 2), axis=0)
self.epsilon_ = sigma
Z = Y.copy()
Z -= alpha_hat[np.newaxis, :]
Z -= np.matmul(M[:, n_batch:end_x], beta_x)
Z -= np.matmul(M[:, end_x:end_c], beta_c)
Z /= np.sqrt(sigma)
gam_hat = np.matmul(np.linalg.inv(np.matmul(B.T, B)), np.matmul(B.T, Z))
gam_bar = np.mean(gam_hat, axis=1)
tau_bar_sq = np.var(gam_hat, axis=1, ddof=1)
del_hat_sq = [np.var(Z[B[:, ii] == 1, :], axis=0, ddof=1) for ii in range(B.shape[1])]
del_hat_sq = np.array(del_hat_sq)
lam_bar, the_bar = _inverse_gamma_moments(del_hat_sq)
if self.mode == 'p':
it_eb = _it_eb_param
else:
it_eb = _it_eb_non_param
gam_star, del_sq_star = [], []
for ii in range(B.shape[1]):
g, d_sq = it_eb(Z[B[:, ii] == 1, :], gam_hat[ii, :], del_hat_sq[ii, :], gam_bar[ii], tau_bar_sq[ii], lam_bar[ii], the_bar[ii], self.conv)
gam_star.append(g)
del_sq_star.append(d_sq)
gam_star = np.array(gam_star)
del_sq_star = np.array(del_sq_star)
self.gamma_ = gam_star
self.delta_sq_ = del_sq_star
return self
def transform(self, Y, b, X=None, C=None):
Y, b, X, C = self._validate_for_transform(Y, b, X, C)
test_batches = np.unique(b)
Y_trans = Y - self.intercept_[np.newaxis, :]
if self.coefs_x_.size > 0:
Y_trans -= np.matmul(X, self.coefs_x_)
if self.coefs_c_.size > 0:
Y_trans -= np.matmul(C, self.coefs_c_)
Y_trans /= np.sqrt(self.epsilon_)