前言
- 在WebRTC GCC带宽探测原理(一)一文中主要介绍了
ProbeController
模块,该模块主要是根据不同的场景,判断当前是否需要进行带宽探测,并生成ProbeClusterConfig
源信号。 - 而最终
ProbeClusterConfig
信息回经Pacing
模块经由BitrateProber
模块处理,将当前需要发送的包标识成探测包,并进行发送。 - 在
Pacing
模块中存在一个BitrateProber prober_
的成员变量,专门用来处理带宽探测。 - 本文的重点就是分析
BitrateProber
模块。
认识ProbeCluster结构
struct PacedPacketInfo {
PacedPacketInfo();
PacedPacketInfo(int probe_cluster_id,
int probe_cluster_min_probes,
int probe_cluster_min_bytes);
bool operator==(const PacedPacketInfo& rhs) const;
// TODO(srte): Move probing info to a separate, optional struct.
static constexpr int kNotAProbe = -1;
// 当前探测簇的目标探测码率
int send_bitrate_bps = -1;
// 当前探测簇的探测id
int probe_cluster_id = kNotAProbe;
// 当前探测簇最小探测包个数,默认最小5个
int probe_cluster_min_probes = -1;
// 当前探测簇最小探测的字节数 = 目标探测码率 * 目标探测时长(默认15ms)
int probe_cluster_min_bytes = -1;
// 当前探测簇实际发送的字节数
int probe_cluster_bytes_sent = 0;
};
struct ProbeCluster {
PacedPacketInfo pace_info;
// 当前探测簇实际发送的包个数
int sent_probes = 0;
// 当前探测簇实际发送的字节数
int sent_bytes = 0;
//ProbeController模块生成ProbeClusterConfig的时间
Timestamp requested_at = Timestamp::MinusInfinity();
//当前探测簇发送的时间
Timestamp started_at = Timestamp::MinusInfinity();
int retries = 0;
};
ProbeCluster创建
BitrateProber
模块会根据ProbeController
模块生成的ProbeClusterConfig
结构生成对应的ProbeCluster
结构信息。-
其实现流程如下:
代码实现如下:
void PacingController::CreateProbeClusters(
rtc::ArrayView<const ProbeClusterConfig> probe_cluster_configs) {
for (const ProbeClusterConfig probe_cluster_config : probe_cluster_configs) {
prober_.CreateProbeCluster(probe_cluster_config);
}
}
- 循环遍历
ProbeController
模块生成的源信息,调用BitrateProber::CreateProbeCluster
创建ProbeCluster
结构信息。
void BitrateProber::CreateProbeCluster(
const ProbeClusterConfig& cluster_config) {
RTC_DCHECK(probing_state_ != ProbingState::kDisabled);
// 当新生成一个探测簇的时候,遍历clusters_队列,如果对头对应的探测簇的生成时间距离现在已经超过5秒则认为已经超时,则进行移除
// 同时控制探测簇队列的大小在5个以内
while (!clusters_.empty() &&
(cluster_config.at_time - clusters_.front().requested_at >
kProbeClusterTimeout ||
clusters_.size() > kMaxPendingProbeClusters)) {
clusters_.pop();
}
// 构造ProbeCluster
ProbeCluster cluster;
// 设置当前探测簇生成时间
cluster.requested_at = cluster_config.at_time;
// 设置当前探测簇最小探测包数量(默认5个)
cluster.pace_info.probe_cluster_min_probes =
cluster_config.target_probe_count;
// 设置当前探测簇最小探测需要发送的字节数 = 目标探测码率 * 目标探测时长(默认15ms)
cluster.pace_info.probe_cluster_min_bytes =
(cluster_config.target_data_rate * cluster_config.target_duration)
.bytes();
RTC_DCHECK_GE(cluster.pace_info.probe_cluster_min_bytes, 0);
// 设置当前探测簇目标探测码率
cluster.pace_info.send_bitrate_bps = cluster_config.target_data_rate.bps();
// 设置当前探测簇ID
cluster.pace_info.probe_cluster_id = cluster_config.id;
// 插入到队列
clusters_.push(cluster);
// 这个地方看上去只有队列为空的时候才会存在,但到该步骤队列不可能为空了,所以默认这个条件不可能成立?
// 这里是如果是当前模块已经是可以设置成kActive状态了,则设置probing_state_为ProbingState::kActive
// 初始状态为kInactive,在该模块的构造函数进行了设置
if (ReadyToSetActiveState(/*packet_size=*/DataSize::Zero())) {
next_probe_time_ = Timestamp::MinusInfinity();
probing_state_ = ProbingState::kActive;
}
.....
}
-
ProbeController
每次可能会生成多个探测源数据,其中每个源数据ProbeClusterConfig
对应一个探测簇(ProbeCluster
),而每个探测簇在pacing
发送的时候会发送一簇(组)包,按照最小探测时长、最小探测包数量、最小探测字节数进行探测包发送。
BitrateProber状态设置
- 只有当
BitrateProber
模块的状态为ProbingState::kActive
的时候才能发送探测包。 - 该模块的状态更新如下:
void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
....
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
....
}
- 当各发送模块向
Pacing
模块提交数据的时候,会调用BitrateProber::OnIncomingPacket
来更新BitrateProber
的状态。
void BitrateProber::OnIncomingPacket(DataSize packet_size) {
if (ReadyToSetActiveState(packet_size)) {
next_probe_time_ = Timestamp::MinusInfinity();
probing_state_ = ProbingState::kActive;
}
}
- 入参为当前
Rtp
包的大小,然后通过函数ReadyToSetActiveState()
来决策是否将当前模块的状态设置为ProbingState::kActive
。
bool BitrateProber::ReadyToSetActiveState(DataSize packet_size) const {
// 如果队列为空直接返回false
if (clusters_.empty()) {
RTC_DCHECK(probing_state_ == ProbingState::kDisabled ||
probing_state_ == ProbingState::kInactive);
return false;
}
// 如果当前状态为ProbingState::kDisabled或者ProbingState::kActive返回false
switch (probing_state_) {
case ProbingState::kDisabled:
case ProbingState::kActive:
return false;
// 只有在ProbingState::kInactive的情况下才会进行判断,默认状态就是这个
case ProbingState::kInactive:
// If config_.min_packet_size > 0, a "large enough" packet must be sent
// first, before a probe can be generated and sent. Otherwise, send the
// probe asap.
// config_.min_packet_size的默认值为200Byte
return packet_size >=
std::min(RecommendedMinProbeSize(), config_.min_packet_size.Get());
}
}
- 而
RecommendedMinProbeSize()
最小推进的探测字节数实现如下:
DataSize BitrateProber::RecommendedMinProbeSize() const {
if (clusters_.empty()) {
return DataSize::Zero();
}
// 计算最小探测字节数=探测目标码率 * 最小探测间隔2ms所对应的字节数
// 假设探测码率为20Mbps=20000000bps,则推进的最小探测字节数量为10000Byte(字节)
DataRate send_rate =
DataRate::BitsPerSec(clusters_.front().pace_info.send_bitrate_bps);
return send_rate * config_.min_probe_delta/*最小探测间隔2ms*/;
}
- 综上所述意思就是,通过传入实际
Rtp
包的数据大小,在结合当前已有的探测簇队列的首个探测簇新对应的探测码率和最小探测间隔算出最小的探测字节数。 - 如果实际
Rtp
包的数据大小大于最小探测间隔内的探测字节数(意思就是这个rtp
包是否能扮演探测包需要满足一定的大小?),则可将该模块设置成kActive
状态。 - 超过
200
字节的包看上去都能满足。
Pacing模块发送探测包
void PacingController::ProcessPackets() {
....
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
if (is_probing) {
// Probe timing is sensitive, and handled explicitly by BitrateProber, so
// use actual send time rather than target.
// 1) 获取目标探测信息
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
// 2) 根据探测簇信息得到推介探测字节数大小
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
}
}
DataSize data_sent = DataSize::Zero();
int iteration = 0;
int packets_sent = 0;
int padding_packets_generated = 0;
for (; iteration < circuit_breaker_threshold_; ++iteration) {
....
if (is_probing) {
pacing_info.probe_cluster_bytes_sent += packet_size.bytes();
// If we are currently probing, we need to stop the send loop when we
// have reached the send target.
// 3) 每个包发送后会进行data_sent累加,当总发的字节数大于等于本次推介的探测字节数的时候,则结束发送
if (data_sent >= recommended_probe_size) {
break;
}
}
.....
}
}
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
//4) 告诉BitrateProber模块本次探测发送的字节数
prober_.ProbeSent(CurrentTime(), data_sent);
}
}
.....
}
- 探测包发送的逻辑相对比较简单。
- 1)首先通过
BitrateProber::CurrentCluster(now)
得到探测簇信息,就是当前需要探测的目标信息,并 - 2)如果步骤1能成功得到探测目标信息,则通过
BitrateProber::RecommendedMinProbeSize()
函数获取目标探测的推介发送的字节数量大小,记作为recommended_probe_size
,该计算在上节中已经进行了分析说明。 - 3)开始进行发送报文,并将报文标识成探测包,每次发送成功后会对
data_sent
累加操作,当data_sent
大于等于recommended_probe_size
的时候结束发送。 - 4)调用
BitrateProber::ProbeSent()
告诉BitrateProber
模块本次实际发送的数据量。
CurrentCluster获取探测信息
absl::optional<PacedPacketInfo> BitrateProber::CurrentCluster(Timestamp now) {
// 为空或者状态不对直接返回nullopt
if (clusters_.empty() || probing_state_ != ProbingState::kActive) {
return absl::nullopt;
}
// 如果next_probe_time_为有限值,这里对应的是上次计算出来的本次应要探测的时间戳
// 并且当前时间(pacing模块即将发包的时间戳) - 上次计算出来的本次应要探测的时间戳 > 10ms
// 则说明探测超时了,此时需要清除队列头部,并设置状态为kInactive
if (next_probe_time_.IsFinite() &&
now - next_probe_time_ > config_.max_probe_delay.Get()/*默认10Ms*/) {
RTC_DLOG(LS_WARNING) << "Probe delay too high"
" (next_ms:"
<< next_probe_time_.ms() << ", now_ms: " << now.ms()
<< "), discarding probe cluster.";
clusters_.pop();
if (clusters_.empty()) {
probing_state_ = ProbingState::kInactive;
return absl::nullopt;
}
}
// 得到首个探测簇目标信息。
PacedPacketInfo info = clusters_.front().pace_info;
info.probe_cluster_bytes_sent = clusters_.front().sent_bytes;
return info;
}
- 那么
next_probe_time_
的计算机制又是什么?该值直接决定了首个探测信号需要在什么时候发送探测包,并且逾期多长时间后将为认为是超时。
ProbeSent更新探测信息
void BitrateProber::ProbeSent(Timestamp now, DataSize size) {
RTC_DCHECK(probing_state_ == ProbingState::kActive);
RTC_DCHECK(!size.IsZero());
if (!clusters_.empty()) {
// 得到队首探测簇信息
ProbeCluster* cluster = &clusters_.front();
// 更新探测簇对应的探测包发送时间,这个后面需要用于计算探测码率
if (cluster->sent_probes == 0) {
RTC_DCHECK(cluster->started_at.IsInfinite());
cluster->started_at = now;
}
// 更新本次探测总共发了多少字节
cluster->sent_bytes += size.bytes<int>();
// 更新本次探测簇信息的探测次数累加1
cluster->sent_probes += 1;
// 计算下一次发送探测包的时间
next_probe_time_ = CalculateNextProbeTime(*cluster);
// 如果满足下面条件,则移除当前探测簇,需要结束本探测簇,比如说探测次数达到5次
// 每次会发一组探测包,该探测簇对应的已经发了5组包了
if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
clusters_.pop();
}
// 如果队列为空则设置状态为kInactive
if (clusters_.empty()) {
probing_state_ = ProbingState::kInactive;
}
}
}
- 当
Pacing
模块发送完一组探测包后,通过上述函数告诉BitrateProber
模块,对BitrateProber
模块进行更新。 - 主要是对首个探测簇新进行更新,如本次探测发送了多少的数据量,以便用于
tcc feedback
后计算探测码率。 - 另外乳沟首个探测簇已经不满足再发送探测包的条件了,则进行移除。
- 同时这里有个十分重要的操作就是调用
CalculateNextProbeTime()
函数计算下一次探测包发送的时间戳,Pacing
模块每发送一组数据后,需要计算一次发包的时间,这里也一样,当BitrateProber
有探测任务的情况下,优先会获取下一组探测包发送的时间来作为下一次Pacing
模块下一次发送的时间节点。
CalculateNextProbeTime计算下一次探测包发送时间
Timestamp BitrateProber::CalculateNextProbeTime(
const ProbeCluster& cluster) const {
RTC_CHECK_GT(cluster.pace_info.send_bitrate_bps, 0);
RTC_CHECK(cluster.started_at.IsFinite());
// Compute the time delta from the cluster start to ensure probe bitrate stays
// close to the target bitrate. Result is in milliseconds.
// 当前探测簇已经发送的字节数
DataSize sent_bytes = DataSize::Bytes(cluster.sent_bytes);
// 当前探测簇的探测目标码率
DataRate send_bitrate =
DataRate::BitsPerSec(cluster.pace_info.send_bitrate_bps);
// 针对当前探测簇算出一个假设是探测目标码率作为发送码率的情况下需要发送多久
TimeDelta delta = sent_bytes / send_bitrate;
// 计算下一次的发送时间=总共发送多少数据/发送速率 + 该探测簇首次发送探测包的时间
return cluster.started_at + delta;
}
总结
-
BitrateProber
模块主要是借助Pacing
模块对每一个探测簇完成探测包的发送。 - 每一个探测簇每次都会发送一组探测包,可以发送多次,当然最多是发送
5
次,并且每次之间的数据发送间隔不能超过10ms
,如果超过10
ms会被认为超时。 - 其他该模块的实现从代码的角度来看,相对不难。
- 接下来需要分析的是基于
twcc feedback
对探测码率的计算。