Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workqueue 去掉timer,使用list排序处理方式 #9825

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion components/drivers/include/ipc/workqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <rtdef.h>
#include <rtconfig.h>
#include "completion.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -42,6 +43,7 @@ struct rt_workqueue
struct rt_semaphore sem;
rt_thread_t work_thread;
struct rt_spinlock spinlock;
struct rt_completion wakeup_completion;
};

struct rt_work
Expand All @@ -52,7 +54,7 @@ struct rt_work
void *work_data;
rt_uint16_t flags;
rt_uint16_t type;
struct rt_timer timer;
rt_tick_t timeout_tick;
struct rt_workqueue *workqueue;
};

Expand Down
172 changes: 71 additions & 101 deletions components/drivers/ipc/workqueue.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2006-2023, RT-Thread Development Team
* Copyright (c) 2006-2022, RT-Thread Development Team
*
* SPDX-License-Identifier: Apache-2.0
*
Expand All @@ -10,15 +10,14 @@
* 2021-08-14 Jackistang add comments for function interface
* 2022-01-16 Meco Man add rt_work_urgent()
* 2023-09-15 xqyjlj perf rt_hw_interrupt_disable/enable
* 2024-12-21 yuqingli delete timer, using list
*/

#include <rthw.h>
#include <rtdevice.h>

#ifdef RT_USING_HEAP

static void _delayed_work_timeout_handler(void *parameter);

rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)
{
rt_err_t result;
Expand Down Expand Up @@ -50,38 +49,61 @@ rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue)

static void _workqueue_thread_entry(void *parameter)
{
rt_base_t level;
struct rt_work *work;
rt_base_t level;
struct rt_work *work;
struct rt_workqueue *queue;
rt_tick_t current_tick;
rt_int32_t delay_tick;
void (*work_func)(struct rt_work *work, void *work_data);
void *work_data;

queue = (struct rt_workqueue *) parameter;
queue = (struct rt_workqueue *)parameter;
RT_ASSERT(queue != RT_NULL);

while (1)
{
level = rt_spin_lock_irqsave(&(queue->spinlock));
if (rt_list_isempty(&(queue->work_list)))

/* timer check */
current_tick = rt_tick_get();
delay_tick = RT_WAITING_FOREVER;
while (!rt_list_isempty(&(queue->delayed_list)))
{
/* no software timer exist, suspend self. */
rt_thread_suspend_with_flag(rt_thread_self(), RT_UNINTERRUPTIBLE);
work = rt_list_entry(queue->delayed_list.next, struct rt_work, list);
if ((current_tick - work->timeout_tick) < RT_TICK_MAX / 2)
{
rt_list_remove(&(work->list));
rt_list_insert_after(queue->work_list.prev, &(work->list));
work->flags &= ~RT_WORK_STATE_SUBMITTING;
work->flags |= RT_WORK_STATE_PENDING;
}
else
{
delay_tick = work->timeout_tick - current_tick;
break;
}
}

/* release lock after suspend so we will not lost any wakeups */
if (rt_list_isempty(&(queue->work_list)))
{
rt_spin_unlock_irqrestore(&(queue->spinlock), level);

rt_schedule();
/* wait for work completion */
rt_completion_wait(&(queue->wakeup_completion), delay_tick);
continue;
}

/* we have work to do with. */
work = rt_list_entry(queue->work_list.next, struct rt_work, list);
rt_list_remove(&(work->list));
queue->work_current = work;
work->flags &= ~RT_WORK_STATE_PENDING;
work->workqueue = RT_NULL;
queue->work_current = work;
work->flags &= ~RT_WORK_STATE_PENDING;
work->workqueue = RT_NULL;
work_func = work->work_func;
work_data = work->work_data;
rt_spin_unlock_irqrestore(&(queue->spinlock), level);

/* do work */
work->work_func(work, work->work_data);
work_func(work, work_data);
/* clean current work */
queue->work_current = RT_NULL;

Expand All @@ -93,114 +115,68 @@ static void _workqueue_thread_entry(void *parameter)
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue,
struct rt_work *work, rt_tick_t ticks)
{
rt_base_t level;
rt_err_t err = RT_EOK;
rt_base_t level;
rt_err_t err = RT_EOK;
struct rt_work *work_tmp;
rt_list_t *list_tmp;

level = rt_spin_lock_irqsave(&(queue->spinlock));

/* remove list */
rt_list_remove(&(work->list));
work->flags &= ~RT_WORK_STATE_PENDING;
work->flags = 0;

if (ticks == 0)
{
rt_list_insert_after(queue->work_list.prev, &(work->list));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我觉得这里缺少种情况的判断:比如创建一个工作队列,队列执行后再向其提交一次,正常来说第二次应该返回BUSY了
这块我看之前是有判断的,建议作者添加上:b065486#diff-b63048f1ad2089c43aeb589d9f396dfea3e95b253a0e9c7d13f785bc0613cb31L105

Copy link
Contributor Author

@yuqingli05 yuqingli05 Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

b065486#diff-b63048f1ad2089c43aeb589d9f396dfea3e95b253a0e9c7d13f785bc0613cb31L93-R354
1、看了下上面 b065486 ,这次提交不是去掉BUSY判断用的吗?和我的思路不冲突啊。
2、并且我认为去掉 busy是个明智之举。因为他可以在 自己的回调里面 再次触发自己,这个是很常用的(至少我很常用,我自己的项目里面大量的使用了回调函数里面再次触发同一个 work的逻辑)。
3、正在触发回调的work, b065486 和我的思路是一样的,都是运行再次添加。
4、实际上我为了处理这个问题,在线程回调的地方,把回调函数在临界区保存为临时变量了(work_func 和 work_data)。退出临界区之后实际上 work 结构体已经是安全的了。f0415bb?diff=split&w=0#diff-b63048f1ad2089c43aeb589d9f396dfea3e95b253a0e9c7d13f785bc0613cb31R96-R109

Copy link
Member

@Rbb666 Rbb666 Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我没描述清楚,假设这样的条件:工作队列线程回调还没执行完成再次发生了调度,然后在其他线程又向队列提交了一个任务,那么这个时候队列应该处于BUSY吧。

static void work_test_fun(struct rt_work *work, void *work_data)
{
    ......
    rt_thread_delay(10);
    ......
}

static void repeat_work_test02(void)
{
    /* 比当前测试线程优先级高 1 */
    curr_priority = get_test_thread_priority(-1);
    queue = rt_workqueue_create("test", 2048, curr_priority);

    rt_work_init(&work, work_test_fun, (void *)&work_flag);
    /* 提交任务,队列优先级高,会立即执行 */
    rt_workqueue_submit_work(queue, &work, 0);

    /* 延时让出 CPU */
    rt_thread_delay(5);
    /* 再次提交正在执行的任务,应该返回 BUSY */
    err = rt_workqueue_submit_work(queue, &work, 0);
    ......
}

Copy link
Contributor Author

@yuqingli05 yuqingli05 Dec 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1、现在需要明确的是:在我这个PR之前,BUSY判断就已经去掉了。我只是沿用他的去掉,我为啥要给他加回去那?
2、你第一条回复的那个提交 b065486 就是去掉 BUSY判断的啊。理由是work正在执行,这时候发生中断,这个中断调用rt_workqueue_submit_work,导致work丢掉。你为啥要用一和你观点完全想法的提交 b065486 来证明你的观点那?我又确认了一遍 b065486 ,好像不是我理解错了把?

3、同时说一下 我认为 去掉BUSY合理的原因如下(个人观点):

static void work_test_fun(struct rt_work *work, void *work_data)
{
    // 回调里面直接再次 触发自己,如果存在BUSY状态这个会触发失败
    // 至于为什么不用 循环直接执行,我理解是 是要利用 work的先进先出,进行排队执行(都是0延迟的时候),给其他任务执行的 空间
    // 我个人在项目中使用这种方式驱动了一个主逻辑
    rt_workqueue_submit_work(queue, &work, 0);
}

static void repeat_work_test02(void)
{

    rt_work_init(&work, work_test_fun, NULL);
    rt_workqueue_submit_work(queue, &work, 0);

}

work->flags |= RT_WORK_STATE_PENDING;
work->workqueue = queue;
work->flags |= RT_WORK_STATE_PENDING;
work->workqueue = queue;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我意思是不是在这里加个busy的返回判断就行了,因为我看到rt_workqueue_submit_work API返回值就是存在RT_EBUSY状态的:#L317

err = queue->work_current != work ? RT_EOK : -RT_EBUSY;

Copy link
Contributor Author

@yuqingli05 yuqingli05 Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个判断,在我这次PR之前就去掉了,已经不判断 BUSY了。BUSY状态下 会返回成功,回调执行完成之后 会立即重新触发。
去掉BUSY的PR不是我提交的, 在 b065486 里面提交的。

我找到了具体PR在 #6932 理由在这个PR里面有写。使用work代理中断,回调没执行完成,中断再次到来会丢中断。

我把注释更新下 去掉 BUSY 返回值的注释。

/* whether the workqueue is doing work */
if (queue->work_current == RT_NULL)
{
/* resume work thread, and do a re-schedule if succeed */
rt_thread_resume(queue->work_thread);
}
rt_completion_done(&(queue->wakeup_completion));
err = RT_EOK;
}
else if (ticks < RT_TICK_MAX / 2)
{
/* Timer started */
if (work->flags & RT_WORK_STATE_SUBMITTING)
{
rt_timer_control(&work->timer, RT_TIMER_CTRL_SET_TIME, &ticks);
}
else
/* insert delay work list */
work->flags |= RT_WORK_STATE_SUBMITTING;
work->workqueue = queue;
work->timeout_tick = rt_tick_get() + ticks;

list_tmp = &(queue->delayed_list);
rt_list_for_each_entry(work_tmp, &(queue->delayed_list), list)
{
rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler,
work, ticks, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER);
work->flags |= RT_WORK_STATE_SUBMITTING;
if ((work_tmp->timeout_tick - work->timeout_tick) < RT_TICK_MAX / 2)
{
list_tmp = &(work_tmp->list);
break;
}
}
work->workqueue = queue;
/* insert delay work list */
rt_list_insert_after(queue->delayed_list.prev, &(work->list));
rt_list_insert_before(list_tmp, &(work->list));

err = rt_timer_start(&(work->timer));
rt_completion_done(&(queue->wakeup_completion));
err = RT_EOK;
}
else
{
err = - RT_ERROR;
err = -RT_ERROR;
}

rt_spin_unlock_irqrestore(&(queue->spinlock), level);
return err;
}

static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work)
{
rt_base_t level;
rt_err_t err;
rt_err_t err;

level = rt_spin_lock_irqsave(&(queue->spinlock));
rt_list_remove(&(work->list));
work->flags &= ~RT_WORK_STATE_PENDING;
/* Timer started */
if (work->flags & RT_WORK_STATE_SUBMITTING)
{
if ((err = rt_timer_stop(&(work->timer))) != RT_EOK)
{
goto exit;
}
rt_timer_detach(&(work->timer));
work->flags &= ~RT_WORK_STATE_SUBMITTING;
}
err = queue->work_current != work ? RT_EOK : -RT_EBUSY;
work->flags = 0;
err = queue->work_current != work ? RT_EOK : -RT_EBUSY;
work->workqueue = RT_NULL;
exit:
rt_spin_unlock_irqrestore(&(queue->spinlock), level);
return err;
}

static void _delayed_work_timeout_handler(void *parameter)
{
struct rt_work *work;
struct rt_workqueue *queue;
rt_base_t level;

work = (struct rt_work *)parameter;
queue = work->workqueue;

RT_ASSERT(work->flags & RT_WORK_STATE_SUBMITTING);
RT_ASSERT(queue != RT_NULL);

level = rt_spin_lock_irqsave(&(queue->spinlock));
rt_timer_detach(&(work->timer));
work->flags &= ~RT_WORK_STATE_SUBMITTING;
/* remove delay list */
rt_list_remove(&(work->list));
/* insert work queue */
if (queue->work_current != work)
{
rt_list_insert_after(queue->work_list.prev, &(work->list));
work->flags |= RT_WORK_STATE_PENDING;
}
/* whether the workqueue is doing work */
if (queue->work_current == RT_NULL)
{
/* resume work thread, and do a re-schedule if succeed */
rt_thread_resume(queue->work_thread);
}

rt_spin_unlock_irqrestore(&(queue->spinlock), level);
}

/**
* @brief Initialize a work item, binding with a callback function.
*
Expand All @@ -221,8 +197,8 @@ void rt_work_init(struct rt_work *work,
work->work_func = work_func;
work->work_data = work_data;
work->workqueue = RT_NULL;
work->flags = 0;
work->type = 0;
work->flags = 0;
work->type = 0;
}

/**
Expand All @@ -248,6 +224,7 @@ struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_siz
rt_list_init(&(queue->delayed_list));
queue->work_current = RT_NULL;
rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO);
rt_completion_init(&(queue->wakeup_completion));

/* create the work thread */
queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10);
Expand Down Expand Up @@ -292,7 +269,6 @@ rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue)
* @param work is a pointer to the work item object.
*
* @return RT_EOK Success.
* -RT_EBUSY This work item is executing.
*/
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
{
Expand All @@ -314,7 +290,6 @@ rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work)
* NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
*
* @return RT_EOK Success.
* -RT_EBUSY This work item is executing.
* -RT_ERROR The ticks parameter is invalid.
*/
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t ticks)
Expand Down Expand Up @@ -346,14 +321,10 @@ rt_err_t rt_workqueue_urgent_work(struct rt_workqueue *queue, struct rt_work *wo
/* NOTE: the work MUST be initialized firstly */
rt_list_remove(&(work->list));
rt_list_insert_after(&queue->work_list, &(work->list));
/* whether the workqueue is doing work */
if (queue->work_current == RT_NULL)
{
/* resume work thread, and do a re-schedule if succeed */
rt_thread_resume(queue->work_thread);
}

rt_completion_done(&(queue->wakeup_completion));
rt_spin_unlock_irqrestore(&(queue->spinlock), level);

return RT_EOK;
}

Expand Down Expand Up @@ -448,7 +419,6 @@ static struct rt_workqueue *sys_workq; /* system work queue */
* NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1)
*
* @return RT_EOK Success.
* -RT_EBUSY This work item is executing.
* -RT_ERROR The ticks parameter is invalid.
*/
rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t ticks)
Expand Down
Loading