Skip to content

Commit

Permalink
Merge pull request #16 from At-EC/private/riven/msgq_send_front
Browse files Browse the repository at this point in the history
The message queue feature should to add a interface to send the messag…
  • Loading branch information
At-EC authored Mar 9, 2024
2 parents f732132 + 849afef commit 0b07627
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 42 deletions.
6 changes: 3 additions & 3 deletions build_version.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
extern "C" {
#endif

#define ATOS_BUILD_TIME "2024-03-03,21:12"
#define ATOS_COMMIT_HEAD_ID "885c527a8d65f05fc34321ff3e3e1e63a4e12f85"
#define ATOS_BUILD_TIME "2024-03-09,20:56"
#define ATOS_COMMIT_HEAD_ID "f73213240817ea6f3dd241c53ebcd1fc2fc7db3c"
#define ATOS_VERSION_MAJOR_NUMBER (1u)
#define ATOS_VERSION_MINOR_NUMBER (2u)
#define ATOS_VERSION_PATCH_NUMBER (3u)
#define ATOS_VERSION_PATCH_NUMBER (4u)

#define ATOS_VERSION_MAJOR_NUMBER_MASK (0x03FFu)
#define ATOS_VERSION_MAJOR_NUMBER_POS (22u)
Expand Down
15 changes: 8 additions & 7 deletions include/kernal/at_rtos.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,21 +572,22 @@ static inline os_msgq_id_t msgq_init(const void *pQueueBufferAddr, u16_t element
* @param id The queue unique id.
* @param pUserBuffer The pointer of the message buffer address.
* @param bufferSize The queue buffer size.
* @param isToFront The direction of the message operation.
* @param timeout_ms The queue send timeout option.
*
* @return The result of the operation.
**
* demo usage:
*
* u8_t txdata = 0u;
* u32p_t postcode = msgq_send(sample_id, &txdata, 0x01u, OS_WAIT_FOREVER);
* u32p_t postcode = msgq_send(sample_id, &txdata, 0x01u, FALSE, OS_WAIT_FOREVER);
* if (PC_IOK(postcode)) {
* printf("Message queue send successful\n");
* } else {
* printf("Message queue send error: 0x%x\n", postcode);
* }
*
* postcode = msgq_send(sample_id, &txdata, 0x01u, 1000u);
* postcode = msgq_send(sample_id, &txdata, 0x01u, TRUE, 1000u);
* if (PC_IOK(postcode)) {
* if (postcode == PC_SC_TIMEOUT) {
* printf("Message queue send timeout\n");
Expand All @@ -597,9 +598,9 @@ static inline os_msgq_id_t msgq_init(const void *pQueueBufferAddr, u16_t element
* printf("Message queue send error: 0x%x\n", postcode);
* }
*/
static inline u32p_t msgq_send(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms)
static inline u32p_t msgq_put(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b_t isToFront, u32_t timeout_ms)
{
return (u32p_t)_impl_queue_send(id.val, pUserBuffer, bufferSize, timeout_ms);
return (u32p_t)_impl_queue_send(id.val, pUserBuffer, bufferSize, isToFront, timeout_ms);
}

/**
Expand Down Expand Up @@ -633,7 +634,7 @@ static inline u32p_t msgq_send(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t b
* printf("Message queue receive error: 0x%x\n", postcode);
* }
*/
static inline u32p_t msgq_receive(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms)
static inline u32p_t msgq_get(os_msgq_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms)
{
return (u32p_t)_impl_queue_receive(id.val, pUserBuffer, bufferSize, timeout_ms);
}
Expand Down Expand Up @@ -739,8 +740,8 @@ typedef struct {
u32p_t (*evt_wait)(os_evt_id_t, u32_t *, u32_t, u32_t, u32_t);

os_msgq_id_t (*msgq_init)(const void *, u16_t, u16_t, const char_t *);
u32p_t (*msgq_send)(os_msgq_id_t, const u8_t *, u16_t, u32_t);
u32p_t (*msgq_receive)(os_msgq_id_t, const u8_t *, u16_t, u32_t);
u32p_t (*msgq_put)(os_msgq_id_t, const u8_t *, u16_t, b_t, u32_t);
u32p_t (*msgq_get)(os_msgq_id_t, const u8_t *, u16_t, u32_t);

b_t (*id_isInvalid)(struct os_id);
u32p_t (*at_rtos_run)(void);
Expand Down
2 changes: 2 additions & 0 deletions include/kernal/kstruct.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ typedef struct {
typedef struct {
const u8_t *pUserBufferAddress;
u16_t userBufferSize;
b_t toFront;
b_t fromBack;
} action_queue_t;

typedef struct {
Expand Down
2 changes: 1 addition & 1 deletion include/kernal/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extern "C" {
*/
u32_t _impl_queue_os_id_to_number(os_id_t id);
os_id_t _impl_queue_init(const void *pQueueBufferAddr, u16_t elementLen, u16_t elementNum, const char_t *pName);
u32p_t _impl_queue_send(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms);
u32p_t _impl_queue_send(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b_t isToFront, u32_t timeout_ms);
u32p_t _impl_queue_receive(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms);

#ifdef __cplusplus
Expand Down
7 changes: 5 additions & 2 deletions kernal/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ static void _event_callback_fromTimeOut(os_id_t id)
*/
u32_t _impl_event_os_id_to_number(os_id_t id)
{
return (u32_t)(_event_id_isInvalid(id) ? (0u)
: (id - _impl_kernal_member_id_toUnifiedIdStart(KERNAL_MEMBER_EVENT)) / sizeof(event_context_t));
if (_event_id_isInvalid(id)) {
return 0u;
}

return (u32_t)((id - _impl_kernal_member_id_toUnifiedIdStart(KERNAL_MEMBER_EVENT)) / sizeof(event_context_t));
}

/**
Expand Down
4 changes: 2 additions & 2 deletions kernal/kthread.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ const at_rtos_api_t AtOS = {
.evt_wait = evt_wait,

.msgq_init = msgq_init,
.msgq_send = msgq_send,
.msgq_receive = msgq_receive,
.msgq_put = msgq_put,
.msgq_get = msgq_get,

.id_isInvalid = os_id_is_invalid,
.at_rtos_run = kernal_atos_run,
Expand Down
83 changes: 59 additions & 24 deletions kernal/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,29 @@ static void _message_send(queue_context_t *pCurQueue, const u8_t *pUserBuffer, u
pCurQueue->cacheSize++;
}

/**
* @brief Send a queue message to front.
*
* @param pCurQueue The current queue context.
* @param pUserBuffer The pointer of user's message buffer.
* @param userSize The size of user's message buffer.
*/
static void _message_send_front(queue_context_t *pCurQueue, const u8_t *pUserBuffer, u16_t userSize)
{
u8_t *pInBuffer = NULL;

if (pCurQueue->rightPosition) {
pCurQueue->rightPosition--;
} else {
pCurQueue->rightPosition = pCurQueue->elementNumber - 1;
}
pCurQueue->cacheSize++;

pInBuffer = (u8_t *)((u32_t)((pCurQueue->rightPosition * pCurQueue->elementLength) + (u32_t)pCurQueue->pQueueBufferAddress));
_memset((char_t *)pInBuffer, 0x0u, pCurQueue->elementLength);
_memcpy((char_t *)pInBuffer, (const char_t *)pUserBuffer, userSize);
}

/**
* @brief Receive a queue message.
*
Expand Down Expand Up @@ -225,11 +248,12 @@ static void _message_receive(queue_context_t *pCurQueue, const u8_t *pUserBuffer
* @param id The queue unique id.
* @param pUserBuffer The pointer of the message buffer address.
* @param bufferSize The queue buffer size.
* @param isToFront The direction of the message operation.
* @param timeout_ms The queue send timeout option.
*
* @return The result of the operation.
*/
u32p_t _impl_queue_send(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u32_t timeout_ms)
u32p_t _impl_queue_send(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, b_t isToFront, u32_t timeout_ms)
{
if (_queue_id_isInvalid(id)) {
return _PC_CMPT_FAILED;
Expand All @@ -246,10 +270,8 @@ u32p_t _impl_queue_send(os_id_t id, const u8_t *pUserBuffer, u16_t bufferSize, u
}

arguments_t arguments[] = {
[0] = {.u32_val = (u32_t)id},
[1] = {.ptr_val = (const void *)pUserBuffer},
[2] = {.u16_val = (u16_t)bufferSize},
[3] = {.u32_val = (u32_t)timeout_ms},
[0] = {.u32_val = (u32_t)id}, [1] = {.ptr_val = (const void *)pUserBuffer}, [2] = {.u16_val = (u16_t)bufferSize},
[3] = {.b_val = (b_t)isToFront}, [4] = {.u32_val = (u32_t)timeout_ms},
};

u32p_t postcode = _impl_kernal_privilege_invoke((const void *)_queue_send_privilege_routine, arguments);
Expand Down Expand Up @@ -384,7 +406,8 @@ static u32_t _queue_send_privilege_routine(arguments_t *pArgs)
os_id_t id = (os_id_t)pArgs[0].u32_val;
const u8_t *pUserBuffer = (const void *)pArgs[1].ptr_val;
u16_t bufferSize = (u16_t)pArgs[2].u16_val;
u32_t timeout_ms = (u32_t)pArgs[3].u32_val;
b_t isFront = (b_t)pArgs[3].b_val;
u32_t timeout_ms = (u32_t)pArgs[4].u32_val;
queue_context_t *pCurQueue = NULL;
thread_context_t *pCurThread = NULL;
u32p_t postcode = PC_SC_SUCCESS;
Expand All @@ -400,19 +423,26 @@ static u32_t _queue_send_privilege_routine(arguments_t *pArgs)
if (timeout_ms == OS_TIME_NOWAIT_VAL) {
EXIT_CRITICAL_SECTION();
return _PC_CMPT_FAILED;
} else {
pCurThread->queue.pUserBufferAddress = pUserBuffer;
pCurThread->queue.userBufferSize = bufferSize;
}

postcode = _impl_kernal_thread_exit_trigger(pCurThread->head.id, id, _queue_list_inBlockingHeadGet(id), timeout_ms,
_queue_callback_fromTimeOut);
_memset((char *)&pCurThread->queue, 0x0u, sizeof(action_queue_t));

if (PC_IOK(postcode)) {
postcode = PC_SC_UNAVAILABLE;
}
pCurThread->queue.pUserBufferAddress = pUserBuffer;
pCurThread->queue.userBufferSize = bufferSize;
pCurThread->queue.toFront = isFront;

postcode = _impl_kernal_thread_exit_trigger(pCurThread->head.id, id, _queue_list_inBlockingHeadGet(id), timeout_ms,
_queue_callback_fromTimeOut);

if (PC_IOK(postcode)) {
postcode = PC_SC_UNAVAILABLE;
}
} else {
_message_send(pCurQueue, pUserBuffer, bufferSize);
if (pCurThread->queue.toFront) {
_message_send_front(pCurQueue, pUserBuffer, bufferSize);
} else {
_message_send(pCurQueue, pUserBuffer, bufferSize);
}

/* Try to wakeup a blocking thread */
list_iterator_t it = {0u};
Expand Down Expand Up @@ -457,16 +487,16 @@ static u32_t _queue_receive_privilege_routine(arguments_t *pArgs)
if (timeout_ms == OS_TIME_NOWAIT_VAL) {
EXIT_CRITICAL_SECTION();
return _PC_CMPT_FAILED;
} else {
pCurThread->queue.pUserBufferAddress = pUserBuffer;
pCurThread->queue.userBufferSize = bufferSize;
}

postcode = _impl_kernal_thread_exit_trigger(pCurThread->head.id, id, _queue_list_OutBlockingHeadGet(id), timeout_ms,
_queue_callback_fromTimeOut);
pCurThread->queue.pUserBufferAddress = pUserBuffer;
pCurThread->queue.userBufferSize = bufferSize;

if (PC_IOK(postcode)) {
postcode = PC_SC_UNAVAILABLE;
}
postcode = _impl_kernal_thread_exit_trigger(pCurThread->head.id, id, _queue_list_OutBlockingHeadGet(id), timeout_ms,
_queue_callback_fromTimeOut);

if (PC_IOK(postcode)) {
postcode = PC_SC_UNAVAILABLE;
}
} else {
_message_receive(pCurQueue, pUserBuffer, bufferSize);
Expand Down Expand Up @@ -542,7 +572,12 @@ static void _queue_schedule(os_id_t id)
if (isRxAvail) {
_message_receive((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress, pEntryThread->queue.userBufferSize);
} else if (isTxAvail) {
_message_send((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress, pEntryThread->queue.userBufferSize);
if (pEntryThread->queue.toFront) {
_message_send_front((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress,
pEntryThread->queue.userBufferSize);
} else {
_message_send((queue_context_t *)pCurQueue, pEntryThread->queue.pUserBufferAddress, pEntryThread->queue.userBufferSize);
}
}
pEntry->result = PC_SC_SUCCESS;
}
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "At-RTOS",
"homepage": "https://github.com/At-EC/At-RTOS",
"version": "1.2.3",
"timestamp": "2024-03-03,21:12",
"commit_id": "2430d2a6e685f2ec221717cd217e669cc9b45bdc"
"version": "1.2.4",
"timestamp": "2024-03-09,20:56",
"commit_id": "885c527a8d65f05fc34321ff3e3e1e63a4e12f85"
}

0 comments on commit 0b07627

Please sign in to comment.