版本
导入Scanpy, 其版本为'1.9.1',如果你看到的源码和下文有差异,其可能是由于版本差异。
import scanpy as sc
sc.__version__
#'1.9.1'
例子
函数pp.normalize_total
用于Normalize counts per cell, 其源代码在scanpy/preprocessing/_normalization.py
我们通过一个简单例子来了解该函数主要功能:
将一个简单的矩阵数据转为AnnData对象
from anndata import AnnData
import scanpy as sc
adata = AnnData(np.array([
[3, 3, 3, 6, 6],
[1, 1, 1, 2, 2],
[1, 22, 1, 2, 2],
]))
adata.X
# array([[ 3., 3., 3., 6., 6.],
# [ 1., 1., 1., 2., 2.],
# [ 1., 22., 1., 2., 2.]], dtype=float32)
运行后pp.normalize_total
后, adata的数值发生变换
sc.pp.normalize_total(adata, target_sum=1)
adata.X
# array([[0.14, 0.14, 0.14, 0.29, 0.29],
# [0.14, 0.14, 0.14, 0.29, 0.29],
# [0.04, 0.79, 0.04, 0.07, 0.07]], dtype=float32)
数值变换满足一个规律,即每行的的总和加起来为一个确定数值,这里使用target_sum设为1.
for i in range(adata.shape[0]):
print(sum(adata.X[i, ]))
# 1.0000000447034836
# 1.0000000447034836
# 0.9999999925494194
代码
全部代码
以下为normalize_total
主要逻辑代码,为方便理解主要逻辑,其中删除了一些即将废弃的,异常处理,日志打印等代码。
[图片上传失败...(image-65e9eb-1662902530294)]
参数设置
代码前几行是函数的参数设置:
def normalize_total(
adata: anndata._core.anndata.AnnData,
target_sum: Optional[float] = None,
exclude_highly_expressed: bool = False,
max_fraction: float = 0.05,
key_added: Optional[str] = None,
layer: Optional[str] = None,
inplace: bool = True,
copy: bool = False,
) -> Optional[Dict[str, numpy.ndarray]]
adata, target_sum, ..., copy 是函数参数, 冒号后面跟的是参数类型注解,表面这个参数应该传递什么类型的值给函数。
是否copy数据
if copy:
if not inplace:
raise ValueError("`copy=True` cannot be used with `inplace=False`.")
adata = adata.copy()
用到参数copy
真值,来判断将现有adata 数据复制一份新数据的来进行后续操作
获取layer数据
X = _get_obs_rep(adata, layer=layer)
该行代码用到参数layer
, 代码功能为获取adata 的layer数据,如果不另外设置,默认返回adata.X
也是raw data, 进行normalize。
-
layer
设置需要进行normalize的layer. layer 是anndata中的一个概念。 -
adata
, 为一个AnnData对象,其中的数据矩阵,行为观测样本(observations ),列为变量(variable )或特征, 单细胞分析中,行对应细胞, 列对应基因
是否排除非常高表达的基因
if exclude_highly_expressed:
counts_per_cell = X.sum(1) # original counts per cell
counts_per_cell = np.ravel(counts_per_cell)
# at least one cell as more than max_fraction of counts per cell
gene_subset = (X > counts_per_cell[:, None] * max_fraction).sum(0)
gene_subset = np.ravel(gene_subset) == 0
counts_per_cell = X[:, gene_subset].sum(1)
else:
counts_per_cell = X.sum(1)
counts_per_cell = np.ravel(counts_per_cell)
这几行代码用到参数exclude_highly_expressed
和 max_fraction
,这两是搭配使用的。
-
exclude_highly_expressed
是设置是否排除基因count非常大的基因count。 -
max_fraction
设置每个细胞的原始total counts 的百分比作为最大基因count的比例,也即大于这个比例的基因将不参与最后total count的计算
假设X
为
array([[ 3., 3., 3., 6., 6.],
[ 1., 1., 1., 2., 2.],
[ 1., 22., 1., 2., 2.]], dtype=float32)
如果exclude_highly_expressed
为False, 则最后输出为每行的total count: counts_per_cell
counts_per_cell= X.sum(1)
counts_per_cell
# array([21., 7., 28.], dtype=float32)
如果exclude_highly_expressed
为True, 则根据计算出每行的原始total count,再计算totalcount * max_fraction
得到每行 允许的基因最大count 阈值,筛选出每行中小于等于这个阈值的基因count, 然后求和得到counts_per_cell。
exclude_highly_expressed = True
max_fraction = 0.2
if exclude_highly_expressed:
counts_per_cell = X.sum(1) # original counts per cell
counts_per_cell = np.ravel(counts_per_cell)
# at least one cell as more than max_fraction of counts per cell
# 下面几行是关键,如需要更好的理解,你可能需要一步步的运算查看输出结果来帮助理解
gene_subset = (X > counts_per_cell[:, None] * max_fraction).sum(0)
gene_subset = np.ravel(gene_subset) == 0
counts_per_cell = X[:, gene_subset].sum(1)
# X[:, gene_subset]
# array([[3., 3.],
# [1., 1.],
# [1., 1.]], dtype=float32)
#counts_per_cell
# array([6., 2., 2.], dtype=float32)
是否替换原数据
if inplace:
if key_added is not None:
# 添加.obs 字段, 内容为每行的total count
adata.obs[key_added] = counts_per_cell
# _set_obs_rep函数作用为将normalize的数据替换adata原数据
_set_obs_rep(
adata, _normalize_data(X, counts_per_cell, target_sum), layer=layer
)
else:
# 如果inplace为False,即不原位替换,则返回一个字典
dat = dict(
X=_normalize_data(X, counts_per_cell, target_sum, copy=True),
norm_factor=counts_per_cell,
)
这几行代码用到的参数:
-
inplace
: normalize后的数据是否替换原来adata.X 数据或者adata.layer数据 -
key_added
是否再adata.obs 新加一个字段 -
target_sum
设置normalize后每行的总和数据值
该部分代码逻辑,则根据inplace 真值,执行不同操作:
- inplace为True, 则使用
_set_obs_rep
函数将normalize后的数据替换原来adata.X 数据或者adata.layer数据。 - inplace为False, 则是构建一个字典,存储normalize后的数据
其中_normalize_data()
函数为真正进行normalize的函数, 后文再进行说明。
判断返回结果
if copy:
return adata
elif not inplace:
return dat
-
copy
为真返回adata -
inplace
为假返回 dat 也即上文提到的字典
_normalize_data
_normalize_data
函数的代码如下:
def _normalize_data(X, counts, after=None, copy=False):
# 是否copy
X = X.copy() if copy else X
# 类型转换
if issubclass(X.dtype.type, (int, np.integer)):
X = X.astype(np.float32) # TODO: Check if float64 should be used
# 支持DaskArray类型
if isinstance(counts, DaskArray):
counts_greater_than_zero = counts[counts > 0].compute_chunk_sizes()
else:
counts_greater_than_zero = counts[counts > 0]
# after为传进来的target_sum
# 如果为target_sum为None, target_sum则使用counts的中值
after = np.median(counts_greater_than_zero, axis=0) if after is None else after
# 将counts中得0 变成1
counts += counts == 0
# 使最后每行总和为target_sum
counts = counts / after
# 下面为支持不同类型,做了一些判断,但做的事都一样
if issparse(X):
sparsefuncs.inplace_row_scale(X, 1 / counts)
elif isinstance(counts, np.ndarray):
np.divide(X, counts[:, None], out=X)
else:
X = np.divide(X, counts[:, None]) # dask does not support kwarg "out"
# X = X / counts
# counts
# array([6., 2., 2.], dtype=float32)
# X
# array([[ 3., 3., 3., 6., 6.],
# [ 1., 1., 1., 2., 2.],
# [ 1., 22., 1., 2., 2.]], dtype=float32)
# np.divide(X, counts[:, None])
# array([[ 0.5, 0.5, 0.5, 1. , 1. ],
# [ 0.5, 0.5, 0.5, 1. , 1. ],
# [ 0.5, 11. , 0.5, 1. , 1. ]], dtype=float32)
# 即每行得count除以每行的total count
return X
上面代码中给了一些注释,辅助理解。