Introduction to vhost-net
The virtio architecture
virtio stands for virtualized I/O and is used to implement device paravirtualization: the OS running in the VM loads special drivers (e.g. virtio-net) and knows that it is running in a VM. Compared with full virtualization based on complete device emulation, virtio-based paravirtualization gives noticeably better device-access performance.
The part running in the VM is called the front-end driver and presents a uniform interface to the guest; the part running on the host is called the back-end driver and adapts to the different physical hardware devices.
virtio_net/vhost_net is one such paravirtualized network driver + device pair: virtio_net is the front-end driver and runs in the guest; vhost_net is the back-end driver and runs in the host kernel.
vhost_net has two important files in the kernel, vhost.c and vhost-net.c: the former implements the vhost core, independent of any concrete function; the latter implements the networking side.
vhost-net is registered as a misc device, with vhost_net_fops as its file_operations.
static const struct file_operations vhost_net_fops = {
.owner = THIS_MODULE,
.release = vhost_net_release,
.read_iter = vhost_net_chr_read_iter,
.write_iter = vhost_net_chr_write_iter,
.poll = vhost_net_chr_poll,
.unlocked_ioctl = vhost_net_ioctl,
#ifdef CONFIG_COMPAT
.compat_ioctl = vhost_net_compat_ioctl,
#endif
.open = vhost_net_open,
.llseek = noop_llseek,
};
static struct miscdevice vhost_net_misc = {
.minor = VHOST_NET_MINOR,
.name = "vhost-net",
.fops = &vhost_net_fops,
};
static int vhost_net_init(void)
{
if (experimental_zcopytx)
vhost_net_enable_zcopy(VHOST_NET_VQ_TX);
return misc_register(&vhost_net_misc);
}
In the QEMU code, creating a tap device ends up calling net_init_tap(). net_init_tap() checks whether the vhost=on option was given; if so, it calls vhost_net_init() for initialization, which opens the vhost-net driver via open("/dev/vhost-net", O_RDWR) and performs a series of initializations via ioctl() on the vhost fd. The open("/dev/vhost-net", O_RDWR) call lands in the driver's vhost_net_fops->open, i.e. vhost_net_open(), which initializes the vhost device.
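For orientation, here is a minimal userspace sketch of that sequence (hypothetical code, not QEMU's actual implementation; error handling trimmed, and the later per-queue ioctls only named in comments):
#include <fcntl.h>
#include <sys/ioctl.h>
#include <linux/vhost.h>
static int vhost_net_setup(void)
{
	/* Lands in vhost_net_fops->open, i.e. vhost_net_open(). */
	int vhost_fd = open("/dev/vhost-net", O_RDWR);
	if (vhost_fd < 0)
		return -1;
	/* Become the owner: binds dev->mm to this QEMU process and creates
	 * the "vhost-<pid>" worker thread (see vhost_dev_set_owner below). */
	if (ioctl(vhost_fd, VHOST_SET_OWNER, NULL) < 0)
		return -1;
	/* Per-queue setup follows: VHOST_SET_MEM_TABLE,
	 * VHOST_SET_VRING_NUM/_ADDR/_BASE, VHOST_SET_VRING_KICK/_CALL,
	 * and finally VHOST_NET_SET_BACKEND, all covered below. */
	return vhost_fd;
}
The key structures involved are listed next: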
struct vhost_net {
struct vhost_dev dev;
struct vhost_net_virtqueue vqs[VHOST_NET_VQ_MAX];
struct vhost_poll poll[VHOST_NET_VQ_MAX];
/* Number of TX recently submitted.
* Protected by tx vq lock. */
unsigned tx_packets;
/* Number of times zerocopy TX recently failed.
* Protected by tx vq lock. */
unsigned tx_zcopy_err;
/* Flush in progress. Protected by tx vq lock. */
bool tx_flush;
};
struct vhost_dev {
/* Readers use RCU to access memory table pointer
* log base pointer and features.
* Writers use mutex below.*/
struct vhost_memory __rcu *memory;
struct mm_struct *mm;
struct mutex mutex;
unsigned acked_features;
struct vhost_virtqueue **vqs;
int nvqs;
struct file *log_file;
struct eventfd_ctx *log_ctx;
spinlock_t work_lock;
struct list_head work_list;
struct task_struct *worker;
};
struct vhost_net_virtqueue {
struct vhost_virtqueue vq;
/* hdr is used to store the virtio header.
* Since each iovec has >= 1 byte length, we never need more than
* header length entries to store the header. */
struct iovec hdr[sizeof(struct virtio_net_hdr_mrg_rxbuf)];
size_t vhost_hlen;
size_t sock_hlen;
/* vhost zerocopy support fields below: */
/* last used idx for outstanding DMA zerocopy buffers */
int upend_idx;
/* first used idx for DMA done zerocopy buffers */
int done_idx;
/* an array of userspace buffers info */
struct ubuf_info *ubuf_info;
/* Reference counting for outstanding ubufs.
* Protected by vq mutex. Writers must also take device mutex. */
struct vhost_net_ubuf_ref *ubufs;
};
/* The virtqueue structure describes a queue attached to a device. */
struct vhost_virtqueue {
struct vhost_dev *dev;
/* The actual ring of buffers. */
struct mutex mutex;
unsigned int num;
struct vring_desc __user *desc;
struct vring_avail __user *avail;
struct vring_used __user *used;
struct file *kick;
struct file *call;
struct file *error;
struct eventfd_ctx *call_ctx;
struct eventfd_ctx *error_ctx;
struct eventfd_ctx *log_ctx;
struct vhost_poll poll;
/* The routine to call when the Guest pings us, or timeout. */
vhost_work_fn_t handle_kick;
/* Last available index we saw. */
u16 last_avail_idx;
/* Caches available index value from user. */
u16 avail_idx;
/* Last index we used. */
u16 last_used_idx;
/* Used flags */
u16 used_flags;
/* Last used index value we have signalled on */
u16 signalled_used;
/* Last used index value we have signalled on */
bool signalled_used_valid;
/* Log writes to used structure. */
bool log_used;
u64 log_addr;
struct iovec iov[UIO_MAXIOV];
struct iovec *indirect;
struct vring_used_elem *heads;
/* We use a kind of RCU to access private pointer.
* All readers access it from worker, which makes it possible to
* flush the vhost_work instead of synchronize_rcu. Therefore readers do
* not need to call rcu_read_lock/rcu_read_unlock: the beginning of
* vhost_work execution acts instead of rcu_read_lock() and the end of
* vhost_work execution acts instead of rcu_read_unlock().
* Writers use virtqueue mutex. */
void __rcu *private_data;
/* Log write descriptors */
void __user *log_base;
struct vhost_log *log;
};
struct vhost_poll {
poll_table table;
wait_queue_head_t *wqh;
wait_queue_entry_t wait;
struct vhost_work work;
__poll_t mask;
struct vhost_dev *dev;
};
Key data structures
Another chapter analyzes the code starting from the data structures: see the detailed data-structure walkthrough.
struct vhost_net: describes the vhost-net device. Key fields: 1) struct vhost_dev, the generic vhost device, embedded in device-specific structures much like struct device is; 2) struct vhost_net_virtqueue, which wraps struct vhost_virtqueue and carries the network packet transfer; 3) struct vhost_poll, used to poll the socket so that work can be scheduled when packets are received or sent.
struct vhost_dev: describes the generic vhost device, embeddable in the structures of vhost-based devices such as struct vhost_net and struct vhost_scsi. Key fields: 1) vqs, pointing to the already-allocated struct vhost_virtqueue instances that carry the data transfer; 2) work_list, the task list holding the work items to run on the vhost_worker kernel thread; 3) worker, pointing to the created kernel thread that executes the items on that list.
struct vhost_net_virtqueue: describes a virtqueue of the vhost-net device; it wraps struct vhost_virtqueue.
struct vhost_virtqueue: describes a virtqueue of a vhost device; see the earlier analysis of the virtqueue mechanism. In essence, the virtqueue handling that used to live in QEMU is pushed down into the kernel.
One VM, i.e. one QEMU process, can own several vhost_net/vhost_dev instances; one vhost_dev corresponds to one rx/tx queue pair and one vhost kernel thread. From the VM's point of view, a single VM interface may therefore be backed by several vhost_net/vhost_dev instances.
Another chapter covers NIC multiqueue.
vhost_net device initialization: vhost_net_open
vhost_net_open creates the vhost_net and performs a series of initializations. vhost_net and vhost_net_virtqueue describe the vhost-net device, while vhost_dev and vhost_virtqueue serve the generic vhost device; vhost_dev_init() initializes the vhost_dev and links it to the vhost_virtqueues (vhost_dev's vhost_virtqueue pointers point at vhost_net's vhost_virtqueues).
It initializes the vhost_polls. Understanding the vhost poll mechanism is essential for reading the vhost_net implementation (see the vhost_poll part of the data-structures section): vhost_net packet rx/tx and front-end/back-end event notification all depend on it.
It associates the file with the vhost_net: file->private_data = vhost_net.
static int vhost_net_open(struct inode *inode, struct file *f)
{
struct vhost_net *n;
struct vhost_dev *dev;
struct vhost_virtqueue **vqs;
void **queue;
int i;
n = kvmalloc(sizeof *n, GFP_KERNEL | __GFP_RETRY_MAYFAIL);
if (!n)
return -ENOMEM;
vqs = kmalloc_array(VHOST_NET_VQ_MAX, sizeof(*vqs), GFP_KERNEL);
if (!vqs) {
kvfree(n);
return -ENOMEM;
}
queue = kmalloc_array(VHOST_RX_BATCH, sizeof(void *),
GFP_KERNEL);
if (!queue) {
kfree(vqs);
kvfree(n);
return -ENOMEM;
}
n->vqs[VHOST_NET_VQ_RX].rxq.queue = queue;
dev = &n->dev;
vqs[VHOST_NET_VQ_TX] = &n->vqs[VHOST_NET_VQ_TX].vq;
vqs[VHOST_NET_VQ_RX] = &n->vqs[VHOST_NET_VQ_RX].vq;
n->vqs[VHOST_NET_VQ_TX].vq.handle_kick = handle_tx_kick;
n->vqs[VHOST_NET_VQ_RX].vq.handle_kick = handle_rx_kick;
for (i = 0; i < VHOST_NET_VQ_MAX; i++) {
n->vqs[i].ubufs = NULL;
n->vqs[i].ubuf_info = NULL;
n->vqs[i].upend_idx = 0;
n->vqs[i].done_idx = 0;
n->vqs[i].vhost_hlen = 0;
n->vqs[i].sock_hlen = 0;
n->vqs[i].rx_ring = NULL;
vhost_net_buf_init(&n->vqs[i].rxq);
}
vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);
vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, EPOLLOUT, dev);
vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, EPOLLIN, dev);
f->private_data = n;
return 0;
}
void vhost_dev_init(struct vhost_dev *dev,
struct vhost_virtqueue **vqs, int nvqs)
{
struct vhost_virtqueue *vq;
int i;
dev->vqs = vqs;
dev->nvqs = nvqs;
mutex_init(&dev->mutex);
dev->log_ctx = NULL;
dev->umem = NULL;
dev->iotlb = NULL;
dev->mm = NULL;
dev->worker = NULL;
init_llist_head(&dev->work_list);
init_waitqueue_head(&dev->wait);
INIT_LIST_HEAD(&dev->read_list);
INIT_LIST_HEAD(&dev->pending_list);
spin_lock_init(&dev->iotlb_lock);
for (i = 0; i < dev->nvqs; ++i) {
vq = dev->vqs[i];
vq->log = NULL;
vq->indirect = NULL;
vq->heads = NULL;
vq->dev = dev;
mutex_init(&vq->mutex);
vhost_vq_reset(dev, vq);
if (vq->handle_kick)
vhost_poll_init(&vq->poll, vq->handle_kick,
EPOLLIN, dev);
}
}
/* Init poll structure */
void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn,
__poll_t mask, struct vhost_dev *dev)
{
init_waitqueue_func_entry(&poll->wait, vhost_poll_wakeup);
init_poll_funcptr(&poll->table, vhost_poll_func);
poll->mask = mask;
poll->dev = dev;
poll->wqh = NULL;
vhost_work_init(&poll->work, fn);
}
The relationship between vhost_net and tap
As noted above, understanding the vhost poll mechanism is essential for reading the vhost_net implementation: packet rx/tx and front-end/back-end event notification all go through vhost_poll. So let's start from the more important functions hooked up by vhost_poll_init(), which vhost_net_open calls:
vhost_poll_func
vhost_poll_func adds the wait entry of the vhost_net or of a vhost_virtqueue to the wait queue of the tap socket or of an eventfd file, so that when a packet or a signal arrives later, the vhost thread is woken up to handle it.
Two flows are involved:
1) VHOST_NET_SET_BACKEND
Associates the vhost_net with the tap interface, and adds vhost_net's vhost_poll wait entry to the tap socket's wait queue.
When a NIC is added while creating the VM, the fds of the corresponding tap device and vhost device are specified; QEMU passes the tap fd into the kernel vhost side via VHOST_NET_SET_BACKEND, tying vhost and tap together.
-netdev tap,fd=41,id=hostnet0,vhost=on,vhostfd=42 -device virtio-net-pci,host_mtu=1500,netdev=hostnet0,id=net0,mac=fa:cc:74:ee:04:00,
The VHOST_NET_SET_BACKEND flow mainly does the following:
- vhost_net_set_backend() sets the vhost_virtqueue's private data to the tap's socket;
- vhost_poll_start() calls the tap file's tun_chr_poll, which eventually calls vhost_poll_func, hooking vhost_net's vhost_poll->wait (the wait entry) onto the tap device's tun_file socket wait queue; from then on, packet activity on the tap socket wakes the vhost thread by walking the wait queue on tun_file->wq. A userspace sketch of the ioctl follows.
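A minimal userspace sketch of this ioctl (simplified relative to QEMU; tap_fd is assumed to be an already-configured tap fd):
#include <sys/ioctl.h>
#include <linux/vhost.h>
static int set_backend(int vhost_fd, unsigned int index, int tap_fd)
{
	struct vhost_vring_file backend = {
		.index = index, /* selects the rx or tx vhost_virtqueue */
		.fd = tap_fd,   /* the tap fd; -1 detaches the backend */
	};
	/* Handled by vhost_net_ioctl() -> vhost_net_set_backend(), below. */
	return ioctl(vhost_fd, VHOST_NET_SET_BACKEND, &backend);
}
On the kernel side: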
static long vhost_net_ioctl(struct file *f, unsigned int ioctl,
unsigned long arg)
{
......
switch (ioctl) {
case VHOST_NET_SET_BACKEND:
if (copy_from_user(&backend, argp, sizeof backend))
return -EFAULT;
// QEMU passes in the tap fd; index selects tx or rx and is used to locate the vhost_virtqueue
return vhost_net_set_backend(n, backend.index, backend.fd);
......
}
static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd)
{
struct socket *sock, *oldsock;
struct vhost_virtqueue *vq;
struct vhost_net_virtqueue *nvq;
struct vhost_net_ubuf_ref *ubufs, *oldubufs = NULL;
int r;
......
/* start polling new socket */
oldsock = rcu_dereference_protected(vq->private_data,
lockdep_is_held(&vq->mutex));
if (sock != oldsock) {
ubufs = vhost_net_ubuf_alloc(vq,
sock && vhost_sock_zcopy(sock));
if (IS_ERR(ubufs)) {
r = PTR_ERR(ubufs);
goto err_ubufs;
}
vhost_net_disable_vq(n, vq);
// set the vhost_virtqueue's private data to the tap's socket
rcu_assign_pointer(vq->private_data, sock);
r = vhost_init_used(vq);
if (r)
goto err_used;
r = vhost_net_enable_vq(n, vq);
......
}
......
}
// poll: the vhost_net tx or rx queue's vhost_poll; file: the tap device file
int vhost_poll_start(struct vhost_poll *poll, struct file *file)
{
unsigned long mask;
int ret = 0;
if (poll->wqh)
return 0;
// for a tap file this is tun_chr_poll
mask = file->f_op->poll(file, &poll->table);
if (mask)
vhost_poll_wakeup(&poll->wait, 0, 0, (void *)mask);
if (mask & POLLERR) {
if (poll->wqh)
remove_wait_queue(poll->wqh, &poll->wait);
ret = -EINVAL;
}
return ret;
}
static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
{
struct tun_file *tfile = file->private_data;
struct tun_struct *tun = __tun_get(tfile);
struct sock *sk;
unsigned int mask = 0;
if (!tun)
return POLLERR;
sk = tfile->socket.sk;
tun_debug(KERN_INFO, tun, "tun_chr_poll\n");
poll_wait(file, &tfile->wq.wait, wait);
......
}
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
// calls vhost_poll_func
p->_qproc(filp, wait_address, p);
}
static void vhost_poll_func(struct file *file, wait_queue_head_t *wqh,
poll_table *pt)
{
struct vhost_poll *poll;
poll = container_of(pt, struct vhost_poll, table);
poll->wqh = wqh;
add_wait_queue(wqh, &poll->wait);
}
2) VHOST_SET_VRING_KICK: associates a vhost_virtqueue with an eventfd file.
QEMU uses VHOST_SET_VRING_KICK to set vhost's kick file, which is used to respond to KVM's kick signal. The ioctl ends up calling the eventfd's poll function, eventfd_poll, which adds the vhost_virtqueue's wait entry to the eventfd file's wait queue; when the eventfd is signalled, the wait entry is woken, and the vhost kernel thread starts processing the packets the guest has queued.
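A minimal userspace sketch of this flow (assumptions: vhost_fd is the opened /dev/vhost-net fd; registering the same eventfd with KVM as an ioeventfd is omitted):
#include <sys/eventfd.h>
#include <sys/ioctl.h>
#include <linux/vhost.h>
static int set_vring_kick(int vhost_fd, unsigned int index)
{
	struct vhost_vring_file file = {
		.index = index,
		.fd = eventfd(0, EFD_NONBLOCK), /* the kick eventfd */
	};
	if (file.fd < 0)
		return -1;
	/* vhost_vring_ioctl() stores vq->kick and, since handle_kick is set,
	 * calls vhost_poll_start(&vq->poll, vq->kick), adding the vq's wait
	 * entry to this eventfd's wait queue. */
	return ioctl(vhost_fd, VHOST_SET_VRING_KICK, &file);
}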
vhost_poll_wakeup
When the tap interface sends or receives a packet, or the eventfd receives a kick signal, vhost_poll_wakeup is eventually invoked on the wait entries in their wait queues, which in turn wakes the vhost kernel thread.
When a packet is transmitted out of the tap interface, tun_net_xmit() is called. Using the already-selected queue index it finds the tap's tun_file; each tap queue has its own tun_file, and vhost_net's poll wait entry is hooked onto the tun_file, so each queue handles its packets independently.
/* Net device start xmit */
static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
{
struct tun_struct *tun = netdev_priv(dev);
// the queue index has already been selected
int txq = skb->queue_mapping;
struct tun_file *tfile;
rcu_read_lock();
// with a multiqueue VM NIC, one tap has several tun_files, each with its own socket and poll.
tfile = rcu_dereference(tun->tfiles[txq]);
......
/* Limit the number of packets queued by dividing txq length with the
 * number of queues.
 * (A transmit on the tap device is a receive on its socket.)
 */
if (skb_queue_len(&tfile->socket.sk->sk_receive_queue)
>= dev->tx_queue_len / tun->numqueues)
goto drop;
/* Orphan the skb - required as we might hang on to it
* for indefinite time. */
if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
goto drop;
skb_orphan(skb);
nf_reset(skb);
/* Enqueue packet: buffer it on the socket receive queue */
skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
/* Notify and wake up reader process */
if (tfile->flags & TUN_FASYNC)
kill_fasync(&tfile->fasync, SIGIO, POLL_IN);
// vhost_net's vhost_poll is hooked on this queue's wait queue; its wakeup function is vhost_poll_wakeup.
// The wakeup function hangs the vhost_poll's work item on the vhost_dev work list and wakes the device's worker thread to process the packet.
wake_up_interruptible_poll(&tfile->wq.wait, POLLIN |
POLLRDNORM | POLLRDBAND);
rcu_read_unlock();
return NETDEV_TX_OK;
drop:
dev->stats.tx_dropped++;
skb_tx_error(skb);
kfree_skb(skb);
rcu_read_unlock();
return NETDEV_TX_OK;
}
static int vhost_poll_wakeup(wait_queue_t *wait, unsigned mode, int sync,
void *key)
{
struct vhost_poll *poll = container_of(wait, struct vhost_poll, wait);
if (!((unsigned long)key & poll->mask))
return 0;
vhost_poll_queue(poll);
return 0;
}
void vhost_poll_queue(struct vhost_poll *poll)
{
vhost_work_queue(poll->dev, &poll->work);
}
void vhost_work_queue(struct vhost_dev *dev, struct vhost_work *work)
{
unsigned long flags;
spin_lock_irqsave(&dev->work_lock, flags);
if (list_empty(&work->node)) {
list_add_tail(&work->node, &dev->work_list);
work->queue_seq++;
wake_up_process(dev->worker);
}
spin_unlock_irqrestore(&dev->work_lock, flags);
}
handle_tx_net/handle_rx_net
The vhost kernel thread
QEMU creates the vhost kernel thread via VHOST_SET_OWNER. The "owner" here is the QEMU process: a vhost_net device is owned by one particular QEMU process.
long vhost_dev_set_owner(struct vhost_dev *dev)
{
struct task_struct *worker;
int err;
/* Is there an owner already? */
if (vhost_dev_has_owner(dev)) {
err = -EBUSY;
goto err_mm;
}
/* No owner, become one */
dev->mm = get_task_mm(current);
worker = kthread_create(vhost_worker, dev, "vhost-%d", current->pid);
if (IS_ERR(worker)) {
err = PTR_ERR(worker);
goto err_worker;
}
dev->worker = worker;
wake_up_process(worker); /* avoid contributing to loadavg */
err = vhost_attach_cgroups(dev);
if (err)
goto err_cgroup;
err = vhost_dev_alloc_iovecs(dev);
if (err)
goto err_cgroup;
return 0;
err_cgroup:
kthread_stop(worker);
dev->worker = NULL;
err_worker:
if (dev->mm)
mmput(dev->mm);
dev->mm = NULL;
err_mm:
return err;
}
vhost_net_set_owner (-> vhost_dev_set_owner):
- grabs the QEMU process's mm_struct, i.e. the guest's memory layout. When the vhost kernel thread runs, it calls use_mm(dev->mm) so that it can use the QEMU process's memory mappings; the vrings used for packet rx/tx are, at bottom, shared memory;
- creates the vhost kernel thread, named "vhost-{qemu pid}" (we are in QEMU's process context here, so current->pid is the QEMU pid); its thread function is vhost_worker;
- vhost_worker walks the vhost device's work list, dequeues each work item (removing it from the list) and calls its handler; the handlers hooked up here are handle_rx_net, handle_tx_net, handle_rx_kick and handle_tx_kick.
static int vhost_worker(void *data)
{
struct vhost_dev *dev = data;
struct vhost_work *work = NULL;
unsigned uninitialized_var(seq);
mm_segment_t oldfs = get_fs();
set_fs(USER_DS);
use_mm(dev->mm);
for (;;) {
/* mb paired w/ kthread_stop */
set_current_state(TASK_INTERRUPTIBLE);
spin_lock_irq(&dev->work_lock);
if (work) {
work->done_seq = seq;
if (work->flushing)
wake_up_all(&work->done);
}
if (kthread_should_stop()) {
spin_unlock_irq(&dev->work_lock);
__set_current_state(TASK_RUNNING);
break;
}
// take a work item off the vhost device work list (removing it from the list), then call its handler.
if (!list_empty(&dev->work_list)) {
work = list_first_entry(&dev->work_list,
struct vhost_work, node);
list_del_init(&work->node);
seq = work->queue_seq;
} else
work = NULL;
spin_unlock_irq(&dev->work_lock);
if (work) {
__set_current_state(TASK_RUNNING);
work->fn(work);
if (need_resched())
schedule();
} else
schedule();
}
unuse_mm(dev->mm);
set_fs(oldfs);
return 0;
}
Packet transmit/receive flow
For the virtio vring layout, see: https://blog.csdn.net/huang987246510/article/details/103739592
It involves the desc[] table, the avail ring and the used ring, all shared between front end and back end.
The rx/tx flows can be roughly summarized as:
- Transmit side (send queue): when the front-end driver sends a packet, it puts it on the avail ring for the back end to process; once the back end is done, it puts the entry on the used ring, from which the front end frees the descriptors (free_old_xmit_skbs, detach_buf);
- Receive side (receive queue): the front end puts empty buffers on the avail ring (refilled via try_fill_recv) for the back end to receive packets into; once the back end has received a packet, it puts the entry on the used ring.
Either way, the back end consumes entries the front end placed on the avail ring and puts them on the used ring: the front end consumes used, the back end consumes avail.
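For reference, a sketch of the split-ring layout these three rings use (field widths as in the virtio spec and include/uapi/linux/virtio_ring.h; the __virtio* endian annotations are dropped for brevity):
struct vring_desc {        /* one entry of the desc[] table */
	uint64_t addr;     /* buffer address, a guest-physical address (GPA) */
	uint32_t len;
	uint16_t flags;    /* NEXT / WRITE / INDIRECT */
	uint16_t next;     /* next descriptor in the chain */
};
struct vring_avail {       /* written by the front end, consumed by the back end */
	uint16_t flags;
	uint16_t idx;      /* where the front end will put the next entry */
	uint16_t ring[];   /* head indexes of available descriptor chains */
};
struct vring_used_elem {
	uint32_t id;       /* head index of the consumed descriptor chain */
	uint32_t len;      /* total bytes written into the chain */
};
struct vring_used {        /* written by the back end, consumed by the front end */
	uint16_t flags;
	uint16_t idx;      /* vhost copies vq->last_used_idx here */
	struct vring_used_elem ring[];
};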
VM receive flow
The VM receive flow starts with a transmit on the tap interface: the host stack (e.g. a bridge) sends the packet out through the tap, whose transmit function is tun_net_xmit. It mainly does two things:
- queues the packet on the socket receive queue of one of the tap device's queues;
- wakes the wait entries on that socket's wait queue; each entry is a vhost_net device's vhost_poll, whose work item gets hung on the vhost device's work list, and the device's vhost kernel thread is woken; the thread walks the work list and calls the handlers to receive the packets.
/* Net device start xmit */
static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
{
struct tun_struct *tun = netdev_priv(dev);
int txq = skb->queue_mapping;
struct tun_file *tfile;
......
/* Enqueue packet */
skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
/* Notify and wake up reader process */
if (tfile->flags & TUN_FASYNC)
kill_fasync(&tfile->fasync, SIGIO, POLL_IN);
wake_up_interruptible_poll(&tfile->wq.wait, POLLIN |
POLLRDNORM | POLLRDBAND);
......
}
The vhost thread then runs the callback queued on the work list, handle_rx_net. The call chain is:
vhost thread -> handle_rx_net -> handle_rx -> tun_recvmsg & vhost_add_used_and_signal_n -> vhost_signal -> eventfd_signal -> wake_up_locked_poll -> irqfd_wakeup -> kvm_set_msi -> kvm_irq_delivery_to_apic -> kvm_irq_delivery_to_apic_fast -> kvm_apic_set_irq -> __apic_accept_irq -> kvm_vcpu_kick (kvm_vcpu_kick checks whether the vCPU is currently running on a physical CPU and, if so, forces it to exit so the interrupt can be injected).
In handle_rx:
- get_rx_bufs() fetches available descriptor head indexes from the avail ring of the rx queue (vhost_net.vqs[VHOST_NET_VQ_RX]). Each index is the head of a chain of available descriptor buffers that will hold the incoming packet; the buffers' addresses are stored in the rx vhost_virtqueue's iov, and the chain info (head index and total length) is kept in vq->heads, an array of vring_used_elem. The addresses held in the vring buffer descriptors are GPAs (guest-physical addresses); they are mapped to HVAs (host virtual addresses) before being stored in the iov.
- The tap socket's sock->ops->recvmsg, i.e. tun_recvmsg(), receives the packet: it dequeues it from the tap's &tfile->socket.sk->sk_receive_queue and copies it into the vhost_virtqueue->iov, i.e. into the available descriptor buffers.
- vhost_rx_signal_used->vhost_add_used_and_signal_n does two things:
1) vhost_add_used_n() writes the consumed descriptor entries into the rx vhost_virtqueue's used ring and stores the updated used index (vq->last_used_idx) into vq->used->idx. The point is to let the guest know how far vhost has consumed, so the guest knows where to start when it reclaims buffers or picks up packets vhost forwarded to it; here it is the latter (in the VM transmit flow below, the same function is called when the guest reclaims buffers through the used ring). vq->last_used_idx is updated as part of this.
2) vhost_signal->eventfd_signal(vq->call_ctx, 1): call_ctx is the irqfd; writing this eventfd emulates injecting an interrupt into the guest, through which the guest receives the packet.
As summarized at the start of this section, the VM receive flow takes descriptors from the rx queue's avail ring, stores the packet into them, and puts the descriptor indexes on the used ring, from which the VM picks the packet up. A sketch of the call eventfd wiring follows.
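A minimal sketch of how the call eventfd can be wired up (assumptions: kvm_vm_fd is the KVM VM fd and gsi is the guest interrupt line routed to this queue; QEMU's real MSI-X routing setup is more involved):
#include <sys/eventfd.h>
#include <sys/ioctl.h>
#include <linux/vhost.h>
#include <linux/kvm.h>
static int wire_call_irqfd(int vhost_fd, int kvm_vm_fd,
			   unsigned int index, unsigned int gsi)
{
	int call_fd = eventfd(0, EFD_NONBLOCK);
	struct vhost_vring_file call = { .index = index, .fd = call_fd };
	struct kvm_irqfd irqfd = { .fd = call_fd, .gsi = gsi };
	if (call_fd < 0)
		return -1;
	/* vhost stores this fd's context as vq->call_ctx ... */
	if (ioctl(vhost_fd, VHOST_SET_VRING_CALL, &call) < 0)
		return -1;
	/* ... and KVM raises gsi in the guest whenever the fd is signalled,
	 * so eventfd_signal(vq->call_ctx, 1) injects the interrupt. */
	return ioctl(kvm_vm_fd, KVM_IRQFD, &irqfd);
}
Back to the rx path, the handle_rx implementation: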
static void handle_rx(struct vhost_net *net)
{
struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_RX];
struct vhost_virtqueue *vq = &nvq->vq;
unsigned uninitialized_var(in), log;
struct vhost_log *vq_log;
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
.msg_control = NULL, /* FIXME: get and handle RX aux data. */
.msg_controllen = 0,
// the received packet is placed into the vhost_virtqueue's iov;
.msg_iov = vq->iov,
.msg_flags = MSG_DONTWAIT,
};
struct virtio_net_hdr_mrg_rxbuf hdr = {
.hdr.flags = 0,
.hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE
};
size_t total_len = 0;
int err, mergeable;
s16 headcount;
size_t vhost_hlen, sock_hlen;
size_t vhost_len, sock_len;
/* TODO: check that we are running from vhost_worker? */
// set by QEMU via VHOST_NET_SET_BACKEND: the vq's private data is the tap socket
struct socket *sock = rcu_dereference_check(vq->private_data, 1);
if (!sock)
return;
mutex_lock(&vq->mutex);
vhost_disable_notify(&net->dev, vq);
vhost_hlen = nvq->vhost_hlen;
sock_hlen = nvq->sock_hlen;
vq_log = unlikely(vhost_has_feature(&net->dev, VHOST_F_LOG_ALL)) ?
vq->log : NULL;
mergeable = vhost_has_feature(&net->dev, VIRTIO_NET_F_MRG_RXBUF);
while ((sock_len = peek_head_len(sock->sk))) {
sock_len += sock_hlen;
vhost_len = sock_len + vhost_hlen;
headcount = get_rx_bufs(vq, vq->heads, vhost_len,
&in, vq_log, &log,
likely(mergeable) ? UIO_MAXIOV : 1);
/* On error, stop handling until the next kick. */
if (unlikely(headcount < 0))
break;
/* OK, now we need to know about added descriptors. */
if (!headcount) {
if (unlikely(vhost_enable_notify(&net->dev, vq))) {
/* They have slipped one in as we were
* doing that: check again. */
vhost_disable_notify(&net->dev, vq);
continue;
}
/* Nothing new? Wait for eventfd to tell us
* they refilled. */
break;
}
/* We don't need to be notified again. */
if (unlikely((vhost_hlen)))
/* Skip header. TODO: support TSO. */
move_iovec_hdr(vq->iov, nvq->hdr, vhost_hlen, in);
else
/* Copy the header for use in VIRTIO_NET_F_MRG_RXBUF:
* needed because recvmsg can modify msg_iov. */
copy_iovec_hdr(vq->iov, nvq->hdr, sock_hlen, in);
msg.msg_iovlen = in;
err = sock->ops->recvmsg(NULL, sock, &msg,
sock_len, MSG_DONTWAIT | MSG_TRUNC);
/* Userspace might have consumed the packet meanwhile:
* it's not supposed to do this usually, but might be hard
* to prevent. Discard data we got (if any) and keep going. */
if (unlikely(err != sock_len)) {
pr_debug("Discarded rx packet: "
" len %d, expected %zd\n", err, sock_len);
vhost_discard_vq_desc(vq, headcount);
continue;
}
if (unlikely(vhost_hlen) &&
memcpy_toiovecend(nvq->hdr, (unsigned char *)&hdr, 0,
vhost_hlen)) {
vq_err(vq, "Unable to write vnet_hdr at addr %p\n",
vq->iov->iov_base);
break;
}
/* TODO: Should check and handle checksum. */
if (likely(mergeable) &&
memcpy_toiovecend(nvq->hdr, (unsigned char *)&headcount,
offsetof(typeof(hdr), num_buffers),
sizeof hdr.num_buffers)) {
vq_err(vq, "Failed num_buffers write");
vhost_discard_vq_desc(vq, headcount);
break;
}
vhost_add_used_and_signal_n(&net->dev, vq, vq->heads,
headcount);
if (unlikely(vq_log))
vhost_log_write(vq, vq_log, log, vhost_len);
total_len += vhost_len;
if (unlikely(total_len >= VHOST_NET_WEIGHT)) {
vhost_poll_queue(&vq->poll);
break;
}
}
mutex_unlock(&vq->mutex);
}
/* multi-buffer version of vhost_add_used_and_signal */
void vhost_add_used_and_signal_n(struct vhost_dev *dev,
struct vhost_virtqueue *vq,
struct vring_used_elem *heads, unsigned count)
{
// Write the consumed descriptors into the rx vhost_virtqueue's used ring and store the updated used
// index (vq->last_used_idx) into vq->used->idx. This lets the guest know how far vhost has consumed,
// so it knows where to start when reclaiming buffers or picking up packets vhost forwarded to it.
vhost_add_used_n(vq, heads, count);
vhost_signal(dev, vq);
}
int vhost_add_used_n(struct vhost_virtqueue *vq, struct vring_used_elem *heads,
unsigned count)
{
int start, n, r;
start = vq->last_used_idx % vq->num;
n = vq->num - start;
if (n < count) {
r = __vhost_add_used_n(vq, heads, n);
if (r < 0)
return r;
heads += n;
count -= n;
}
r = __vhost_add_used_n(vq, heads, count);
/* Make sure buffer is written before we update index. */
smp_wmb();
if (put_user(vq->last_used_idx, &vq->used->idx)) {
vq_err(vq, "Failed to increment used idx");
return -EFAULT;
}
......
}
static int __vhost_add_used_n(struct vhost_virtqueue *vq,
struct vring_used_elem *heads,
unsigned count)
{
struct vring_used_elem __user *used;
u16 old, new;
int start;
start = vq->last_used_idx % vq->num;
used = vq->used->ring + start;
if (__copy_to_user(used, heads, count * sizeof *used)) {
vq_err(vq, "Failed to write used");
return -EFAULT;
}
if (unlikely(vq->log_used)) {
/* Make sure data is seen before log. */
smp_wmb();
/* Log used ring entry write. */
log_write(vq->log_base,
vq->log_addr +
((void __user *)used - (void __user *)vq->used),
count * sizeof *used);
}
old = vq->last_used_idx;
new = (vq->last_used_idx += count);
......
}
VM transmit flow
handle_tx_net is part of the VM transmit flow; it is registered to respond to POLLOUT events on the tap socket (notes still to be organized...).
- The QEMU VHOST_SET_VRING_KICK flow ties the kick file (an eventfd) to the vhost device. It calls vhost_poll_start() to add the vhost_virtqueue's vhost_poll wait entry to the vq->kick file's wait queue; KVM signals this eventfd to notify vhost that the guest has packets to send.
long vhost_vring_ioctl(struct vhost_dev *d, int ioctl, void __user *argp)
{
case VHOST_SET_VRING_KICK:
if (copy_from_user(&f, argp, sizeof f)) {
r = -EFAULT;
break;
}
eventfp = f.fd == -1 ? NULL : eventfd_fget(f.fd);
if (IS_ERR(eventfp)) {
r = PTR_ERR(eventfp);
break;
}
if (eventfp != vq->kick) {
pollstop = (filep = vq->kick) != NULL;
pollstart = (vq->kick = eventfp) != NULL;
} else
filep = eventfp;
break;
......
if (pollstart && vq->handle_kick)
r = vhost_poll_start(&vq->poll, vq->kick);
......
}
- After the guest queues a packet, its descriptor index is put on the avail ring and virtqueue_notify() notifies KVM by writing the queue-notify register, which traps to the host; the eventfd mechanism then wakes the vhost thread to run the transmit path.
Call chain: vmx_handle_exit -> kvm_vmx_exit_handlers[exit_reason] -> handle_io -> kvm_fast_pio_out -> emulator_pio_out_emulated -> emulator_pio_in_out -> kernel_pio -> kvm_io_bus_write -> kvm_iodevice_write (dev->ops->write) -> ioeventfd_write -> eventfd_signal -> wake_up_locked_poll -> __wake_up_locked_key -> __wake_up_common -> vhost_poll_wakeup -> vhost_poll_queue -> vhost_work_queue -> wake_up_process
The woken vhost thread runs handle_tx_kick, which pushes the packet into the host network stack; see the tap/stack write-up: https://www.jianshu.com/p/53b3199c9a92.
Call chain: handle_tx_kick -> handle_tx (sock->ops->sendmsg) -> tun_sendmsg -> tun_get_user (tun_alloc_skb internally?) -> netif_rx_ni.
In handle_tx:
- vhost_get_vq_desc() fetches the descriptor info the guest filled in; the buffer addresses are stored in the tx vhost_virtqueue's iov;
- the tap socket's sock->ops->sendmsg completes the transmit, pushing the packet into the kernel network stack;
- as in the VM receive flow, vhost_add_used_and_signal() writes the consumed descriptor buffer indexes into the used ring and notifies the guest to reclaim them.
static void handle_tx_kick(struct vhost_work *work)
{
struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
poll.work);
struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev);
handle_tx(net);
}
static void handle_tx(struct vhost_net *net)
{
struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
struct vhost_virtqueue *vq = &nvq->vq;
unsigned out, in, s;
int head;
struct msghdr msg = {
.msg_name = NULL,
.msg_namelen = 0,
.msg_control = NULL,
.msg_controllen = 0,
.msg_iov = vq->iov,
.msg_flags = MSG_DONTWAIT,
};
size_t len, total_len = 0;
int err;
size_t hdr_size;
struct socket *sock;
struct vhost_net_ubuf_ref *uninitialized_var(ubufs);
bool zcopy, zcopy_used;
/* TODO: check that we are running from vhost_worker? */
sock = rcu_dereference_check(vq->private_data, 1);
if (!sock)
return;
mutex_lock(&vq->mutex);
vhost_disable_notify(&net->dev, vq);
hdr_size = nvq->vhost_hlen;
zcopy = nvq->ubufs;
for (;;) {
/* Release DMAs done buffers first */
if (zcopy)
vhost_zerocopy_signal_used(net, vq);
head = vhost_get_vq_desc(&net->dev, vq, vq->iov,
ARRAY_SIZE(vq->iov),
&out, &in,
NULL, NULL);
/* On error, stop handling until the next kick. */
if (unlikely(head < 0))
break;
/* Nothing new? Wait for eventfd to tell us they refilled. */
if (head == vq->num) {
int num_pends;
/* If more outstanding DMAs, queue the work.
* Handle upend_idx wrap around
*/
num_pends = likely(nvq->upend_idx >= nvq->done_idx) ?
(nvq->upend_idx - nvq->done_idx) :
(nvq->upend_idx + UIO_MAXIOV -
nvq->done_idx);
if (unlikely(num_pends > VHOST_MAX_PEND))
break;
if (unlikely(vhost_enable_notify(&net->dev, vq))) {
vhost_disable_notify(&net->dev, vq);
continue;
}
break;
}
if (in) {
vq_err(vq, "Unexpected descriptor format for TX: "
"out %d, int %d\n", out, in);
break;
}
/* Skip header. TODO: support TSO. */
s = move_iovec_hdr(vq->iov, nvq->hdr, hdr_size, out);
msg.msg_iovlen = out;
len = iov_length(vq->iov, out);
/* Sanity check */
if (!len) {
vq_err(vq, "Unexpected header len for TX: "
"%zd expected %zd\n",
iov_length(nvq->hdr, s), hdr_size);
break;
}
zcopy_used = zcopy && (len >= VHOST_GOODCOPY_LEN ||
nvq->upend_idx != nvq->done_idx);
/* use msg_control to pass vhost zerocopy ubuf info to skb */
if (zcopy_used) {
vq->heads[nvq->upend_idx].id = head;
if (!vhost_net_tx_select_zcopy(net) ||
len < VHOST_GOODCOPY_LEN) {
/* copy don't need to wait for DMA done */
vq->heads[nvq->upend_idx].len =
VHOST_DMA_DONE_LEN;
msg.msg_control = NULL;
msg.msg_controllen = 0;
ubufs = NULL;
} else {
struct ubuf_info *ubuf;
ubuf = nvq->ubuf_info + nvq->upend_idx;
vq->heads[nvq->upend_idx].len =
VHOST_DMA_IN_PROGRESS;
ubuf->callback = vhost_zerocopy_callback;
ubuf->ctx = nvq->ubufs;
ubuf->desc = nvq->upend_idx;
msg.msg_control = ubuf;
msg.msg_controllen = sizeof(ubuf);
ubufs = nvq->ubufs;
kref_get(&ubufs->kref);
}
nvq->upend_idx = (nvq->upend_idx + 1) % UIO_MAXIOV;
} else
msg.msg_control = NULL;
/* TODO: Check specific error and bomb out unless ENOBUFS? */
err = sock->ops->sendmsg(NULL, sock, &msg, len);
if (unlikely(err < 0)) {
if (zcopy_used) {
if (ubufs)
vhost_net_ubuf_put(ubufs);
nvq->upend_idx = ((unsigned)nvq->upend_idx - 1)
% UIO_MAXIOV;
}
vhost_discard_vq_desc(vq, 1);
break;
}
if (err != len)
pr_debug("Truncated TX packet: "
" len %d != %zd\n", err, len);
if (!zcopy_used)
vhost_add_used_and_signal(&net->dev, vq, head, 0);
else
vhost_zerocopy_signal_used(net, vq);
total_len += len;
vhost_net_tx_packet(net);
if (unlikely(total_len >= VHOST_NET_WEIGHT)) {
vhost_poll_queue(&vq->poll);
break;
}
}
mutex_unlock(&vq->mutex);
}