SQP拥塞控制算法介绍
一种为低延迟强交互视频流设计的拥塞控制算法, google paper:https://arxiv.org/pdf/2207.11857.pdf
在交互视频应用场景,需要传输高码率的视频率,并且保持极低的端到端时延,比如AR流和云游戏,SQP就是为这种场景而设计的拥塞控制算法。SQP采用基于帧的整形数据包来采样网络带宽,并用自适应的单向采样时延测量方法从网络队列中恢复,从而维持极低的队列时延。SQP快速适应网络带宽变化,确保高带宽利用率和低帧时延,并且当存在竞争流时也能在保证可接受的时延内拥有合理的带宽份额。SQP有不错的公平性,网络上有影子缓存时也工作得挺好。
低延时交互流媒体应用生成固定帧率的裸帧。视频码率由ABR(Adaptive bitrate algorithm 码率自适应算法)算法决定,而ABR算法又是由CCA(Congestion control algorithm,拥塞控制算法)产生的信号来决定, 从而管理帧时延、网络拥塞、带宽利用率。压缩的帧通过网络传输,然后在客户端设备上解码和显示。
低时延视频流架构:
SQP内部架构:
我是为低时延交互流媒体应用设计的拥塞控制算法, 比如云游戏,云VR等应用。 这一套拥塞控制算法的目标是: 1. 提供实时的带宽估计,以确保尽可能大的带宽利用率和尽可能低的端到端帧时延;2. 与其它基于网络列队的拥塞控制算法共存时,也可以提供有竞争力的吞吐量。
SQP算法实现
Feeback: 包级别的网络反馈 + 帧级别的网络反馈
#include "sqp_feedback_adaptor.h"
#include <algorithm>
#include <cstdlib>
#include "common/logger.h"
#include "common_time.h"
namespace BCC {
#define HISTORY_CACHE_MS 60000
#define SRTT_FACTOR 0.6
SQPFeedbackAdaptor::SQPFeedbackAdaptor() {
int i;
for (i = 0; i < FEEDBACK_RTT_WIN_SIZE; ++i) {
rtts_[i] = -1;
}
min_feedback_rtt_ = 10;
num_ = 0;
index_ = 0;
hist_ = new SQPSenderHistory(HISTORY_CACHE_MS);
}
SQPFeedbackAdaptor::~SQPFeedbackAdaptor() {
if (hist_) {
delete hist_;
hist_ = nullptr;
}
for (auto const& pair : frame_map_) {
delete pair.second;
}
frame_map_.clear();
}
void SQPFeedbackAdaptor::AddPacket(uint16_t seq, size_t size) {
PacketFeedBackItem packet;
packet.arrival_ts = -1;
packet.create_ts = packet.send_ts = GET_SYS_MS();
packet.payload_size = size;
packet.sequence_number = seq;
hist_->Add(&packet);
}
void SQPFeedbackAdaptor::AddFrame(uint32_t frame_idx, size_t packet_size, bool first_packet, bool last_packet) {
FrameFeedBackItem* frame_ptr;
int64_t now_ts = GET_SYS_MS();
if (frame_map_.find(frame_idx) != frame_map_.end()) {
frame_ptr = frame_map_[frame_idx];
}
else {
frame_ptr = new FrameFeedBackItem();
frame_ptr->frame_index = frame_idx;
frame_map_[frame_idx] = frame_ptr;
}
frame_ptr->frame_size += packet_size;
if (first_packet) {
frame_ptr->send_start_ts = now_ts;
}
if (last_packet) {
frame_ptr->send_end_ts = now_ts;
hist_->AddFrame(frame_ptr);
delete frame_ptr;
frame_map_.erase(frame_idx);
}
}
int SQPFeedbackAdaptor::OnFeedback(BCC_SQP::FeedBackMsgItem* msg) {
int32_t i = 0, feedback_rtt = 0;
int64_t now_ts = GET_SYS_MS();
int64_t delta_ts = 0;
feedback_rtt = -1;
num_ = 0;
for (i = 0; i < msg->samples_num; i++) {
//根据反馈的SEQ获取对应的报文发送信息,计算反馈RTT,更新报文到达时刻
if (hist_->Get(msg->samples[i].seq, &packets_[num_], 1) == 0) {
//计算反馈RTT
if (packets_[num_].send_ts > 0) {
feedback_rtt = (std::max)(now_ts - packets_[num_].send_ts, (int64_t)feedback_rtt);
rtts_[index_++ % FEEDBACK_RTT_WIN_SIZE] = feedback_rtt;
srtt_ = SRTT_FACTOR * srtt_ + (1 - SRTT_FACTOR) * feedback_rtt;
}
//更新到达的值
packets_[num_].arrival_ts = msg->samples[i].ts;
delta_ts = packets_[num_].arrival_ts - packets_[num_].send_ts;
if (new_min_one_way_delay_ == 0 || new_min_one_way_delay_ > delta_ts) {
new_min_one_way_delay_ = delta_ts;
}
num_++;
//更新时间窗内的最小one way delay的值
if (now_ts - last_one_way_delay_update_ts_ > srtt_ * 2) {
last_one_way_delay_update_ts_ = now_ts;
min_one_way_delay_ = new_min_one_way_delay_;
new_min_one_way_delay_ = 0;
}
}
}
frame_num_ = 0;
for (i = 0; i < msg->frame_samples_num; i++) {
//根据反馈的SEQ获取对应的报文发送信息,计算反馈RTT,更新报文到达时刻
if (hist_->GetFrame(msg->frame_samples[i].frame_index, &frames_[frame_num_], 1) == 0) {
//更新到达的值
frames_[frame_num_].arrival_start_ts = msg->frame_samples[i].arrival_start_ts;
frames_[frame_num_].arrival_end_ts = msg->frame_samples[i].arrival_end_ts;
if (max_frame_size_ < frames_[frame_num_].frame_size) {
max_frame_size_ = frames_[frame_num_].frame_size;
}
frame_num_++;
}
}
//更新报文与反馈的rtt最小值
if (feedback_rtt > 0) {
min_feedback_rtt_ = rtts_[0];
for (i = 1; i < FEEDBACK_RTT_WIN_SIZE; i++) {
if (min_feedback_rtt_ > rtts_[i] && rtts_[i] > 0) {
min_feedback_rtt_ = rtts_[i];
LOGD("[bcc][feedback] min feed back rtt update {}", min_feedback_rtt_);
}
}
}
//进行按到达时间的先后顺序进行排序
FeedbackQsort();
FrameFeedbackQsort();
return num_;
}
} // namespace BCC
带宽估计和发送速率控制:
#include "sqp_congestion_control.h"
#include "common/logger.h"
namespace BCC {
SQPCongestionControl::SQPCongestionControl(uint32_t min_bitrate, uint32_t max_bitrate) {
min_bitrate_ = min_bitrate;
max_bitrate_ = max_bitrate;
bandwidth_ = min_bitrate_;
}
SQPCongestionControl::~SQPCongestionControl() {
}
void SQPCongestionControl::UpdateBandwidthEstimator(uint32_t frame_size, uint32_t max_frame_size, int64_t frame_send_start_ts,
int64_t frame_send_end_ts, int64_t frame_recv_start_ts, int64_t frame_recv_end_ts, int32_t one_way_delay) {
double bandwidth_sample = 0.0f;
double gama = ((double)max_frame_size) / frame_size;
if (frame_recv_end_ts == frame_send_start_ts && frame_recv_end_ts == frame_recv_start_ts) {
return;
}
bandwidth_sample = frame_size * 8 * gama * 1000 / (frame_recv_end_ts - frame_send_start_ts - one_way_delay + (frame_recv_end_ts - frame_recv_start_ts) * (gama - 1));
bandwidth_sample *= T_;
if (bandwidth_ <= 0) {
bandwidth_ = bandwidth_sample;
}
else {
bandwidth_ = bandwidth_ + delta_ * (r_ * (bandwidth_sample / bandwidth_ - 1) - (bandwidth_ / bandwidth_sample - 1));
}
}
uint32_t SQPCongestionControl::ComputePacingRate() {
return (uint32_t)(bandwidth_ * m_);
return 0;
}
};
对比实验:
-
采用panthoen实验平台
-
安装panthoen: 以下三个主要组件
Local 和 remote两种网络模式
-
编译llama-sqp生成sender和receiver, 拷贝到panthoen/third_party/llama_sqp下
修改配置:
- 生成src/wrappers/llama_sqp.py:
#!/usr/bin/env python
'''REMOVE ME: Example file to add a new congestion control scheme.
Use Python 2.7 and conform to PEP8.
Use snake_case as file name and make this file executable.
'''
from os import path
from subprocess import check_call
import arg_parser
import context
def main():
# use 'arg_parser' to ensure a common test interface
args = arg_parser.receiver_first() # or 'arg_parser.sender_first()'
# paths to the sender and receiver executables, etc.
cc_repo = path.join(context.third_party_dir, 'llama_sqp')
send_src = path.join(cc_repo, 'transport_sender')
recv_src = path.join(cc_repo, 'transport_receiver')
# [optional] dependencies of Debian packages
if args.option == 'deps':
print 'example_dep_1 example_dep_2'
return
# [optional] persistent setup that only needs to be run once
if args.option == 'setup':
# avoid running as root here
return
# [optional] non-persistent setup that should be performed on every reboot
if args.option == 'setup_after_reboot':
# avoid running as root here
return
# [required] run the first side on port 'args.port'
if args.option == 'receiver':
cmd = [recv_src, args.port]
check_call(cmd)
return
# [required] run the other side to connect to the first side on 'args.ip'
if args.option == 'sender':
cmd = [send_src, args.ip, args.port]
check_call(cmd)
return
if __name__ == '__main__':
main()
配置src/config.yml添加llama_sqp
运行实验:
src/experiments/setup.py --install-deps --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/setup.py --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/experiments/test.py local --schemes "bbr copa cubic vivace llama_bcc llama_sqp"
src/analysis/analyze.py --data-dir src/experiments/data/
实验报告:
-
本地网络实验:
Llama-SQP时延最低,但吞吐量只有72.3%,远没有利用好带宽, 待改进。
以下是详细的实验数据:
- 远程网络实验