Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

另一种线程通信方式——eventfd #13

Open
JemmyH opened this issue Jul 19, 2021 · 3 comments
Open

另一种线程通信方式——eventfd #13

JemmyH opened this issue Jul 19, 2021 · 3 comments
Assignees
Labels
documentation Improvements or additions to documentation done Implementation by Go

Comments

@JemmyH
Copy link
Owner

JemmyH commented Jul 19, 2021

eventfd 是一种linux上的线程通信方式,和信号量等其他线程通信不同的是,eventfd 可以用于进程间的通信,还可以用于内核发信号给用户态的进程eventfd 是linux上的系统调用,本质上是用于事件通知。这种文件 fd 无法传输数据,只能用来传输事件,通常用于生产者消费者模式的事件实现。

本 issue 旨在明确 eventfd 的用法,为说明原理会对对其实现细节作简要描述。

@JemmyH JemmyH self-assigned this Jul 19, 2021
@JemmyH JemmyH added Digging In I'm on it. documentation Improvements or additions to documentation Implementation by Go labels Jul 19, 2021
@JemmyH
Copy link
Owner Author

JemmyH commented Jul 24, 2021

认识 eventfd

eventfd 是一个和计数相关的 fdeventfd 实现了 read/write 的调用,在调用里面实现了一套计数器的逻辑。write 仅仅是加计数,read 是读计数,并且清零。即:计数不为 0 表示有可读事件发生,write 会将计数增加,而 read 会将计数的值读到用户空间并且将计数清零。我们来看看对应的数据结构:

struct eventfd_ctx {
        // 引用计数 
	struct kref kref;
        // 这个文件描述符对应的等待队列
	wait_queue_head_t wqh;
	/*
	 * Every time that a write(2) is performed on an eventfd, the
	 * value of the __u64 being written is added to "count" and a
	 * wakeup is performed on "wqh". A read(2) will return the "count"
	 * value to userspace, and will reset "count" to zero. The kernel
	 * side eventfd_signal() also, adds to the "count" counter and
	 * issue a wakeup.
	 */
       // 对应的计数器
	__u64 count;
        // 标志位
	unsigned int flags;
        // ida 相关的 id,与本次讨论无关,暂忽略
	int id;
};

其中有意义的 flag 为:

  • EFD_CLOEXEC 表示返回的eventfd文件描述符在fork后exec其他程序时会自动关闭这个文件描述符;
  • EFD_NONBLOCK 设置返回的 eventfd 为非阻塞;
  • EFD_SEMAPHORE 表示将 eventfd 作为一个信号量来使用。

在创建一个 eventfd 时,其实是分别创建了 eventfd_ctxfilefd,之后 eventfd_ctx 作为 fileprivate_data 存储,之后 fdfile 进行绑定,最后返回给用户态的是 fd

在如下的片段中,定义了 eventfd 所实现的操作:

static const struct file_operations eventfd_fops = {
	.release	= eventfd_release,
	.poll		= eventfd_poll,
	.read_iter	= eventfd_read,
	.write		= eventfd_write,
	.llseek		= noop_llseek,
};

我们需要关注的是 eventfd_writeeventfd_readeventfd_poll 这三个函数的实现。

@JemmyH
Copy link
Owner Author

JemmyH commented Jul 24, 2021

实现

创建一个 eventfd 实例

SYSCALL_DEFINE2(eventfd2, unsigned int, count, int, flags)
{
	return do_eventfd(count, flags);
}

SYSCALL_DEFINE1(eventfd, unsigned int, count)
{
	return do_eventfd(count, 0);
}
// 最终调用的是 do_eventfd
static int do_eventfd(unsigned int count, int flags)
{
	struct eventfd_ctx *ctx;
	struct file *file;
	int fd;

	/* Check the EFD_* constants for consistency.  */
	BUILD_BUG_ON(EFD_CLOEXEC != O_CLOEXEC);
	BUILD_BUG_ON(EFD_NONBLOCK != O_NONBLOCK);

	if (flags & ~EFD_FLAGS_SET)
		return -EINVAL;

	ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
	if (!ctx)
		return -ENOMEM;

	// 设置引用计数值为 1
	kref_init(&ctx->kref);
	// 初始化等待队列
	init_waitqueue_head(&ctx->wqh);
        // 初始化 count 和 flags
	ctx->count = count;
	ctx->flags = flags;
	// ida 相关的 id
	ctx->id = ida_simple_get(&eventfd_ida, 0, 0, GFP_KERNEL);

	flags &= EFD_SHARED_FCNTL_FLAGS;
	flags |= O_RDWR;
	// 在当前进程下获取一个 fd
	fd = get_unused_fd_flags(flags);
	if (fd < 0)
		goto err;

	// 创建一个文件,并将 ctx 作为 private_data 存储
	file = anon_inode_getfile("[eventfd]", &eventfd_fops, ctx, flags);
	if (IS_ERR(file)) {
		put_unused_fd(fd);
		fd = PTR_ERR(file);
		goto err;
	}

	file->f_mode |= FMODE_NOWAIT;
	// 将 fd 与 file 绑定
	fd_install(fd, file);
	return fd;
err:
	eventfd_free_ctx(ctx);
	return fd;
}

整个函数也比较简单。不过需要注意的是,ctx->id 并不是返回给用户的 eventid

eventfd_write 增加计数

static ssize_t eventfd_write(struct file *file, const char __user *buf, size_t count, loff_t *ppos)
{
        // 从 file 的 private_data 中取出 ctx
	struct eventfd_ctx *ctx = file->private_data;
	ssize_t res;
	__u64 ucnt;
	DECLARE_WAITQUEUE(wait, current);

        // 读取用户传入的 count,存储在 ucnt 中
        // 不能太大
	if (count < sizeof(ucnt))
		return -EINVAL;
	if (copy_from_user(&ucnt, buf, sizeof(ucnt)))
		return -EFAULT;
	if (ucnt == ULLONG_MAX)
		return -EINVAL;
	spin_lock_irq(&ctx->wqh.lock);
	res = -EAGAIN;
	// 不能越界
	if (ULLONG_MAX - ctx->count > ucnt)
		// 还有空闲的cnt 可写,res 表示 write 函数的写入字节数
		res = sizeof(ucnt);
	else if (!(file->f_flags & O_NONBLOCK)) {
		// 还没空闲大小可写并且是阻塞模式
		__add_wait_queue(&ctx->wqh, &wait);
		// 一直循环,直到有可空闲大小可写
		for (res = 0;;) {
			set_current_state(TASK_INTERRUPTIBLE);
			if (ULLONG_MAX - ctx->count > ucnt) {
				res = sizeof(ucnt);
				break;
			}
			if (signal_pending(current)) {
				res = -ERESTARTSYS;
				break;
			}
			spin_unlock_irq(&ctx->wqh.lock);
			// 让出 CPU,自己阻塞
			schedule();
			spin_lock_irq(&ctx->wqh.lock);
		}
		// 条件满足,真正恢复运行
		__remove_wait_queue(&ctx->wqh, &wait);
		__set_current_state(TASK_RUNNING);
	}
	// 返回值大于 0,说明写入成功了,此时需要唤醒等待数据的进程
	if (likely(res > 0)) {
		// 从这里也看出,write 其实是对 cnt 的累加
		ctx->count += ucnt;
		// 唤醒等待队列中的进程
		if (waitqueue_active(&ctx->wqh))
			wake_up_locked_poll(&ctx->wqh, EPOLLIN);
	}
	spin_unlock_irq(&ctx->wqh.lock);

	return res;
}

write 负责将 ctx 中的 count 进行累加,当检查到累加之后的值超过了 count 的最大范围、并且设定为非阻塞时,会阻塞在此函数中,直到有空间可写。更为重要的一步是,如果检查到成功递增了 ctx.count,此时会唤醒当前 file 的等待队列中的进程。

eventfd_read

static ssize_t eventfd_read(struct kiocb *iocb, struct iov_iter *to)
{
	struct file *file = iocb->ki_filp;
	struct eventfd_ctx *ctx = file->private_data;
	__u64 ucnt = 0;
	DECLARE_WAITQUEUE(wait, current);

	if (iov_iter_count(to) < sizeof(ucnt))
		return -EINVAL;
	spin_lock_irq(&ctx->wqh.lock);
	// 当 ctx->count 的值为 0 时
	if (!ctx->count) {
		// 如果设置了未阻塞标志,此时会返回 EAGAIN 错误
		if ((file->f_flags & O_NONBLOCK) ||
		    (iocb->ki_flags & IOCB_NOWAIT)) {
			spin_unlock_irq(&ctx->wqh.lock);
			return -EAGAIN;
		}
		// 阻塞在此处,等待数据写入
		__add_wait_queue(&ctx->wqh, &wait);
		for (;;) {
			set_current_state(TASK_INTERRUPTIBLE);
			if (ctx->count)
				break;
			if (signal_pending(current)) {
				__remove_wait_queue(&ctx->wqh, &wait);
				__set_current_state(TASK_RUNNING);
				spin_unlock_irq(&ctx->wqh.lock);
				return -ERESTARTSYS;
			}
			spin_unlock_irq(&ctx->wqh.lock);
			schedule();
			spin_lock_irq(&ctx->wqh.lock);
		}
		__remove_wait_queue(&ctx->wqh, &wait);
		__set_current_state(TASK_RUNNING);
	}
	// 真正执行 read 操作
	eventfd_ctx_do_read(ctx, &ucnt);
	// 读出数据之后,此时就有空间可写,通知阻塞在 write 的进程
	if (waitqueue_active(&ctx->wqh))
		wake_up_locked_poll(&ctx->wqh, EPOLLOUT);
	spin_unlock_irq(&ctx->wqh.lock);
	// 将读到的值复制给调用方
	if (unlikely(copy_to_iter(&ucnt, sizeof(ucnt), to) != sizeof(ucnt)))
		return -EFAULT;

	return sizeof(ucnt);
}

eventfd_poll

static __poll_t eventfd_poll(struct file *file, poll_table *wait)
{
	struct eventfd_ctx *ctx = file->private_data;
	__poll_t events = 0;
	u64 count;

	poll_wait(file, &ctx->wqh, wait);
        // 读取 ctx 中的 count
	count = READ_ONCE(ctx->count);
	if (count > 0)
                // count 大于 0,说明可读
		events |= EPOLLIN;
	if (count == ULLONG_MAX)
                // count 非法,返回错误
		events |= EPOLLERR;
	if (ULLONG_MAX - 1 > count)
                // 还有空间可写,也置可写标志位
		events |= EPOLLOUT;
	return events;
}

有一点非常重要:当还有空间可写时,也会置可写标志。这说明,只要是 ctx->count 的值没有达到最大值,就一直是可写的!在实际通知中,可写的事件也就没必要去监听了。

@JemmyH
Copy link
Owner Author

JemmyH commented Jul 24, 2021

使用

在说明 epoll 时,我们提到过,当一个文件实现了 poll 方法,那么就可以被 epoll 监听在其上面发生的可读可写事件。正巧,eventfd 实现了此方法,eventfd 是专门用来传递事件的fd ,而 epoll 则是专门用来管理事件的池子,我们看看最常见的用法:多生产者和单消费者,借助 eventfd 优雅的完成事件通知

对多个生产者而言,它们会一直写数据到公共的队列中,之后唤醒消费者(如果消费者处于睡眠状态的话):

func producer(data){
    // 投递消息
    write_data_to_queue(quene, data)
    // 唤醒消费者去处理(让 eventfd 递增 1)
    write(eventfd, 1, 8)
}

对单个消费者而言,是另一个线程,它会将这个 eventfd 放进 epoll 的监听队列中,并设置监听 读事件(前面提到过,监听写事件没意义),之后阻塞等待 epoll_wait 上的读事件:

func Consumer(){
    // 添加 eventfd 到监听池,并设置标志为读
    epoll_ctl(ep, EPOLL_CTL_ADD, eventfd, xxx);
    // 进入轮询
    for {
        // 等待唤醒
        epoll_wait(ep, ... );
       // 读取新添加到列表里的元素个数,并且进行处理;
       n = read(eventfd, ... )
       // 根据n 处理全局队列中的数据
       ..
    }
}

@JemmyH JemmyH added done and removed Digging In I'm on it. labels Jul 24, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation done Implementation by Go
Projects
None yet
Development

No branches or pull requests

1 participant