Reference: 《Ceph 设计原理与实现》 (Ceph: Design Principles and Implementation)
CRUSH (Controlled Replication Under Scalable Hashing) is a hash-based data distribution algorithm. It takes a data object's unique identifier, the current storage cluster topology, and the data replication policy as input, and can compute the location of the underlying storage devices holding the data at any time, anywhere, and communicate with them directly. This avoids table lookups and enables decentralization and high concurrency.
1 straw and straw2
The straw algorithm treats every element as a straw: for a given input, it computes a pseudo-random length for each element and outputs the element with the longest straw. The process is called drawing lots, and the computed length is called the straw length. To cope with hardware differences, weights are introduced so that elements with larger weights are more likely to be drawn, allowing data to be distributed sensibly even across a heterogeneous storage cluster.
An example of the straw algorithm:
1) Assume the collection currently contains n elements: (e1, e2, ..., en)
2) Add a new element to the collection: (e1, e2, ..., en, en+1)
3) For an arbitrary input x, before en+1 is added, compute every element's straw length and let dmax be the maximum among them: (d1, d2, ..., dn)
4) Because the straw length of the new element en+1 depends only on its own id and weight, its straw length dn+1 can be computed independently.
5) The straw algorithm always picks the longest straw as the final result, so:
if dn+1 > dmax, x is remapped to the new element en+1; otherwise, the existing mapping of x is unaffected.
When an element is added, the straw algorithm randomly remaps some data from the existing elements onto the new one; likewise, when an element is removed, all of its data is randomly remapped onto the remaining elements. Adding or removing an element therefore never causes data to migrate between third-party elements; only the element being drained and the element receiving data are involved.
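This stability property can be checked with a small self-contained simulation (a sketch, not Ceph code: the hash below is a stand-in and all weights are equal). Adding a fifth element only pulls some mappings onto the new element; no input is ever reshuffled among the old four.
#include <cstdint>
#include <cstdio>
// Stand-in hash: deterministic, depends only on (x, item), like the straw length.
static uint32_t straw_len(uint32_t x, uint32_t item) {
    uint32_t h = x * 2654435761u ^ item * 2246822519u;
    h ^= h >> 16; h *= 2654435761u; h ^= h >> 13;
    return h;
}
// straw selection: pick the item with the longest straw.
static int straw_select(uint32_t x, int n) {
    int best = 0;
    for (int item = 1; item < n; ++item)
        if (straw_len(x, item) > straw_len(x, best)) best = item;
    return best;
}
int main() {
    int moved_to_new = 0, reshuffled = 0;
    for (uint32_t x = 0; x < 100000; ++x) {
        int before = straw_select(x, 4), after = straw_select(x, 5);
        if (after != before) (after == 4 ? moved_to_new : reshuffled)++;
    }
    // Expect moved_to_new to be roughly 1/5 of the inputs and reshuffled to be 0.
    std::printf("moved to new: %d, reshuffled: %d\n", moved_to_new, reshuffled);
}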
# straw algorithm pseudocode; input: the input value, r: a random factor
max_x = -1
max_item = -1
for each item:
    x = hash(input, r)
    x = x * item_straw
    if x > max_x:
        max_x = x
        max_item = item
return max_item
# item_straw is computed from the weights
reverse = rearrange all weights in reverse order  # sort all weights in reverse order
straw = 1
weight_diff_prev_total = 0
for each item:
    item_straw = straw * 0x10000
    weight_diff_prev = (reverse[current_item] - reverse[prev_item]) * items_remain
    weight_diff_prev_total += weight_diff_prev
    weight_diff_next = (reverse[next_item] - reverse[current_item]) * items_remain
    scale = weight_diff_prev_total / (weight_diff_prev_total + weight_diff_next)
    straw *= pow(1/scale, 1/items_remain)
item_straw thus depends not only on an element's own weight but also on the weights of all other elements in the collection. As a result, whenever an element is added to or removed from the collection, migration of unrelated data can be triggered.
The straw2 algorithm drops the dependency on other elements and looks only at an element's own weight, giving the new algorithm below.
max_x = -1
max_item = -1
for each item:
    x = hash(input, r)
    x = ln(x / 65536) / weight
    if x > max_x:
        max_x = x
        max_item = item
return max_item
After the hash step, x necessarily falls within [0, 65535], so ln(x / 65536) is negative. Dividing it by the element's own weight means that the larger the weight, the larger the resulting x (with x < 0, the value moves closer to 0). This is the positive feedback the weight provides to each element's draw.
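A quick numeric check (floating point here for clarity; the real implementation uses 64-bit fixed-point arithmetic rather than double): for the same hash value, the heavier item gets a draw closer to zero and therefore a better chance of being the maximum.
#include <cmath>
#include <cstdio>
int main() {
    double x = 16384;                        // a hash result in [0, 65535]
    double r = std::log(x / 65536.0);        // ln of a value in (0, 1): negative
    std::printf("weight 1: %f\n", r / 1.0);  // -1.386294
    std::printf("weight 2: %f\n", r / 2.0);  // -0.693147, larger, so more likely to win
}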
2 CRUSH overview
The CRUSH algorithm takes three inputs: the object, the cluster map (the cluster topology), and the placement rule (the data placement policy). The placement rule is generally not changed lightly, and the cluster map acts like a real-world server address; as long as these two inputs stay fixed, each input object always computes to the same result. And because hashing is used, every disk is selected with roughly the same probability, so data ends up evenly distributed across the whole cluster.
The figure above shows a simplified cluster map. Every leaf node is a real, smallest-unit physical storage device (a disk, for example) and is called a device; all intermediate nodes are collectively called buckets, where each bucket can be a set of devices or a set of lower-level buckets; the root node, called root, is the entry point of the whole cluster. Every node has a unique numeric ID and a type identifying its position and level in the cluster, but only leaf nodes have non-negative IDs, marking them as the final devices that carry data. A parent node's weight is the sum of its children's weights.
The common cluster map node types, ordered from small to large: osd, host, chassis, rack, row, pdu, pod, room, datacenter, region, root.
The placement rule completes the data mapping. Each placement rule can contain several steps, of three types: take, select, and emit.
- take
take selects the bucket with the given id from the cluster map (one specific bucket) and uses it as the input of the following steps. For example, the system's default placement rule always starts executing from the root node of the cluster map.
- select
select picks the specified number of items of the specified type from the input bucket, at random. Ceph currently supports two redundancy strategies, replication and erasure coding, and accordingly two select algorithms: firstn and indep.
- emit
emit returns the final selection to the caller.
The figure below uses firstn as an example to show how select finds the requested number of items under a given bucket.
(1) Pick an item under the bucket where the search starts
This uses the selection algorithms described above, such as straw, to compute a suitable item from the bucket. The inputs are the object's identifier x and a random factor r (r actually serves as the seed of the hash function). To prevent endless loops, the number of attempts made for each replica is also capped; the cap is called the global retry limit (choose_total_tries).
(2) Collision
A collision means the selected item already appears in the output item list.
(3) OSD overload (or failure)
- The cluster is small and its total capacity limited, so the total number of PGs in the cluster is limited.
- An inherent limitation of CRUSH: its basic selection algorithm (take straw2 as an example) computes, on each draw, the independent probability of a single item being selected, while Ceph's multi-replica strategy requires several items to be selected together. By construction, then, CRUSH cannot fully solve the problem of distributing replicas evenly in multi-replica mode.
These factors make uneven data distribution across disks easy to hit in real Ceph clusters, especially heterogeneous ones. Disk weights are therefore dynamic: besides the real weight computed from capacity, Ceph also maintains a reweight value per OSD. After an OSD is selected, one more hash is computed over the input x and the OSD id, and only when the result is smaller than the OSD's reweight is the OSD actually chosen.
As the overload-test flow chart above shows, the higher the reweight, the higher the probability of passing the test (with reweight >= 0x10000 the probability is 100%), and vice versa. In practice, lowering an overloaded OSD's reweight or raising an idle OSD's reweight both trigger data redistribution across OSDs.
The overload test has another benefit: it distinguishes a temporarily failed OSD from a permanently removed one. For a temporary failure, simply setting reweight to 0 avoids large swings in the Ceph cluster.
Initially, Ceph sets every OSD's reweight to 0x10000, so the overload test always passes (it is effectively disabled).
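As a sketch, the test boils down to comparing the low 16 bits of one more hash against reweight (hedged: hash2 here stands in for Ceph's crush_hash32_2(); the real code is is_out(), listed at the end of section 3):
#include <cstdint>
bool passes_overload_test(uint32_t x, uint32_t osd, uint32_t reweight,
                          uint32_t (*hash2)(uint32_t, uint32_t)) {
    if (reweight >= 0x10000) return true;        // full weight: always passes
    if (reweight == 0)       return false;       // weight 0: never selected
    return (hash2(x, osd) & 0xffff) < reweight;  // passes with probability reweight/0x10000
}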
The crush map configuration file can be fetched and modified from the command line.
- Fetch the crush map
ceph osd getcrushmap -o {compiled-crushmap-filename}
- Decompile the crush map
crushtool -d {compiled-crushmap-filename} -o {decompiled-crushmap-filename}
cat crushmapdecompliedbywq
# begin crush map
tunable choose_local_tries 0 # deprecated; kept at 0 for backward compatibility
tunable choose_local_fallback_tries 0 # deprecated; kept at 0 for backward compatibility
tunable choose_total_tries 50 # maximum attempts when choosing a bucket; default 50
tunable chooseleaf_descend_once 1 # deprecated; kept at 1 for backward compatibility
tunable chooseleaf_vary_r 1 # pass the random factor r down into recursive chooseleaf calls
tunable chooseleaf_stable 1 # avoids some unnecessary pg migrations
tunable straw_calc_version 1 # straw algorithm version; set to 1 for backward compatibility
tunable allowed_bucket_algs 54 # bitmask of allowed bucket algorithms; 54 includes straw2
# devices
# Every bottom-level physical device (leaf node) is a device; disks can be automatically classified as hdd, ssd, or nvme.
device 0 osd.0 class hdd
device 1 osd.1 class hdd
device 2 osd.2 class hdd
# types
# Types are user-definable; they are the bucket types, usually designed as a hierarchy. Type ids must be non-negative integers (osd is 0).
type 0 osd
type 1 host
type 2 chassis
type 3 rack
type 4 row
type 5 pdu
type 6 pod
type 7 room
type 8 datacenter
type 9 zone
type 10 region
type 11 root
# buckets
# All intermediate nodes are called buckets; a bucket can be a set of devices or a set of lower-level buckets. The root node, called root, is the entry point of the whole cluster. Bucket ids must be negative and unique; a bucket with id i is actually stored at buckets[-1-i] in the crush map.
host localhost {
id -3 # do not change unnecessarily
id -4 class hdd # do not change unnecessarily
# weight 0.032
alg straw2
hash 0 # rjenkins1
item osd.0 weight 0.011
item osd.1 weight 0.011
item osd.2 weight 0.011
}
root default {
id -1 # do not change unnecessarily
id -2 class hdd # do not change unnecessarily
# weight 0.032
alg straw2
hash 0 # rjenkins1
item localhost weight 0.032
}
# rules
# placement rule
rule replicated_rule {
id 0 # rule id
type replicated # rule type: [replicated|erasure]
min_size 1 # the rule is not applied if the replica count is smaller than this value
max_size 10 # the rule is not applied if the replica count is larger than this value
step take default # entry point of the crush rule, usually a root-type bucket
step choose firstn 0 type osd # choose or chooseleaf; num is how many to select, type is the expected bucket type
step emit # output the result
}
# end crush map
Pulling the placement rule out on its own, here is how one is written.
rule <rulename> {
ruleset <ruleset>
type [replicated|erasure]
min_size <min-size>
max_size <max-size>
step take <bucket-name>
step [choose|chooseleaf] [firstn|indep] <num> type <bucket-type>
step emit
}
A placement rule executes as follows:
take selects a bucket, usually a root-type bucket.
choose has several variants; each takes the previous step's output as its input.
a) choose firstn selects, depth-first, num sub-buckets of type bucket-type.
b) chooseleaf first selects num sub-buckets of type bucket-type, then recurses from each down to a leaf node.
- If num == 0, num is the pool's replica count.
- If 0 < num < pool.size (the pool's replica count), select num items.
- If num < 0, select pool.size - |num| items (see the sketch below).
firstn and indep are both depth-first algorithms. The main difference: with num = 4, when only three results can be found, firstn returns [1,2,4] while indep returns [1,2,CRUSH_ITEM_NONE,4]. firstn is the usual choice.
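A tiny helper (hypothetical, but mirroring the numrep adjustment inside crush_do_rule() in section 3.2.1) makes the num rules explicit:
// pool_size is the pool's replica count (result_max in crush_do_rule()).
int effective_num(int num, int pool_size) {
    if (num <= 0) return pool_size + num; // 0 -> pool_size; negative -> pool_size - |num|
    return num;                           // positive -> exactly num
}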
- Compile the crush map
crushtool -c {decompiled-crush-map-filename} -o {compiled-crush-map-filename}
- Test the crush map
crushtool -i {compiled-crush-map-filename} --test --min-x 0 --max-x {nums} --num-rep {nums} --ruleset {rule-id} --show_mappings
- Inject it into the cluster so it takes effect
ceph osd setcrushmap -i {compiled-crush-map-filename}
3 _calc_target()
_calc_target() is the method in the source code that computes the target primary osd. Roughly:
- Look up the pool information from the poolid, including type, crush id, pg count, and so on.
- Decide whether a resend is forced; flag: force_resend.
- Check for a cache tier; if there is one, replace the pool information with the cache pool's, otherwise do nothing.
- Compute the pg information from the object's attributes (name, key, namespace) and the poolid, yielding the key parameter m_seed.
- From the pg information, run the crush algorithm to compute which osd set the object is sent to, and which osd is the primary.
- Check whether this target matches the previous one (they match only when both the object and the pool match) and, together with the any_change parameter, reset force_resend.
- Read the osd state flags (CEPH_OSDMAP_PAUSERD, CEPH_OSDMAP_PAUSEWR) to decide whether sending is paused.
- Decide whether the mapping has changed (a legacy change); flag: legacy_change.
- If the pool's pg count has changed, set the split_or_merge flag.
- Update the op_target_t fields, including the primary osd and the osd set; on success return RECALC_OP_TARGET_NEED_RESEND.
The process involves two selection computations: first choosing the pg, then choosing the osds. The two functions involved, osdmap->object_locator_to_pg() and osdmap->pg_to_up_acting_osds(), are introduced below.
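Before diving into the source, here is a conceptual sketch of the two-stage mapping (not Ceph code: the hash and the CRUSH step are stand-ins, and real CRUSH walks the cluster map and rejects collisions):
#include <cstdint>
#include <functional>
#include <string>
#include <vector>
struct PgId { int64_t pool; uint32_t seed; };
// Stage 1: object -> pg. A stable hash of the name, folded onto the pg range.
PgId object_to_pg(int64_t pool, const std::string& name, uint32_t pg_num) {
    uint32_t ps = static_cast<uint32_t>(std::hash<std::string>{}(name)); // stand-in for rjenkins
    return PgId{pool, ps % pg_num};                                      // stand-in for ceph_stable_mod
}
// Stage 2: pg -> osd set. A toy stand-in for the CRUSH computation.
std::vector<int> pg_to_osds(const PgId& pg, int replicas, int num_osds) {
    std::vector<int> osds;
    for (int r = 0; r < replicas; ++r)
        osds.push_back(static_cast<int>((pg.seed + 2654435761u * r) % num_osds));
    return osds;
}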
int Objecter::_calc_target(op_target_t *t, Connection *con, bool any_change)
{
//set the read/write flags
bool is_read = t->flags & CEPH_OSD_FLAG_READ;
bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
//get the osdmap epoch
t->epoch = osdmap->get_epoch();
//get the pool info for the given poolid
const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
if (!pi) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
ldout(cct,30) << __func__ << " base pi " << pi
<< " pg_num " << pi->get_pg_num() << dendl;
// first update of force_resend
bool force_resend = false;
if (osdmap->get_epoch() == pi->last_force_op_resend) {
if (t->last_force_resend < pi->last_force_op_resend) {
t->last_force_resend = pi->last_force_op_resend;
force_resend = true;
} else if (t->last_force_resend == 0) {
force_resend = true;
}
}
//switch to the cache pool
// apply tiering
t->target_oid = t->base_oid;
t->target_oloc = t->base_oloc;
if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
if (is_read && pi->has_read_tier())
t->target_oloc.pool = pi->read_tier;
if (is_write && pi->has_write_tier())
t->target_oloc.pool = pi->write_tier;
pi = osdmap->get_pg_pool(t->target_oloc.pool);
if (!pi) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
}
//compute the pg
pg_t pgid;
if (t->precalc_pgid) {
ceph_assert(t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY);
ceph_assert(t->base_oid.name.empty()); // make sure this is a pg op
ceph_assert(t->base_oloc.pool == (int64_t)t->base_pgid.pool());
pgid = t->base_pgid;
} else {
//entry point of the actual pg computation
int ret = osdmap->object_locator_to_pg(t->target_oid, t->target_oloc,
pgid);
if (ret == -ENOENT) {
t->osd = -1;
return RECALC_OP_TARGET_POOL_DNE;
}
}
ldout(cct,20) << __func__ << " target " << t->target_oid << " "
<< t->target_oloc << " -> pgid " << pgid << dendl;
ldout(cct,30) << __func__ << " target pi " << pi
<< " pg_num " << pi->get_pg_num() << dendl;
t->pool_ever_existed = true;
int size = pi->size;
int min_size = pi->min_size;
unsigned pg_num = pi->get_pg_num();
unsigned pg_num_pending = pi->get_pg_num_pending();
int up_primary, acting_primary;
vector<int> up, acting;
// crush algorithm
osdmap->pg_to_up_acting_osds(pgid, &up, &up_primary,
&acting, &acting_primary);
bool sort_bitwise = osdmap->test_flag(CEPH_OSDMAP_SORTBITWISE);
bool recovery_deletes = osdmap->test_flag(CEPH_OSDMAP_RECOVERY_DELETES);
unsigned prev_seed = ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask);
pg_t prev_pgid(prev_seed, pgid.pool());
//second update of force_resend
if (any_change && PastIntervals::is_new_interval(
t->acting_primary,
acting_primary,
t->acting,
acting,
t->up_primary,
up_primary,
t->up,
up,
t->size,
size,
t->min_size,
min_size,
t->pg_num,
pg_num,
t->pg_num_pending,
pg_num_pending,
t->sort_bitwise,
sort_bitwise,
t->recovery_deletes,
recovery_deletes,
prev_pgid)) {
force_resend = true;
}
//update should_be_paused, legacy_change, split_or_merge
bool unpaused = false;
bool should_be_paused = target_should_be_paused(t);
if (t->paused && !should_be_paused) {
unpaused = true;
}
t->paused = should_be_paused;
bool legacy_change =
t->pgid != pgid ||
is_pg_changed(
t->acting_primary, t->acting, acting_primary, acting,
t->used_replica || any_change);
bool split_or_merge = false;
if (t->pg_num) {
split_or_merge =
prev_pgid.is_split(t->pg_num, pg_num, nullptr) ||
prev_pgid.is_merge_source(t->pg_num, pg_num, nullptr) ||
prev_pgid.is_merge_target(t->pg_num, pg_num);
}
//reset op_target_t
if (legacy_change || split_or_merge || force_resend) {
t->pgid = pgid;
t->acting = acting; // osd set
t->acting_primary = acting_primary; // primary osd
t->up_primary = up_primary;
t->up = up;
t->size = size;
t->min_size = min_size;
t->pg_num = pg_num;
t->pg_num_mask = pi->get_pg_num_mask();
t->pg_num_pending = pg_num_pending;
osdmap->get_primary_shard(
pg_t(ceph_stable_mod(pgid.ps(), t->pg_num, t->pg_num_mask), pgid.pool()),
&t->actual_pgid);
t->sort_bitwise = sort_bitwise;
t->recovery_deletes = recovery_deletes;
ldout(cct, 10) << __func__ << " "
<< " raw pgid " << pgid << " -> actual " << t->actual_pgid
<< " acting " << acting
<< " primary " << acting_primary << dendl;
t->used_replica = false;
if (acting_primary == -1) {
t->osd = -1;
} else {
int osd;
bool read = is_read && !is_write;
//for a read with balanced reads enabled, pick one of the acting osds at random as the read target
if (read && (t->flags & CEPH_OSD_FLAG_BALANCE_READS)) {
int p = rand() % acting.size();
if (p)
t->used_replica = true;
osd = acting[p];
ldout(cct, 10) << " chose random osd." << osd << " of " << acting
<< dendl;
} else if (read && (t->flags & CEPH_OSD_FLAG_LOCALIZE_READS) &&
acting.size() > 1) {
// look for a local replica. prefer the primary if the
// distance is the same.
int best = -1;
int best_locality = 0;
for (unsigned i = 0; i < acting.size(); ++i) {
int locality = osdmap->crush->get_common_ancestor_distance(
cct, acting[i], crush_location);
ldout(cct, 20) << __func__ << " localize: rank " << i
<< " osd." << acting[i]
<< " locality " << locality << dendl;
if (i == 0 ||
(locality >= 0 && best_locality >= 0 &&
locality < best_locality) ||
(best_locality < 0 && locality >= 0)) {
best = i;
best_locality = locality;
if (i)
t->used_replica = true;
}
}
ceph_assert(best >= 0);
osd = acting[best];
} else {
osd = acting_primary;
}
// the target osd is the acting_primary osd
t->osd = osd;
}
}
if (legacy_change || unpaused || force_resend) {
return RECALC_OP_TARGET_NEED_RESEND;
}
if (split_or_merge &&
(osdmap->require_osd_release >= CEPH_RELEASE_LUMINOUS ||
HAVE_FEATURE(osdmap->get_xinfo(acting_primary).features,
RESEND_ON_SPLIT))) {
return RECALC_OP_TARGET_NEED_RESEND;
}
return RECALC_OP_TARGET_NO_ACTION;
}
3.1 osdmap->object_locator_to_pg()
OSDMap::object_locator_to_pg() takes three parameters:
- object_t: a struct whose main field is name, the object name.
- object_locator_t: the object locator; its main fields are pool id, key, namespace, and hash (which hash algorithm to use).
- pg_t: the target pg to compute; its main fields are pool id and m_seed (the hash seed, later used to compute the osds).
The pool id in pg_t is simply the pool id from loc, so only m_seed actually needs computing. As the source below shows, when loc.hash is >= 0 it is used directly as m_seed to construct pg_t; when loc.hash is negative, map_to_pg() computes m_seed.
int OSDMap::object_locator_to_pg(
const object_t& oid, const object_locator_t& loc, pg_t &pg) const
{
if (loc.hash >= 0) {
if (!get_pg_pool(loc.get_pool())) {
return -ENOENT;
}
pg = pg_t(loc.hash, loc.get_pool());
return 0;
}
return map_to_pg(loc.get_pool(), oid.name, loc.key, loc.nspace, &pg);
}
struct pg_t {
...
//constructor
pg_t(ps_t seed, uint64_t pool) :
m_pool(pool), m_seed(seed) {}
...
}
map_to_pg() shows that ps (m_seed) is computed by pool->hash_key() from the key (or, if no key is set, the name) together with the namespace. Following hash_key() down gives the call chain hash_key() -> ceph_str_hash() -> ceph_str_hash_rjenkins() / ceph_str_hash_linux(); the rjenkins hash is normally used.
int OSDMap::map_to_pg(
int64_t poolid,
const string& name,
const string& key,
const string& nspace,
pg_t *pg) const
{
// calculate ps (placement seed)
const pg_pool_t *pool = get_pg_pool(poolid);
if (!pool)
return -ENOENT;
ps_t ps;
if (!key.empty())
ps = pool->hash_key(key, nspace);
else
ps = pool->hash_key(name, nspace);
*pg = pg_t(ps, poolid);
return 0;
}
Analysis of the rjenkins hash shows that, whatever the length of the input string, the output is always a fixed-length unsigned 32-bit number. It processes the input in groups of 12 characters, splitting each group into three 32-bit numbers a, b, c: every character is turned into an 8-bit value (a char is 1 byte, 8 bits), characters 1-4 are packed into a, characters 5-8 into b, and characters 9-12 into c, after which the mix() function stirs a, b, and c thoroughly. Each group's values are added onto the running a, b, c. If fewer than 12 characters remain, they are packed in order: with a length of 5, for example, only those 5 characters are processed, the first 4 packed into a and the last one into b, c stays 0 (apart from having the total length added), and mix() runs. The final output is c, and c is the hash seed m_seed.
#define mix(a, b, c) \
do { \
a = a - b; a = a - c; a = a ^ (c >> 13); \
b = b - c; b = b - a; b = b ^ (a << 8); \
c = c - a; c = c - b; c = c ^ (b >> 13); \
a = a - b; a = a - c; a = a ^ (c >> 12); \
b = b - c; b = b - a; b = b ^ (a << 16); \
c = c - a; c = c - b; c = c ^ (b >> 5); \
a = a - b; a = a - c; a = a ^ (c >> 3); \
b = b - c; b = b - a; b = b ^ (a << 10); \
c = c - a; c = c - b; c = c ^ (b >> 15); \
} while (0)
unsigned ceph_str_hash_rjenkins(const char *str, unsigned length)
{
const unsigned char *k = (const unsigned char *)str;
__u32 a, b, c; /* the internal state */
__u32 len; /* how many key bytes still need mixing */
/* Set up the internal state */
len = length;
a = 0x9e3779b9; /* the golden ratio; an arbitrary value */
b = a;
c = 0; /* variable initialization of internal state */
/* handle most of the key */
while (len >= 12) {
a = a + (k[0] + ((__u32)k[1] << 8) + ((__u32)k[2] << 16) +
((__u32)k[3] << 24));
b = b + (k[4] + ((__u32)k[5] << 8) + ((__u32)k[6] << 16) +
((__u32)k[7] << 24));
c = c + (k[8] + ((__u32)k[9] << 8) + ((__u32)k[10] << 16) +
((__u32)k[11] << 24));
mix(a, b, c);
k = k + 12;
len = len - 12;
}
/* handle the last 11 bytes */
c = c + length;
switch (len) { /* all the case statements fall through */
case 11:
c = c + ((__u32)k[10] << 24);
case 10:
c = c + ((__u32)k[9] << 16);
case 9:
c = c + ((__u32)k[8] << 8);
/* the first byte of c is reserved for the length */
case 8:
b = b + ((__u32)k[7] << 24);
case 7:
b = b + ((__u32)k[6] << 16);
case 6:
b = b + ((__u32)k[5] << 8);
case 5:
b = b + k[4];
case 4:
a = a + ((__u32)k[3] << 24);
case 3:
a = a + ((__u32)k[2] << 16);
case 2:
a = a + ((__u32)k[1] << 8);
case 1:
a = a + k[0];
/* case 0: nothing left to add */
}
mix(a, b, c);
return c;
}
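A minimal usage sketch, assuming the definitions above are in scope (with __u32 being a 32-bit unsigned type): whatever the input length, the return value is one 32-bit number, and for "abcd" the four bytes are packed as 'a' + ('b' << 8) + ('c' << 16) + ('d' << 24) and added into a before mix() runs.
#include <cstdio>
int main() {
    const char *name = "rbd_data.abcd";                             // any object name works
    std::printf("m_seed = %u\n", ceph_str_hash_rjenkins(name, 13)); // fixed-length 32-bit output
}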
3.2 osdmap->pg_to_up_acting_osds()
pg_to_up_acting_osds() computes, from the input pg_t, the up osd set, the up primary, the acting osd set, and the acting primary. Note that the acting values are the targets requests are finally sent to.
Analyzing _pg_to_up_acting_osds() shows that the final up osd set and up primary are produced in five steps:
- _pg_to_raw_osds(): run the crush computation for the pg's placement and store the resulting osd set in raw.
- _apply_upmap(): apply upmap, which lets pg-to-osd mappings be pinned manually, yielding a new osd set; used to balance data across osds.
- _raw_to_up_osds(): move the raw array into the up array holding the osd set.
- _pick_primary(): pick a primary osd from the up array.
- _apply_primary_affinity(): apply primary affinity and re-select the primary osd.
Of these, _pg_to_raw_osds() maps the pg to osds via the crush algorithm, computing an osd set according to the rules in the crush map; it is the entry point of the crush algorithm and the very core of ceph. It is introduced below; the other four functions are not analyzed in detail.
void pg_to_up_acting_osds(pg_t pg, vector<int> *up, int *up_primary,
vector<int> *acting, int *acting_primary) const {
_pg_to_up_acting_osds(pg, up, up_primary, acting, acting_primary);
}
void OSDMap::_pg_to_up_acting_osds(
const pg_t& pg, vector<int> *up, int *up_primary,
vector<int> *acting, int *acting_primary,
bool raw_pg_to_pg) const
{
...
if (_acting.empty() || up || up_primary) {
_pg_to_raw_osds(*pool, pg, &raw, &pps);
_apply_upmap(*pool, pg, &raw);
_raw_to_up_osds(*pool, raw, &_up);
_up_primary = _pick_primary(_up);
_apply_primary_affinity(pps, *pool, &_up, &_up_primary);
...
}
_pg_to_raw_osds() computes the hash seed a second time, via raw_pg_to_pps(). Concretely, it first reduces the previously computed m_seed against b_mask (roughly the pg count minus one; precisely, pg_num rounded up to a power of two, minus one) using ceph_stable_mod(), pinning m_seed to one of the pgs, i.e. a value no larger than the pg count; it then runs an rjenkins1 hash over that value together with the poolid, producing a new hash seed named pps (placement ps).
crush->find_rule() looks up the id of the rule to use; several rules can be defined in the crush map, and each pool specifies which one it actually uses.
crush->do_rule() computes the pg-to-osd-set mapping from pps, the crush map, and the osd weights.
_remove_nonexistent_osds() removes osds that no longer exist. A sketch of raw_pg_to_pps() follows.
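Putting those pieces together, raw_pg_to_pps() can be sketched as follows (hedged: hash2 stands in for Ceph's crush_hash32_2(CRUSH_HASH_RJENKINS1, a, b), and the folding step is ceph_stable_mod(), listed after the source below):
#include <cstdint>
uint32_t raw_pg_to_pps_sketch(uint32_t m_seed, uint32_t poolid,
                              uint32_t pgp_num, uint32_t pgp_num_mask,
                              uint32_t (*hash2)(uint32_t, uint32_t)) {
    // ceph_stable_mod: pin the seed to one of the pgs
    uint32_t stable = ((m_seed & pgp_num_mask) < pgp_num)
                          ? (m_seed & pgp_num_mask)
                          : (m_seed & (pgp_num_mask >> 1));
    return hash2(stable, poolid); // the new seed: pps
}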
void OSDMap::_pg_to_raw_osds(
const pg_pool_t& pool, pg_t pg,
vector<int> *osds,
ps_t *ppps) const
{
// map to osds[]
//get the new hash seed
ps_t pps = pool.raw_pg_to_pps(pg); // placement ps
unsigned size = pool.get_size();
//look up the crush rule
// what crush rule?
int ruleno = crush->find_rule(pool.get_crush_rule(), pool.get_type(), size);
if (ruleno >= 0)
//compute the osd set
crush->do_rule(ruleno, pps, *osds, size, osd_weight, pg.pool());
_remove_nonexistent_osds(pool, *osds);
if (ppps)
*ppps = pps;
}
//stable modulo: with bmask = 2^k - 1 (pg_num rounded up to a power of two, minus one), return x mod 2^k if that is < b, otherwise x mod 2^(k-1)
static inline int ceph_stable_mod(int x, int b, int bmask)
{
if ((x & bmask) < b)
return x & bmask;
else
return x & (bmask >> 1);
}
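A worked example, assuming pg_num b = 12 and therefore bmask = 15 (pg_num rounded up to the next power of two, minus one):
#include <cstdio>
int main() {
    int b = 12, bmask = 15;
    for (int x : {5, 13, 200}) {
        int pg = ((x & bmask) < b) ? (x & bmask) : (x & (bmask >> 1));
        std::printf("x=%d -> pg=%d\n", x, pg); // 5 -> 5, 13 -> 5, 200 -> 8
    }
}
Seeds that would land past pg_num fold back into the lower half, so most existing mappings stay put as pg_num later grows toward the next power of two.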
do_rule() calls crush_do_rule(), which is where the crush algorithm is actually implemented.
template<typename WeightVector>
void do_rule(int rule, int x, vector<int>& out, int maxout,
const WeightVector& weight,
uint64_t choose_args_index) const {
int rawout[maxout];
char work[crush_work_size(crush, maxout)];
crush_init_workspace(crush, work);
crush_choose_arg_map arg_map = choose_args_get_with_fallback(
choose_args_index);
//compute the osd set; the result is stored in rawout, and numrep is the number of entries returned
int numrep = crush_do_rule(crush, rule, x, rawout, maxout, &weight[0],
weight.size(), work, arg_map.args);
if (numrep < 0)
numrep = 0;
out.resize(numrep);
for (int i=0; i<numrep; i++)
out[i] = rawout[i];
}
3.2.1 crush_do_rule()
/**
* crush_do_rule - calculate a mapping with the given input and rule
* @map: the crush_map
* @ruleno: the rule id
* @x: hash input
* @result: pointer to result vector
* @result_max: maximum result size
* @weight: weight vector (for map leaves) // the leaf nodes are the osds
* @weight_max: size of weight vector
* @cwin: Pointer to at least map->working_size bytes of memory or NULL.
*/
int crush_do_rule(const struct crush_map *map,
int ruleno, int x, int *result, int result_max,
const __u32 *weight, int weight_max,
void *cwin, const struct crush_choose_arg *choose_args)
{
int result_len;
struct crush_work *cw = cwin;
int *a = (int *)((char *)cw + map->working_size);
int *b = a + result_max;
int *c = b + result_max;
int *w = a;
int *o = b;
int recurse_to_leaf; // whether to recurse down to leaf nodes
int wsize = 0;
int osize; // number of results selected by the current step
int *tmp;
const struct crush_rule *rule;
__u32 step;
int i, j;
int numrep;
int out_size;
/*
* the original choose_total_tries value was off by one (it
* counted "retries" and not "tries"). add one.
* the choose_total_tries variable in the crush map file counts retries, so the total number of tries needs +1
*/
int choose_tries = map->choose_total_tries + 1;
int choose_leaf_tries = 0;
/*
* the local tries values were counted as "retries", though,
* and need no adjustment
*/
int choose_local_retries = map->choose_local_tries;
int choose_local_fallback_retries = map->choose_local_fallback_tries;
int vary_r = map->chooseleaf_vary_r;
int stable = map->chooseleaf_stable;
if ((__u32)ruleno >= map->max_rules) {
dprintk(" bad ruleno %d\n", ruleno);
return 0;
}
rule = map->rules[ruleno];
result_len = 0;
// execute the rule's steps one by one, starting here
for (step = 0; step < rule->len; step++) {
int firstn = 0; // whether to use the firstn depth-first algorithm
const struct crush_rule_step *curstep = &rule->steps[step];
switch (curstep->op) {
case CRUSH_RULE_TAKE: // a take op carries no arg2
// check that the argument is valid and the bucket exists
if ((curstep->arg1 >= 0 &&
curstep->arg1 < map->max_devices) ||
(-1-curstep->arg1 >= 0 &&
-1-curstep->arg1 < map->max_buckets && // bucket ids are ordered, from -1 down to -n; in the map they are stored at indices 0 to n-1
map->buckets[-1-curstep->arg1])) { // The bucket found at __buckets[i]__ must have a crush_bucket.id == -1-i
w[0] = curstep->arg1; // arg1 is the bucket id (e.g. the root's id), the starting point of the next step
wsize = 1;
} else {
dprintk(" bad take value %d\n", curstep->arg1);
}
break;
// the CRUSH_RULE_SET_* ops all set crush parameters
case CRUSH_RULE_SET_CHOOSE_TRIES:
if (curstep->arg1 > 0)
choose_tries = curstep->arg1;
break;
case CRUSH_RULE_SET_CHOOSELEAF_TRIES:
if (curstep->arg1 > 0)
choose_leaf_tries = curstep->arg1;
break;
case CRUSH_RULE_SET_CHOOSE_LOCAL_TRIES:
if (curstep->arg1 >= 0)
choose_local_retries = curstep->arg1;
break;
case CRUSH_RULE_SET_CHOOSE_LOCAL_FALLBACK_TRIES:
if (curstep->arg1 >= 0)
choose_local_fallback_retries = curstep->arg1;
break;
case CRUSH_RULE_SET_CHOOSELEAF_VARY_R:
if (curstep->arg1 >= 0)
vary_r = curstep->arg1;
break;
case CRUSH_RULE_SET_CHOOSELEAF_STABLE:
if (curstep->arg1 >= 0)
stable = curstep->arg1;
break;
case CRUSH_RULE_CHOOSELEAF_FIRSTN:
case CRUSH_RULE_CHOOSE_FIRSTN:
firstn = 1;
/* fall through */
case CRUSH_RULE_CHOOSELEAF_INDEP:
case CRUSH_RULE_CHOOSE_INDEP:
if (wsize == 0)
break;
// CHOOSELEAF ops all recurse down to child nodes
recurse_to_leaf =
curstep->op ==
CRUSH_RULE_CHOOSELEAF_FIRSTN ||
curstep->op ==
CRUSH_RULE_CHOOSELEAF_INDEP;
/* reset output */
osize = 0; // osize: how many items this step has selected so far
for (i = 0; i < wsize; i++) {
int bno; // bucket id
numrep = curstep->arg1; // numrep is the number of items to select; it may be zero or negative
if (numrep <= 0) {
numrep += result_max;
if (numrep <= 0)
continue;
}
j = 0;
/* make sure bucket id is valid */
bno = -1 - w[i];
if (bno < 0 || bno >= map->max_buckets) {
// w[i] is probably CRUSH_ITEM_NONE
dprintk(" bad w[i] %d\n", w[i]);
continue;
}
if (firstn) { // using the firstn depth-first algorithm
int recurse_tries;
if (choose_leaf_tries)
recurse_tries =
choose_leaf_tries;
else if (map->chooseleaf_descend_once) // always set to 1 these days, since other settings cause edge-case problems
recurse_tries = 1;
else
recurse_tries = choose_tries;
osize += crush_choose_firstn(
map,
cw,
map->buckets[bno],
weight, weight_max,
x, numrep,
curstep->arg2,
o+osize, j,
result_max-osize,
choose_tries,
recurse_tries,
choose_local_retries,
choose_local_fallback_retries,
recurse_to_leaf,
vary_r,
stable,
c+osize,
0,
choose_args);
} else {
out_size = ((numrep < (result_max-osize)) ?
numrep : (result_max-osize));
crush_choose_indep(
map,
cw,
map->buckets[bno],
weight, weight_max,
x, out_size, numrep,
curstep->arg2,
o+osize, j,
choose_tries,
choose_leaf_tries ?
choose_leaf_tries : 1,
recurse_to_leaf,
c+osize,
0,
choose_args);
osize += out_size;
}
}
if (recurse_to_leaf)
/* copy final _leaf_ values to output set */
memcpy(o, c, osize*sizeof(*o));
/* swap o and w arrays */
tmp = o;
o = w;
w = tmp; // the previous step's output becomes the next step's input: each step builds on what the last one selected
wsize = osize;
break;
case CRUSH_RULE_EMIT:
for (i = 0; i < wsize && result_len < result_max; i++) {
result[result_len] = w[i];
result_len++;
}
wsize = 0;
break;
default:
dprintk(" unknown op %d at step %d\n",
curstep->op, step);
break;
}
}
return result_len;
}
As the code shows, the choose step uses one of two functions: crush_choose_firstn() for replication and crush_choose_indep() for erasure coding; choose_firstn is the one covered here. crush_do_rule() itself essentially parses the rule from the crushmap and executes its parameters and steps one after another to compute the osd set.
rule replicated_rule {
id 0
type replicated
min_size 1
max_size 10
step take default
step choose firstn 0 type osd
step emit
}
crush_choose_firstn() calls crush_bucket_choose() to select the required number of replicas and runs collision checks on the selected OSDs; if a check fails or an OSD is overloaded, it goes on selecting new OSDs, subject to a maximum retry count: local_retries.
/**
* crush_choose_firstn - choose numrep distinct items of given type
* @map: the crush_map
* @bucket: the bucket we are choose an item from
* @x: crush input value
* @numrep: the number of items to choose
* @type: the type of item to choose
* @out: pointer to output vector
* @outpos: our position in that vector
* @out_size: size of the out vector
* @tries: number of attempts to make
* @recurse_tries: number of attempts to have recursive chooseleaf make
* @local_retries: localized retries
* @local_fallback_retries: localized fallback retries
* @recurse_to_leaf: true if we want one device under each item of given type (chooseleaf instead of choose)
* @stable: stable mode starts rep=0 in the recursive call for all replicas
* @vary_r: pass r to recursive calls
* @out2: second output vector for leaf items (if @recurse_to_leaf)
* @parent_r: r value passed from the parent
*/
static int crush_choose_firstn(const struct crush_map *map,
struct crush_work *work,
const struct crush_bucket *bucket,
const __u32 *weight, int weight_max,
int x, int numrep, int type,
int *out, int outpos,
int out_size,
unsigned int tries,
unsigned int recurse_tries,
unsigned int local_retries,
unsigned int local_fallback_retries,
int recurse_to_leaf,
unsigned int vary_r,
unsigned int stable,
int *out2,
int parent_r,
const struct crush_choose_arg *choose_args)
{
int rep; // counter for how many items have been selected
unsigned int ftotal, flocal;
int retry_descent, retry_bucket, skip_rep;
const struct crush_bucket *in = bucket;
int r;
int i;
int item = 0;
int itemtype;
int collide, reject;
int count = out_size;
dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d tries %d \
recurse_tries %d local_retries %d local_fallback_retries %d \
parent_r %d stable %d\n",
recurse_to_leaf ? "_LEAF" : "",
bucket->id, x, outpos, numrep,
tries, recurse_tries, local_retries, local_fallback_retries,
parent_r, stable);
for (rep = stable ? 0 : outpos; rep < numrep && count > 0 ; rep++) {
/* keep trying until we get a non-out, non-colliding item */
ftotal = 0; // fail total: total number of failed attempts
skip_rep = 0; // whether to skip this replica
do {
retry_descent = 0;
in = bucket; /* initial bucket */
/* choose through intervening buckets */
flocal = 0; // retry count within the current bucket (local retries)
do {
collide = 0; // whether a collision occurred
retry_bucket = 0;
r = rep + parent_r; // random factor r
/* r' = r + f_total */
r += ftotal; // on failure, add the failure count before retrying
/* bucket choose */
if (in->size == 0) {
reject = 1;
goto reject;
}
if (local_fallback_retries > 0 &&
flocal >= (in->size>>1) &&
flocal > local_fallback_retries)
item = bucket_perm_choose( // fallback selection algorithm that remembers previously collided items; triggered only under fairly strict conditions
in, work->work[-1-in->id],
x, r);
else
item = crush_bucket_choose( // pick one item from the input bucket; item is its id
in, work->work[-1-in->id],
x, r,
(choose_args ? &choose_args[-1-in->id] : 0),
outpos);
if (item >= map->max_devices) { // an item id not below the device count must be wrong
dprintk(" bad item %d\n", item);
skip_rep = 1;
break;
}
/* desired type? */
if (item < 0) // bucket ids are negative; a non-negative item is an osd
itemtype = map->buckets[-1-item]->type;
else
itemtype = 0; // otherwise it is an osd
dprintk(" item %d type %d\n", item, itemtype);
/* keep going? */
if (itemtype != type) { // the selected bucket type differs from the expected type
if (item >= 0 ||
(-1-item) >= map->max_buckets) {
dprintk(" bad item type %d\n", type);
skip_rep = 1;
break;
}
in = map->buckets[-1-item]; // use the bucket just found as the input of the next lookup (recursion)
retry_bucket = 1; // select again
continue;
}
// at this point we have an item of the target type (bucket or osd); compare it with what has already been chosen and re-select on collision
/* collision? */
for (i = 0; i < outpos; i++) {
if (out[i] == item) {
collide = 1; // the selection collides
break;
}
}
reject = 0;
if (!collide && recurse_to_leaf) { // the selected bucket does not collide and we must recurse down to a leaf osd
if (item < 0) { // it is a bucket
int sub_r;
if (vary_r)
sub_r = r >> (vary_r-1);
else
sub_r = 0;
if (crush_choose_firstn(
map,
work,
map->buckets[-1-item], // note: the entry point becomes the bucket just selected
weight, weight_max,
x, stable ? 1 : outpos+1, 0,
out2, outpos, count,
recurse_tries, 0,
local_retries,
local_fallback_retries,
0,
vary_r,
stable,
NULL,
sub_r,
choose_args) <= outpos)
/* didn't get leaf */
reject = 1;
} else { // osd
/* we already have a leaf! */
out2[outpos] = item; // output used when recursion down to leaf nodes is required
}
}
if (!reject && !collide) {
/* out? */
if (itemtype == 0)
reject = is_out(map, weight, // second-pass filtering by osd reweight
weight_max,
item, x);
}
reject:
if (reject || collide) { // on a collision or a rejection, retry with the random factor r incremented
ftotal++;
flocal++;
if (collide && flocal <= local_retries) // if the retry count in this bucket has not yet exceeded local_retries
/* retry locally a few times */
retry_bucket = 1;
else if (local_fallback_retries > 0 &&
flocal <= in->size + local_fallback_retries)
/* exhaustive bucket search */
retry_bucket = 1;
else if (ftotal < tries)
/* then retry descent */
retry_descent = 1;
else
/* else give up */
skip_rep = 1;
dprintk(" reject %d collide %d "
"ftotal %u flocal %u\n",
reject, collide, ftotal,
flocal);
}
} while (retry_bucket); // retry within the current bucket (local retry); starting over from the top every time would be expensive
} while (retry_descent); // re-select from the initial bucket (start over from the top)
if (skip_rep) {
dprintk("skip rep\n");
continue;
}
dprintk("CHOOSE got %d\n", item);
out[outpos] = item;
outpos++;
count--;
#ifndef __KERNEL__
if (map->choose_tries && ftotal <= map->choose_total_tries)
map->choose_tries[ftotal]++;
#endif
}
dprintk("CHOOSE returns %d\n", outpos);
return outpos;
}
crush_bucket_choose() is the bucket selection algorithm; there are five variants: uniform, list, tree, straw, and straw2. straw2 is used in most cases.
static int crush_bucket_choose(const struct crush_bucket *in,
struct crush_work_bucket *work,
int x, int r,
const struct crush_choose_arg *arg,
int position)
{
dprintk(" crush_bucket_choose %d x=%d r=%d\n", in->id, x, r);
BUG_ON(in->size == 0);
switch (in->alg) {
case CRUSH_BUCKET_UNIFORM:
return bucket_uniform_choose(
(const struct crush_bucket_uniform *)in,
work, x, r);
case CRUSH_BUCKET_LIST:
return bucket_list_choose((const struct crush_bucket_list *)in,
x, r);
case CRUSH_BUCKET_TREE:
return bucket_tree_choose((const struct crush_bucket_tree *)in,
x, r);
case CRUSH_BUCKET_STRAW:
return bucket_straw_choose(
(const struct crush_bucket_straw *)in,
x, r);
case CRUSH_BUCKET_STRAW2:
return bucket_straw2_choose(
(const struct crush_bucket_straw2 *)in,
x, r, arg, position);
default:
dprintk("unknown bucket %d alg %d\n", in->id, in->alg);
return in->items[0];
}
}
Among the inputs of bucket_straw2_choose(), x is the pgid and r is the replica number. Concretely:
- generate_exponential_distribution(): compute a hash for each item, take its ln, and divide the result by the item's own weight.
- high_draw: compare the draw values and record the bucket entry with the largest draw.
static int bucket_straw2_choose(const struct crush_bucket_straw2 *bucket,
int x, int r, const struct crush_choose_arg *arg,
int position)
{
unsigned int i, high = 0;
__s64 draw, high_draw = 0;
__u32 *weights = get_choose_arg_weights(bucket, arg, position);
__s32 *ids = get_choose_arg_ids(bucket, arg);
for (i = 0; i < bucket->h.size; i++) {
dprintk("weight 0x%x item %d\n", weights[i], ids[i]);
if (weights[i]) {
draw = generate_exponential_distribution(bucket->h.hash, x, ids[i], r, weights[i]);
} else {
draw = S64_MIN;
}
if (i == 0 || draw > high_draw) {
high = i;
high_draw = draw;
}
}
return bucket->h.items[high];
}
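generate_exponential_distribution() itself is not listed here; per the description above it can be sketched in floating point as follows (the real implementation works in 64-bit fixed point via crush_ln(), not double):
#include <cmath>
#include <cstdint>
// draw = ln(u) / weight, with u in (0, 1] derived from hash(x, item id, r).
// All draws are <= 0; a larger weight pulls the draw toward 0.
double straw2_draw_sketch(uint32_t hashed, double weight) {
    double u = (static_cast<double>(hashed & 0xffff) + 1.0) / 65536.0; // avoid ln(0)
    return std::log(u) / weight;
}
bucket_straw2_choose() then keeps the item with the largest draw, reproducing the weighted lottery from section 1.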
is_out() performs the second-pass filtering by osd reweight; the procedure was described above and is not repeated here.
static int is_out(const struct crush_map *map,
const __u32 *weight, int weight_max,
int item, int x)
{
if (item >= weight_max)
return 1;
if (weight[item] >= 0x10000)
return 0;
if (weight[item] == 0)
return 1;
if ((crush_hash32_2(CRUSH_HASH_RJENKINS1, x, item) & 0xffff)
< weight[item])
return 0;
return 1;
}