-
Notifications
You must be signed in to change notification settings - Fork 5.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
workqueue 去掉timer,使用list排序处理方式 #9825
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright (c) 2006-2023, RT-Thread Development Team | ||
* Copyright (c) 2006-2022, RT-Thread Development Team | ||
* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
|
@@ -10,15 +10,14 @@ | |
* 2021-08-14 Jackistang add comments for function interface | ||
* 2022-01-16 Meco Man add rt_work_urgent() | ||
* 2023-09-15 xqyjlj perf rt_hw_interrupt_disable/enable | ||
* 2024-12-21 yuqingli delete timer, using list | ||
*/ | ||
|
||
#include <rthw.h> | ||
#include <rtdevice.h> | ||
|
||
#ifdef RT_USING_HEAP | ||
|
||
static void _delayed_work_timeout_handler(void *parameter); | ||
|
||
rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue) | ||
{ | ||
rt_err_t result; | ||
|
@@ -50,38 +49,61 @@ rt_inline rt_err_t _workqueue_work_completion(struct rt_workqueue *queue) | |
|
||
static void _workqueue_thread_entry(void *parameter) | ||
{ | ||
rt_base_t level; | ||
struct rt_work *work; | ||
rt_base_t level; | ||
struct rt_work *work; | ||
struct rt_workqueue *queue; | ||
rt_tick_t current_tick; | ||
rt_int32_t delay_tick; | ||
void (*work_func)(struct rt_work *work, void *work_data); | ||
void *work_data; | ||
|
||
queue = (struct rt_workqueue *) parameter; | ||
queue = (struct rt_workqueue *)parameter; | ||
RT_ASSERT(queue != RT_NULL); | ||
|
||
while (1) | ||
{ | ||
level = rt_spin_lock_irqsave(&(queue->spinlock)); | ||
if (rt_list_isempty(&(queue->work_list))) | ||
|
||
/* timer check */ | ||
current_tick = rt_tick_get(); | ||
delay_tick = RT_WAITING_FOREVER; | ||
while (!rt_list_isempty(&(queue->delayed_list))) | ||
{ | ||
/* no software timer exist, suspend self. */ | ||
rt_thread_suspend_with_flag(rt_thread_self(), RT_UNINTERRUPTIBLE); | ||
work = rt_list_entry(queue->delayed_list.next, struct rt_work, list); | ||
if ((current_tick - work->timeout_tick) < RT_TICK_MAX / 2) | ||
{ | ||
rt_list_remove(&(work->list)); | ||
rt_list_insert_after(queue->work_list.prev, &(work->list)); | ||
work->flags &= ~RT_WORK_STATE_SUBMITTING; | ||
work->flags |= RT_WORK_STATE_PENDING; | ||
} | ||
else | ||
{ | ||
delay_tick = work->timeout_tick - current_tick; | ||
break; | ||
} | ||
} | ||
|
||
/* release lock after suspend so we will not lost any wakeups */ | ||
if (rt_list_isempty(&(queue->work_list))) | ||
{ | ||
rt_spin_unlock_irqrestore(&(queue->spinlock), level); | ||
|
||
rt_schedule(); | ||
/* wait for work completion */ | ||
rt_completion_wait(&(queue->wakeup_completion), delay_tick); | ||
continue; | ||
} | ||
|
||
/* we have work to do with. */ | ||
work = rt_list_entry(queue->work_list.next, struct rt_work, list); | ||
rt_list_remove(&(work->list)); | ||
queue->work_current = work; | ||
work->flags &= ~RT_WORK_STATE_PENDING; | ||
work->workqueue = RT_NULL; | ||
queue->work_current = work; | ||
work->flags &= ~RT_WORK_STATE_PENDING; | ||
work->workqueue = RT_NULL; | ||
work_func = work->work_func; | ||
work_data = work->work_data; | ||
rt_spin_unlock_irqrestore(&(queue->spinlock), level); | ||
|
||
/* do work */ | ||
work->work_func(work, work->work_data); | ||
work_func(work, work_data); | ||
/* clean current work */ | ||
queue->work_current = RT_NULL; | ||
|
||
|
@@ -93,114 +115,68 @@ static void _workqueue_thread_entry(void *parameter) | |
static rt_err_t _workqueue_submit_work(struct rt_workqueue *queue, | ||
struct rt_work *work, rt_tick_t ticks) | ||
{ | ||
rt_base_t level; | ||
rt_err_t err = RT_EOK; | ||
rt_base_t level; | ||
rt_err_t err = RT_EOK; | ||
struct rt_work *work_tmp; | ||
rt_list_t *list_tmp; | ||
|
||
level = rt_spin_lock_irqsave(&(queue->spinlock)); | ||
|
||
/* remove list */ | ||
rt_list_remove(&(work->list)); | ||
work->flags &= ~RT_WORK_STATE_PENDING; | ||
work->flags = 0; | ||
|
||
if (ticks == 0) | ||
{ | ||
rt_list_insert_after(queue->work_list.prev, &(work->list)); | ||
work->flags |= RT_WORK_STATE_PENDING; | ||
work->workqueue = queue; | ||
work->flags |= RT_WORK_STATE_PENDING; | ||
work->workqueue = queue; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 我意思是不是在这里加个busy的返回判断就行了,因为我看到 err = queue->work_current != work ? RT_EOK : -RT_EBUSY; There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
/* whether the workqueue is doing work */ | ||
if (queue->work_current == RT_NULL) | ||
{ | ||
/* resume work thread, and do a re-schedule if succeed */ | ||
rt_thread_resume(queue->work_thread); | ||
} | ||
rt_completion_done(&(queue->wakeup_completion)); | ||
err = RT_EOK; | ||
} | ||
else if (ticks < RT_TICK_MAX / 2) | ||
{ | ||
/* Timer started */ | ||
if (work->flags & RT_WORK_STATE_SUBMITTING) | ||
{ | ||
rt_timer_control(&work->timer, RT_TIMER_CTRL_SET_TIME, &ticks); | ||
} | ||
else | ||
/* insert delay work list */ | ||
work->flags |= RT_WORK_STATE_SUBMITTING; | ||
work->workqueue = queue; | ||
work->timeout_tick = rt_tick_get() + ticks; | ||
|
||
list_tmp = &(queue->delayed_list); | ||
rt_list_for_each_entry(work_tmp, &(queue->delayed_list), list) | ||
{ | ||
rt_timer_init(&(work->timer), "work", _delayed_work_timeout_handler, | ||
work, ticks, RT_TIMER_FLAG_ONE_SHOT | RT_TIMER_FLAG_SOFT_TIMER); | ||
work->flags |= RT_WORK_STATE_SUBMITTING; | ||
if ((work_tmp->timeout_tick - work->timeout_tick) < RT_TICK_MAX / 2) | ||
{ | ||
list_tmp = &(work_tmp->list); | ||
break; | ||
} | ||
} | ||
work->workqueue = queue; | ||
/* insert delay work list */ | ||
rt_list_insert_after(queue->delayed_list.prev, &(work->list)); | ||
rt_list_insert_before(list_tmp, &(work->list)); | ||
|
||
err = rt_timer_start(&(work->timer)); | ||
rt_completion_done(&(queue->wakeup_completion)); | ||
err = RT_EOK; | ||
} | ||
else | ||
{ | ||
err = - RT_ERROR; | ||
err = -RT_ERROR; | ||
} | ||
|
||
rt_spin_unlock_irqrestore(&(queue->spinlock), level); | ||
return err; | ||
} | ||
|
||
static rt_err_t _workqueue_cancel_work(struct rt_workqueue *queue, struct rt_work *work) | ||
{ | ||
rt_base_t level; | ||
rt_err_t err; | ||
rt_err_t err; | ||
|
||
level = rt_spin_lock_irqsave(&(queue->spinlock)); | ||
rt_list_remove(&(work->list)); | ||
work->flags &= ~RT_WORK_STATE_PENDING; | ||
/* Timer started */ | ||
if (work->flags & RT_WORK_STATE_SUBMITTING) | ||
{ | ||
if ((err = rt_timer_stop(&(work->timer))) != RT_EOK) | ||
{ | ||
goto exit; | ||
} | ||
rt_timer_detach(&(work->timer)); | ||
work->flags &= ~RT_WORK_STATE_SUBMITTING; | ||
} | ||
err = queue->work_current != work ? RT_EOK : -RT_EBUSY; | ||
work->flags = 0; | ||
err = queue->work_current != work ? RT_EOK : -RT_EBUSY; | ||
work->workqueue = RT_NULL; | ||
exit: | ||
rt_spin_unlock_irqrestore(&(queue->spinlock), level); | ||
return err; | ||
} | ||
|
||
static void _delayed_work_timeout_handler(void *parameter) | ||
{ | ||
struct rt_work *work; | ||
struct rt_workqueue *queue; | ||
rt_base_t level; | ||
|
||
work = (struct rt_work *)parameter; | ||
queue = work->workqueue; | ||
|
||
RT_ASSERT(work->flags & RT_WORK_STATE_SUBMITTING); | ||
RT_ASSERT(queue != RT_NULL); | ||
|
||
level = rt_spin_lock_irqsave(&(queue->spinlock)); | ||
rt_timer_detach(&(work->timer)); | ||
work->flags &= ~RT_WORK_STATE_SUBMITTING; | ||
/* remove delay list */ | ||
rt_list_remove(&(work->list)); | ||
/* insert work queue */ | ||
if (queue->work_current != work) | ||
{ | ||
rt_list_insert_after(queue->work_list.prev, &(work->list)); | ||
work->flags |= RT_WORK_STATE_PENDING; | ||
} | ||
/* whether the workqueue is doing work */ | ||
if (queue->work_current == RT_NULL) | ||
{ | ||
/* resume work thread, and do a re-schedule if succeed */ | ||
rt_thread_resume(queue->work_thread); | ||
} | ||
|
||
rt_spin_unlock_irqrestore(&(queue->spinlock), level); | ||
} | ||
|
||
/** | ||
* @brief Initialize a work item, binding with a callback function. | ||
* | ||
|
@@ -221,8 +197,8 @@ void rt_work_init(struct rt_work *work, | |
work->work_func = work_func; | ||
work->work_data = work_data; | ||
work->workqueue = RT_NULL; | ||
work->flags = 0; | ||
work->type = 0; | ||
work->flags = 0; | ||
work->type = 0; | ||
} | ||
|
||
/** | ||
|
@@ -248,6 +224,7 @@ struct rt_workqueue *rt_workqueue_create(const char *name, rt_uint16_t stack_siz | |
rt_list_init(&(queue->delayed_list)); | ||
queue->work_current = RT_NULL; | ||
rt_sem_init(&(queue->sem), "wqueue", 0, RT_IPC_FLAG_FIFO); | ||
rt_completion_init(&(queue->wakeup_completion)); | ||
|
||
/* create the work thread */ | ||
queue->work_thread = rt_thread_create(name, _workqueue_thread_entry, queue, stack_size, priority, 10); | ||
|
@@ -292,7 +269,6 @@ rt_err_t rt_workqueue_destroy(struct rt_workqueue *queue) | |
* @param work is a pointer to the work item object. | ||
* | ||
* @return RT_EOK Success. | ||
* -RT_EBUSY This work item is executing. | ||
*/ | ||
rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work) | ||
{ | ||
|
@@ -314,7 +290,6 @@ rt_err_t rt_workqueue_dowork(struct rt_workqueue *queue, struct rt_work *work) | |
* NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1) | ||
* | ||
* @return RT_EOK Success. | ||
* -RT_EBUSY This work item is executing. | ||
* -RT_ERROR The ticks parameter is invalid. | ||
*/ | ||
rt_err_t rt_workqueue_submit_work(struct rt_workqueue *queue, struct rt_work *work, rt_tick_t ticks) | ||
|
@@ -346,14 +321,10 @@ rt_err_t rt_workqueue_urgent_work(struct rt_workqueue *queue, struct rt_work *wo | |
/* NOTE: the work MUST be initialized firstly */ | ||
rt_list_remove(&(work->list)); | ||
rt_list_insert_after(&queue->work_list, &(work->list)); | ||
/* whether the workqueue is doing work */ | ||
if (queue->work_current == RT_NULL) | ||
{ | ||
/* resume work thread, and do a re-schedule if succeed */ | ||
rt_thread_resume(queue->work_thread); | ||
} | ||
|
||
rt_completion_done(&(queue->wakeup_completion)); | ||
rt_spin_unlock_irqrestore(&(queue->spinlock), level); | ||
|
||
return RT_EOK; | ||
} | ||
|
||
|
@@ -448,7 +419,6 @@ static struct rt_workqueue *sys_workq; /* system work queue */ | |
* NOTE: The max timeout tick should be no more than (RT_TICK_MAX/2 - 1) | ||
* | ||
* @return RT_EOK Success. | ||
* -RT_EBUSY This work item is executing. | ||
* -RT_ERROR The ticks parameter is invalid. | ||
*/ | ||
rt_err_t rt_work_submit(struct rt_work *work, rt_tick_t ticks) | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我觉得这里缺少种情况的判断:比如创建一个工作队列,队列执行后再向其提交一次,正常来说第二次应该返回BUSY了
这块我看之前是有判断的,建议作者添加上:b065486#diff-b63048f1ad2089c43aeb589d9f396dfea3e95b253a0e9c7d13f785bc0613cb31L105
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
b065486#diff-b63048f1ad2089c43aeb589d9f396dfea3e95b253a0e9c7d13f785bc0613cb31L93-R354
1、看了下上面 b065486 ,这次提交不是去掉BUSY判断用的吗?和我的思路不冲突啊。
2、并且我认为去掉 busy是个明智之举。因为他可以在 自己的回调里面 再次触发自己,这个是很常用的(至少我很常用,我自己的项目里面大量的使用了回调函数里面再次触发同一个 work的逻辑)。
3、正在触发回调的work, b065486 和我的思路是一样的,都是运行再次添加。
4、实际上我为了处理这个问题,在线程回调的地方,把回调函数在临界区保存为临时变量了(work_func 和 work_data)。退出临界区之后实际上 work 结构体已经是安全的了。f0415bb?diff=split&w=0#diff-b63048f1ad2089c43aeb589d9f396dfea3e95b253a0e9c7d13f785bc0613cb31R96-R109
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我没描述清楚,假设这样的条件:工作队列线程回调还没执行完成再次发生了调度,然后在其他线程又向队列提交了一个任务,那么这个时候队列应该处于BUSY吧。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1、现在需要明确的是:在我这个PR之前,BUSY判断就已经去掉了。我只是沿用他的去掉,我为啥要给他加回去那?
2、你第一条回复的那个提交 b065486 就是去掉 BUSY判断的啊。理由是work正在执行,这时候发生中断,这个中断调用rt_workqueue_submit_work,导致work丢掉。你为啥要用一和你观点完全想法的提交 b065486 来证明你的观点那?我又确认了一遍 b065486 ,好像不是我理解错了把?
3、同时说一下 我认为 去掉BUSY合理的原因如下(个人观点):