Skip to content

Commit

Permalink
The message queue feature should to add a interface to receive the me…
Browse files Browse the repository at this point in the history
…ssage from back of the list.
  • Loading branch information
At-EC committed Mar 13, 2024
1 parent cd5b874 commit 731a6f2
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 20 deletions.
6 changes: 3 additions & 3 deletions build_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
extern "C" {
#endif

#define ATOS_BUILD_TIME "2024-03-13,19:50"
#define ATOS_COMMIT_HEAD_ID "0b07627541ff9e203edfad7b9cefbc8eb33a7bdd"
#define ATOS_BUILD_TIME "2024-03-13,21:07"
#define ATOS_COMMIT_HEAD_ID "cd5b8742d18ccd1df77056635438b1b1bdf857b8"
#define ATOS_VERSION_MAJOR_NUMBER (1u)
#define ATOS_VERSION_MINOR_NUMBER (2u)
#define ATOS_VERSION_PATCH_NUMBER (5u)
#define ATOS_VERSION_PATCH_NUMBER (6u)

#define ATOS_VERSION_MAJOR_NUMBER_MASK (0x03FFu)
#define ATOS_VERSION_MAJOR_NUMBER_POS (22u)
Expand Down
11 changes: 6 additions & 5 deletions include/kernal/at_rtos.h
Original file line number Diff line number Diff line change
Expand Up @@ -609,21 +609,22 @@ static inline u32p_t msgq_put(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bu
* @param id The queue unique id.
* @param pUserBuffer The pointer of the message buffer address.
* @param bufferSize The queue buffer size.
* @param isFromBack The direction of the message operation.
* @param timeout_ms The queue send timeout option.
*
* @return The result of the operation.
**
* demo usage:
*
* u8_t rxdata = 0u;
* u32p_t postcode = msgq_receive(sample_id, &rxdata, 0x01u, OS_WAIT_FOREVER);
* u32p_t postcode = msgq_receive(sample_id, &rxdata, 0x01u, TRUE, OS_WAIT_FOREVER);
* if (PC_IOK(postcode)) {
* printf("Message queue receive successful, the rx data is 0x%x\n", rxdata);
* } else {
* printf("Message queue receive error: 0x%x\n", postcode);
* }
*
* postcode = msgq_receive(sample_id, &rxdata, 0x01u, 1000u);
* postcode = msgq_receive(sample_id, &rxdata, 0x01u, FALSE, 1000u);
* if (PC_IOK(postcode)) {
* if (postcode == PC_SC_TIMEOUT) {
* printf("Message queue receive timeout\n");
Expand All @@ -634,9 +635,9 @@ static inline u32p_t msgq_put(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bu
* printf("Message queue receive error: 0x%x\n", postcode);
* }
*/
static inline u32p_t msgq_get(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms)
static inline u32p_t msgq_get(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b_t isFromBack, u32_t timeout_ms)
{
return (u32p_t)_impl_queue_receive(id.val, pUserBuffer, bufferSize, timeout_ms);
return (u32p_t)_impl_queue_receive(id.val, pUserBuffer, bufferSize, isFromBack, timeout_ms);
}

/**
Expand Down Expand Up @@ -741,7 +742,7 @@ typedef struct {

os_msgq_id_t (*msgq_init)(const void *, u16_t, u16_t, const char_t *);
u32p_t (*msgq_put)(os_msgq_id_t, const u8_t *, u16_t, b_t, u32_t);
u32p_t (*msgq_get)(os_msgq_id_t, const u8_t *, u16_t, u32_t);
u32p_t (*msgq_get)(os_msgq_id_t, const u8_t *, u16_t, b_t, u32_t);

b_t (*id_isInvalid)(struct os_id);
u32p_t (*at_rtos_run)(void);
Expand Down
2 changes: 1 addition & 1 deletion include/kernal/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extern "C" {
u32_t _impl_queue_os_id_to_number(os_id_t id);
os_id_t _impl_queue_init(const void *pQueueBufferAddr, u16_t elementLen, u16_t elementNum, const char_t *pName);
u32p_t _impl_queue_send(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b_t isToFront, u32_t timeout_ms);
u32p_t _impl_queue_receive(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms);
u32p_t _impl_queue_receive(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b_t isFromBack, u32_t timeout_ms);

#ifdef __cplusplus
}
Expand Down
50 changes: 42 additions & 8 deletions kernal/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,29 @@ static void _message_receive(queue_context_t *pCurQueue, const u8_t *pUserBuffer
pCurQueue->cacheSize--;
}

/**
* @brief Receive a queue message from back.
*
* @param pCurQueue The current queue context.
* @param pUserBuffer The pointer of user's message buffer.
* @param userSize The size of user's message buffer.
*/
static void _message_receive_behind(queue_context_t *pCurQueue, const u8_t *pUserBuffer, u16_t userSize)
{
u8_t *pInBuffer = NULL;

if (pCurQueue->leftPosition) {
pCurQueue->leftPosition--;
} else {
pCurQueue->leftPosition = pCurQueue->elementNumber - 1;
}
pCurQueue->cacheSize--;

pInBuffer = (u8_t *)((u32_t)((pCurQueue->leftPosition * pCurQueue->elementLength) + (u32_t)pCurQueue->pQueueBufferAddress));
_memset((char_t *)pInBuffer, 0x0u, pCurQueue->elementLength);
_memcpy((char_t *)pInBuffer, (const char_t *)pUserBuffer, userSize);
}

/**
* @brief Send a queue message.
*
Expand Down Expand Up @@ -296,11 +319,12 @@ u32p_t _impl_queue_send(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b
* @param id The queue unique id.
* @param pUserBuffer The pointer of the message buffer address.
* @param bufferSize The queue buffer size.
* @param isFromBack The direction of the message operation.
* @param timeout_ms The queue send timeout option.
*
* @return The result of the operation.
*/
u32p_t _impl_queue_receive(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms)
u32p_t _impl_queue_receive(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b_t isFromBack, u32_t timeout_ms)
{
if (_queue_id_isInvalid(id)) {
return _PC_CMPT_FAILED;
Expand All @@ -317,10 +341,8 @@ u32p_t _impl_queue_receive(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize
}

arguments_t arguments[] = {
[0] = {.u32_val = (u32_t)id},
[1] = {.ptr_val = (const void *)pUserBuffer},
[2] = {.u16_val = (u16_t)bufferSize},
[3] = {.u32_val = (u32_t)timeout_ms},
[0] = {.u32_val = (u32_t)id}, [1] = {.ptr_val = (const void *)pUserBuffer}, [2] = {.u16_val = (u16_t)bufferSize},
[3] = {.b_val = (b_t)isFromBack}, [4] = {.u32_val = (u32_t)timeout_ms},
};

u32p_t postcode = _impl_kernal_privilege_invoke((const void *)_queue_receive_privilege_routine, arguments);
Expand Down Expand Up @@ -471,7 +493,8 @@ static u32_t _queue_receive_privilege_routine(arguments_t *pArgs)
os_id_t id = (os_id_t)pArgs[0].u32_val;
const u8_t *pUserBuffer = (const u8_t *)pArgs[1].ptr_val;
u16_t bufferSize = (u16_t)pArgs[2].u16_val;
u32_t timeout_ms = (u32_t)pArgs[3].u32_val;
b_t isBack = (b_t)pArgs[3].b_val;
u32_t timeout_ms = (u32_t)pArgs[4].u32_val;
queue_context_t *pCurQueue = NULL;
thread_context_t *pCurThread = NULL;
u32p_t postcode = PC_SC_SUCCESS;
Expand All @@ -489,8 +512,10 @@ static u32_t _queue_receive_privilege_routine(arguments_t *pArgs)
return _PC_CMPT_FAILED;
}

_memset((char *)&pCurThread->queue, 0x0u, sizeof(action_queue_t));
pCurThread->queue.pUserBufferAddress = pUserBuffer;
pCurThread->queue.userBufferSize = bufferSize;
pCurThread->queue.fromBack = isBack;

postcode = _impl_kernal_thread_exit_trigger(pCurThread->head.id, id, _queue_list_OutBlockingHeadGet(id), timeout_ms,
_queue_callback_fromTimeOut);
Expand All @@ -499,7 +524,11 @@ static u32_t _queue_receive_privilege_routine(arguments_t *pArgs)
postcode = PC_SC_UNAVAILABLE;
}
} else {
_message_receive(pCurQueue, pUserBuffer, bufferSize);
if (pCurThread->queue.fromBack) {
_message_receive_behind(pCurQueue, pUserBuffer, bufferSize);
} else {
_message_receive(pCurQueue, pUserBuffer, bufferSize);
}

/* Try to wakeup a blocking thread */
list_iterator_t it = {0u};
Expand Down Expand Up @@ -570,7 +599,12 @@ static void _queue_schedule(os_id_t id)

if ((isRxAvail) || (isTxAvail)) {
if (isRxAvail) {
_message_receive((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress, pEntryThread->queue.userBufferSize);
if (pEntryThread->queue.fromBack) {
_message_receive_behind((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress,
pEntryThread->queue.userBufferSize);
} else {
_message_receive((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress, pEntryThread->queue.userBufferSize);
}
} else if (isTxAvail) {
if (pEntryThread->queue.toFront) {
_message_send_front((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress,
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "At-RTOS",
"homepage": "https://github.com/At-EC/At-RTOS",
"version": "1.2.5",
"timestamp": "2024-03-13,19:50",
"commit_id": "f73213240817ea6f3dd241c53ebcd1fc2fc7db3c"
"version": "1.2.6",
"timestamp": "2024-03-13,21:07",
"commit_id": "0b07627541ff9e203edfad7b9cefbc8eb33a7bdd"
}

0 comments on commit 731a6f2

Please sign in to comment.