[PATCH 24/45] score: Fine grained locking for message queues
Sebastian Huber
sebastian.huber at embedded-brains.de
Fri May 15 11:41:24 UTC 2015
Aggregate several critical sections into a bigger one. Sending and
receiving messages are now protected by an ISR lock. Thread dispatching
is only disabled in case a blocking operation is necessary. The message
copy procedure is done inside the critical section (interrupts
disabled). Thus this change may have a negative impact on the interrupt
latency in case very large messages are transferred.
Update #2273.
---
cpukit/posix/include/rtems/posix/mqueueimpl.h | 15 ++++
cpukit/posix/src/mqueuerecvsupp.c | 15 ++--
cpukit/posix/src/mqueuesendsupp.c | 14 +--
cpukit/rtems/include/rtems/rtems/messageimpl.h | 15 ++++
cpukit/rtems/src/msgqbroadcast.c | 12 ++-
cpukit/rtems/src/msgqflush.c | 13 ++-
cpukit/rtems/src/msgqreceive.c | 11 ++-
cpukit/rtems/src/msgqsend.c | 12 ++-
cpukit/rtems/src/msgqurgent.c | 11 ++-
cpukit/score/include/rtems/score/coremsgimpl.h | 120 ++++++++++++++++++++++---
cpukit/score/src/coremsgbroadcast.c | 62 ++++++-------
cpukit/score/src/coremsgclose.c | 8 +-
cpukit/score/src/coremsgflush.c | 9 +-
cpukit/score/src/coremsginsert.c | 6 --
cpukit/score/src/coremsgseize.c | 30 +++++--
cpukit/score/src/coremsgsubmit.c | 84 +++++++++--------
doc/user/msg.t | 5 +-
17 files changed, 305 insertions(+), 137 deletions(-)
diff --git a/cpukit/posix/include/rtems/posix/mqueueimpl.h b/cpukit/posix/include/rtems/posix/mqueueimpl.h
index bfc850d..90269bf 100644
--- a/cpukit/posix/include/rtems/posix/mqueueimpl.h
+++ b/cpukit/posix/include/rtems/posix/mqueueimpl.h
@@ -250,6 +250,21 @@ RTEMS_INLINE_ROUTINE POSIX_Message_queue_Control_fd *_POSIX_Message_queue_Get_fd
location
);
}
+
+RTEMS_INLINE_ROUTINE POSIX_Message_queue_Control_fd *
+_POSIX_Message_queue_Get_fd_interrupt_disable(
+ mqd_t id,
+ Objects_Locations *location,
+ ISR_lock_Context *lock_context
+)
+{
+ return (POSIX_Message_queue_Control_fd *) _Objects_Get_isr_disable(
+ &_POSIX_Message_queue_Information_fds,
+ (Objects_Id)id,
+ location,
+ lock_context
+ );
+}
/**
* @see _POSIX_Name_to_id().
diff --git a/cpukit/posix/src/mqueuerecvsupp.c b/cpukit/posix/src/mqueuerecvsupp.c
index bea19fe..2f9bb2d 100644
--- a/cpukit/posix/src/mqueuerecvsupp.c
+++ b/cpukit/posix/src/mqueuerecvsupp.c
@@ -54,20 +54,25 @@ ssize_t _POSIX_Message_queue_Receive_support(
size_t length_out;
bool do_wait;
Thread_Control *executing;
+ ISR_lock_Context lock_context;
- the_mq_fd = _POSIX_Message_queue_Get_fd( mqdes, &location );
+ the_mq_fd = _POSIX_Message_queue_Get_fd_interrupt_disable(
+ mqdes,
+ &location,
+ &lock_context
+ );
switch ( location ) {
case OBJECTS_LOCAL:
if ( (the_mq_fd->oflag & O_ACCMODE) == O_WRONLY ) {
- _Objects_Put( &the_mq_fd->Object );
+ _ISR_lock_ISR_enable( &lock_context );
rtems_set_errno_and_return_minus_one( EBADF );
}
the_mq = the_mq_fd->Queue;
if ( msg_len < the_mq->Message_queue.maximum_message_size ) {
- _Objects_Put( &the_mq_fd->Object );
+ _ISR_lock_ISR_enable( &lock_context );
rtems_set_errno_and_return_minus_one( EMSGSIZE );
}
@@ -97,10 +102,10 @@ ssize_t _POSIX_Message_queue_Receive_support(
msg_ptr,
&length_out,
do_wait,
- timeout
+ timeout,
+ &lock_context
);
- _Objects_Put( &the_mq_fd->Object );
if (msg_prio) {
*msg_prio = _POSIX_Message_queue_Priority_from_core(
executing->Wait.count
diff --git a/cpukit/posix/src/mqueuesendsupp.c b/cpukit/posix/src/mqueuesendsupp.c
index 2d6ddae..d73538a 100644
--- a/cpukit/posix/src/mqueuesendsupp.c
+++ b/cpukit/posix/src/mqueuesendsupp.c
@@ -64,6 +64,7 @@ int _POSIX_Message_queue_Send_support(
CORE_message_queue_Status msg_status;
bool do_wait;
Thread_Control *executing;
+ ISR_lock_Context lock_context;
/*
* Validate the priority.
@@ -73,12 +74,16 @@ int _POSIX_Message_queue_Send_support(
if ( msg_prio > MQ_PRIO_MAX )
rtems_set_errno_and_return_minus_one( EINVAL );
- the_mq_fd = _POSIX_Message_queue_Get_fd( mqdes, &location );
+ the_mq_fd = _POSIX_Message_queue_Get_fd_interrupt_disable(
+ mqdes,
+ &location,
+ &lock_context
+ );
switch ( location ) {
case OBJECTS_LOCAL:
if ( (the_mq_fd->oflag & O_ACCMODE) == O_RDONLY ) {
- _Objects_Put( &the_mq_fd->Object );
+ _ISR_lock_ISR_enable( &lock_context );
rtems_set_errno_and_return_minus_one( EBADF );
}
@@ -105,11 +110,10 @@ int _POSIX_Message_queue_Send_support(
NULL,
_POSIX_Message_queue_Priority_to_core( msg_prio ),
do_wait,
- timeout /* no timeout */
+ timeout, /* no timeout */
+ &lock_context
);
- _Objects_Put( &the_mq_fd->Object );
-
/*
* If we had to block, then this is where the task returns
* after it wakes up. The returned status is correct for
diff --git a/cpukit/rtems/include/rtems/rtems/messageimpl.h b/cpukit/rtems/include/rtems/rtems/messageimpl.h
index fa9e573..2399d65 100644
--- a/cpukit/rtems/include/rtems/rtems/messageimpl.h
+++ b/cpukit/rtems/include/rtems/rtems/messageimpl.h
@@ -139,6 +139,21 @@ RTEMS_INLINE_ROUTINE Message_queue_Control *_Message_queue_Get (
_Objects_Get( &_Message_queue_Information, id, location );
}
+RTEMS_INLINE_ROUTINE Message_queue_Control *
+_Message_queue_Get_interrupt_disable(
+ Objects_Id id,
+ Objects_Locations *location,
+ ISR_lock_Context *lock_context
+)
+{
+ return (Message_queue_Control *) _Objects_Get_isr_disable(
+ &_Message_queue_Information,
+ id,
+ location,
+ lock_context
+ );
+}
+
RTEMS_INLINE_ROUTINE Message_queue_Control *_Message_queue_Allocate( void )
{
return (Message_queue_Control *)
diff --git a/cpukit/rtems/src/msgqbroadcast.c b/cpukit/rtems/src/msgqbroadcast.c
index 64ea80e..aabbf3f 100644
--- a/cpukit/rtems/src/msgqbroadcast.c
+++ b/cpukit/rtems/src/msgqbroadcast.c
@@ -40,6 +40,7 @@ rtems_status_code rtems_message_queue_broadcast(
Message_queue_Control *the_message_queue;
Objects_Locations location;
CORE_message_queue_Status core_status;
+ ISR_lock_Context lock_context;
if ( !buffer )
return RTEMS_INVALID_ADDRESS;
@@ -47,7 +48,11 @@ rtems_status_code rtems_message_queue_broadcast(
if ( !count )
return RTEMS_INVALID_ADDRESS;
- the_message_queue = _Message_queue_Get( id, &location );
+ the_message_queue = _Message_queue_Get_interrupt_disable(
+ id,
+ &location,
+ &lock_context
+ );
switch ( location ) {
case OBJECTS_LOCAL:
@@ -61,10 +66,9 @@ rtems_status_code rtems_message_queue_broadcast(
#else
NULL,
#endif
- count
+ count,
+ &lock_context
);
-
- _Objects_Put( &the_message_queue->Object );
return
_Message_queue_Translate_core_message_queue_return_code( core_status );
diff --git a/cpukit/rtems/src/msgqflush.c b/cpukit/rtems/src/msgqflush.c
index 7ae7ef4..809c243 100644
--- a/cpukit/rtems/src/msgqflush.c
+++ b/cpukit/rtems/src/msgqflush.c
@@ -54,16 +54,23 @@ rtems_status_code rtems_message_queue_flush(
{
Message_queue_Control *the_message_queue;
Objects_Locations location;
+ ISR_lock_Context lock_context;
if ( !count )
return RTEMS_INVALID_ADDRESS;
- the_message_queue = _Message_queue_Get( id, &location );
+ the_message_queue = _Message_queue_Get_interrupt_disable(
+ id,
+ &location,
+ &lock_context
+ );
switch ( location ) {
case OBJECTS_LOCAL:
- *count = _CORE_message_queue_Flush( &the_message_queue->message_queue );
- _Objects_Put( &the_message_queue->Object );
+ *count = _CORE_message_queue_Flush(
+ &the_message_queue->message_queue,
+ &lock_context
+ );
return RTEMS_SUCCESSFUL;
#if defined(RTEMS_MULTIPROCESSING)
diff --git a/cpukit/rtems/src/msgqreceive.c b/cpukit/rtems/src/msgqreceive.c
index db09cfe..2b9a4e7 100644
--- a/cpukit/rtems/src/msgqreceive.c
+++ b/cpukit/rtems/src/msgqreceive.c
@@ -42,6 +42,7 @@ rtems_status_code rtems_message_queue_receive(
Objects_Locations location;
bool wait;
Thread_Control *executing;
+ ISR_lock_Context lock_context;
if ( !buffer )
return RTEMS_INVALID_ADDRESS;
@@ -49,7 +50,11 @@ rtems_status_code rtems_message_queue_receive(
if ( !size )
return RTEMS_INVALID_ADDRESS;
- the_message_queue = _Message_queue_Get( id, &location );
+ the_message_queue = _Message_queue_Get_interrupt_disable(
+ id,
+ &location,
+ &lock_context
+ );
switch ( location ) {
case OBJECTS_LOCAL:
@@ -66,9 +71,9 @@ rtems_status_code rtems_message_queue_receive(
buffer,
size,
wait,
- timeout
+ timeout,
+ &lock_context
);
- _Objects_Put( &the_message_queue->Object );
return _Message_queue_Translate_core_message_queue_return_code(
executing->Wait.return_code
);
diff --git a/cpukit/rtems/src/msgqsend.c b/cpukit/rtems/src/msgqsend.c
index 34b7c29..fb3979e 100644
--- a/cpukit/rtems/src/msgqsend.c
+++ b/cpukit/rtems/src/msgqsend.c
@@ -62,11 +62,16 @@ rtems_status_code rtems_message_queue_send(
Message_queue_Control *the_message_queue;
Objects_Locations location;
CORE_message_queue_Status status;
+ ISR_lock_Context lock_context;
if ( !buffer )
return RTEMS_INVALID_ADDRESS;
- the_message_queue = _Message_queue_Get( id, &location );
+ the_message_queue = _Message_queue_Get_interrupt_disable(
+ id,
+ &location,
+ &lock_context
+ );
switch ( location ) {
case OBJECTS_LOCAL:
@@ -77,11 +82,10 @@ rtems_status_code rtems_message_queue_send(
id,
MESSAGE_QUEUE_MP_HANDLER,
false, /* sender does not block */
- 0 /* no timeout */
+ 0, /* no timeout */
+ &lock_context
);
- _Objects_Put( &the_message_queue->Object );
-
/*
* Since this API does not allow for blocking sends, we can directly
* return the returned status.
diff --git a/cpukit/rtems/src/msgqurgent.c b/cpukit/rtems/src/msgqurgent.c
index 85a9d4f..e6ae5ef 100644
--- a/cpukit/rtems/src/msgqurgent.c
+++ b/cpukit/rtems/src/msgqurgent.c
@@ -45,11 +45,16 @@ rtems_status_code rtems_message_queue_urgent(
Message_queue_Control *the_message_queue;
Objects_Locations location;
CORE_message_queue_Status status;
+ ISR_lock_Context lock_context;
if ( !buffer )
return RTEMS_INVALID_ADDRESS;
- the_message_queue = _Message_queue_Get( id, &location );
+ the_message_queue = _Message_queue_Get_interrupt_disable(
+ id,
+ &location,
+ &lock_context
+ );
switch ( location ) {
case OBJECTS_LOCAL:
@@ -60,9 +65,9 @@ rtems_status_code rtems_message_queue_urgent(
id,
MESSAGE_QUEUE_MP_HANDLER,
false, /* sender does not block */
- 0 /* no timeout */
+ 0, /* no timeout */
+ &lock_context
);
- _Objects_Put( &the_message_queue->Object );
/*
* Since this API does not allow for blocking sends, we can directly
diff --git a/cpukit/score/include/rtems/score/coremsgimpl.h b/cpukit/score/include/rtems/score/coremsgimpl.h
index 1f67969..ab9ea71 100644
--- a/cpukit/score/include/rtems/score/coremsgimpl.h
+++ b/cpukit/score/include/rtems/score/coremsgimpl.h
@@ -172,11 +172,13 @@ void _CORE_message_queue_Close(
* number of messages flushed from the queue is returned.
*
* @param[in] the_message_queue points to the message queue to flush
+ * @param[in] lock_context The lock context of the interrupt disable.
*
* @retval This method returns the number of message pending messages flushed.
*/
uint32_t _CORE_message_queue_Flush(
- CORE_message_queue_Control *the_message_queue
+ CORE_message_queue_Control *the_message_queue,
+ ISR_lock_Context *lock_context
);
#if defined(FUNCTIONALITY_NOT_CURRENTLY_USED_BY_ANY_API)
@@ -215,6 +217,7 @@ uint32_t _CORE_message_queue_Flush(
* a thread that is unblocked is actually a remote thread.
* @param[out] count points to the variable that will contain the
* number of tasks that are sent this message
+ * @param[in] lock_context The lock context of the interrupt disable.
* @retval @a *count will contain the number of messages sent
* @retval indication of the successful completion or reason for failure
*/
@@ -224,7 +227,8 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
size_t size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- uint32_t *count
+ uint32_t *count,
+ ISR_lock_Context *lock_context
);
/**
@@ -250,6 +254,7 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
* if the message queue is full.
* @param[in] timeout is the maximum number of clock ticks that the calling
* thread is willing to block if the message queue is full.
+ * @param[in] lock_context The lock context of the interrupt disable.
* @retval indication of the successful completion or reason for failure
*/
CORE_message_queue_Status _CORE_message_queue_Submit(
@@ -261,7 +266,8 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
CORE_message_queue_Submit_types submit_type,
bool wait,
- Watchdog_Interval timeout
+ Watchdog_Interval timeout,
+ ISR_lock_Context *lock_context
);
/**
@@ -287,6 +293,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* if the message queue is empty.
* @param[in] timeout is the maximum number of clock ticks that the calling
* thread is willing to block if the message queue is empty.
+ * @param[in] lock_context The lock context of the interrupt disable.
*
* @retval indication of the successful completion or reason for failure.
* On success, the location pointed to @a size_p will contain the
@@ -305,7 +312,8 @@ void _CORE_message_queue_Seize(
void *buffer,
size_t *size_p,
bool wait,
- Watchdog_Interval timeout
+ Watchdog_Interval timeout,
+ ISR_lock_Context *lock_context
);
/**
@@ -338,8 +346,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
size_t size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- bool wait,
- Watchdog_Interval timeout
+ bool wait,
+ Watchdog_Interval timeout,
+ ISR_lock_Context *lock_context
)
{
return _CORE_message_queue_Submit(
@@ -351,7 +360,8 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
api_message_queue_mp_support,
CORE_MESSAGE_QUEUE_SEND_REQUEST,
wait, /* sender may block */
- timeout /* timeout interval */
+ timeout, /* timeout interval */
+ lock_context
);
}
@@ -364,8 +374,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
size_t size,
Objects_Id id,
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support,
- bool wait,
- Watchdog_Interval timeout
+ bool wait,
+ Watchdog_Interval timeout,
+ ISR_lock_Context *lock_context
)
{
return _CORE_message_queue_Submit(
@@ -377,10 +388,46 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
api_message_queue_mp_support,
CORE_MESSAGE_QUEUE_URGENT_REQUEST,
wait, /* sender may block */
- timeout /* timeout interval */
+ timeout, /* timeout interval */
+ lock_context
);
}
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Acquire(
+ CORE_message_queue_Control *the_message_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ _Thread_queue_Acquire( &the_message_queue->Wait_queue, lock_context );
+}
+
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Acquire_critical(
+ CORE_message_queue_Control *the_message_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ _Thread_queue_Acquire_critical( &the_message_queue->Wait_queue, lock_context );
+
+ #if defined(RTEMS_MULTIPROCESSING)
+ /*
+ * In case RTEMS_MULTIPROCESSING is enabled, then we have to prevent
+ * deletion of the executing thread after the thread queue operations.
+ */
+ _Thread_Dispatch_disable_critical();
+ #endif
+}
+
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Release(
+ CORE_message_queue_Control *the_message_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ _Thread_queue_Release( &the_message_queue->Wait_queue, lock_context );
+ #if defined(RTEMS_MULTIPROCESSING)
+ _Thread_Dispatch_enable( _Per_CPU_Get() );
+ #endif
+}
+
/**
* This routine copies the contents of the source message buffer
* to the destination message buffer.
@@ -404,7 +451,7 @@ _CORE_message_queue_Allocate_message_buffer (
)
{
return (CORE_message_queue_Buffer_control *)
- _Chain_Get( &the_message_queue->Inactive_messages );
+ _Chain_Get_unprotected( &the_message_queue->Inactive_messages );
}
/**
@@ -416,7 +463,7 @@ RTEMS_INLINE_ROUTINE void _CORE_message_queue_Free_message_buffer (
CORE_message_queue_Buffer_control *the_message
)
{
- _Chain_Append( &the_message_queue->Inactive_messages, &the_message->Node );
+ _Chain_Append_unprotected( &the_message_queue->Inactive_messages, &the_message->Node );
}
/**
@@ -510,6 +557,55 @@ RTEMS_INLINE_ROUTINE bool _CORE_message_queue_Is_priority(
the_message_queue, the_handler, the_argument )
#endif
+RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
+ CORE_message_queue_Control *the_message_queue,
+ const void *buffer,
+ size_t size,
+ CORE_message_queue_Submit_types submit_type,
+ ISR_lock_Context *lock_context
+)
+{
+ Thread_Control *the_thread;
+
+ /*
+ * If there are pending messages, then there can't be threads
+ * waiting for us to send them a message.
+ *
+ * NOTE: This check is critical because threads can block on
+ * send and receive and this ensures that we are broadcasting
+ * the message to threads waiting to receive -- not to send.
+ */
+ if ( the_message_queue->number_of_pending_messages != 0 ) {
+ return NULL;
+ }
+
+ /*
+ * There must be no pending messages if there is a thread waiting to
+ * receive a message.
+ */
+ the_thread = _Thread_queue_First_locked( &the_message_queue->Wait_queue );
+ if ( the_thread == NULL ) {
+ return NULL;
+ }
+
+ *(size_t *) the_thread->Wait.return_argument = size;
+ the_thread->Wait.count = (uint32_t) submit_type;
+
+ _CORE_message_queue_Copy_buffer(
+ buffer,
+ the_thread->Wait.return_argument_second.mutable_object,
+ size
+ );
+
+ _Thread_queue_Extract_critical(
+ &the_message_queue->Wait_queue,
+ the_thread,
+ lock_context
+ );
+
+ return the_thread;
+}
+
/** @} */
#ifdef __cplusplus
diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c
index ff9f3ec..6659170 100644
--- a/cpukit/score/src/coremsgbroadcast.c
+++ b/cpukit/score/src/coremsgbroadcast.c
@@ -19,8 +19,6 @@
#endif
#include <rtems/score/coremsgimpl.h>
-#include <rtems/score/objectimpl.h>
-#include <rtems/score/thread.h>
CORE_message_queue_Status _CORE_message_queue_Broadcast(
CORE_message_queue_Control *the_message_queue,
@@ -33,55 +31,45 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
Objects_Id id __attribute__((unused)),
CORE_message_queue_API_mp_support_callout api_message_queue_mp_support __attribute__((unused)),
#endif
- uint32_t *count
+ uint32_t *count,
+ ISR_lock_Context *lock_context
)
{
- Thread_Control *the_thread;
- uint32_t number_broadcasted;
- Thread_Wait_information *waitp;
+ Thread_Control *the_thread;
+ uint32_t number_broadcasted;
if ( size > the_message_queue->maximum_message_size ) {
+ _ISR_lock_ISR_enable( lock_context );
return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
}
- /*
- * If there are pending messages, then there can't be threads
- * waiting for us to send them a message.
- *
- * NOTE: This check is critical because threads can block on
- * send and receive and this ensures that we are broadcasting
- * the message to threads waiting to receive -- not to send.
- */
+ number_broadcasted = 0;
- if ( the_message_queue->number_of_pending_messages != 0 ) {
- *count = 0;
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
- }
+ _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
- /*
- * There must be no pending messages if there is a thread waiting to
- * receive a message.
- */
- number_broadcasted = 0;
- while ((the_thread =
- _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
- waitp = &the_thread->Wait;
+ while (
+ ( the_thread =
+ _CORE_message_queue_Dequeue_receiver(
+ the_message_queue,
+ buffer,
+ size,
+ 0,
+ lock_context
+ )
+ )
+ ) {
number_broadcasted += 1;
- _CORE_message_queue_Copy_buffer(
- buffer,
- waitp->return_argument_second.mutable_object,
- size
- );
+#if defined(RTEMS_MULTIPROCESSING)
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+ (*api_message_queue_mp_support) ( the_thread, id );
+#endif
- *(size_t *) the_thread->Wait.return_argument = size;
+ _CORE_message_queue_Acquire( the_message_queue, lock_context );
+ }
- #if defined(RTEMS_MULTIPROCESSING)
- if ( !_Objects_Is_local_id( the_thread->Object.id ) )
- (*api_message_queue_mp_support) ( the_thread, id );
- #endif
+ _CORE_message_queue_Release( the_message_queue, lock_context );
- }
*count = number_broadcasted;
return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
}
diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c
index 437bf33..6d907c9 100644
--- a/cpukit/score/src/coremsgclose.c
+++ b/cpukit/score/src/coremsgclose.c
@@ -18,11 +18,7 @@
#include "config.h"
#endif
-#include <rtems/system.h>
-#include <rtems/score/chain.h>
-#include <rtems/score/isr.h>
#include <rtems/score/coremsgimpl.h>
-#include <rtems/score/thread.h>
#include <rtems/score/wkspace.h>
void _CORE_message_queue_Close(
@@ -31,6 +27,7 @@ void _CORE_message_queue_Close(
uint32_t status
)
{
+ ISR_lock_Context lock_context;
/*
* This will flush blocked threads whether they were blocked on
@@ -49,7 +46,8 @@ void _CORE_message_queue_Close(
* the flush satisfying any blocked senders as a side-effect.
*/
- (void) _CORE_message_queue_Flush( the_message_queue );
+ _ISR_lock_ISR_disable( &lock_context );
+ (void) _CORE_message_queue_Flush( the_message_queue, &lock_context );
(void) _Workspace_Free( the_message_queue->message_buffers );
diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c
index 05683f0..f67dcf2 100644
--- a/cpukit/score/src/coremsgflush.c
+++ b/cpukit/score/src/coremsgflush.c
@@ -20,13 +20,12 @@
#endif
#include <rtems/score/coremsgimpl.h>
-#include <rtems/score/isr.h>
uint32_t _CORE_message_queue_Flush(
- CORE_message_queue_Control *the_message_queue
+ CORE_message_queue_Control *the_message_queue,
+ ISR_lock_Context *lock_context
)
{
- ISR_Level level;
Chain_Node *inactive_head;
Chain_Node *inactive_first;
Chain_Node *message_queue_first;
@@ -60,7 +59,7 @@ uint32_t _CORE_message_queue_Flush(
* fixed execution time that only deals with pending messages.
*/
- _ISR_Disable( level );
+ _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
count = the_message_queue->number_of_pending_messages;
if ( count != 0 ) {
@@ -79,6 +78,6 @@ uint32_t _CORE_message_queue_Flush(
_Chain_Initialize_empty( &the_message_queue->Pending_messages );
}
- _ISR_Enable( level );
+ _CORE_message_queue_Release( the_message_queue, lock_context );
return count;
}
diff --git a/cpukit/score/src/coremsginsert.c b/cpukit/score/src/coremsginsert.c
index 28407ba..0a73af8 100644
--- a/cpukit/score/src/coremsginsert.c
+++ b/cpukit/score/src/coremsginsert.c
@@ -19,7 +19,6 @@
#endif
#include <rtems/score/coremsgimpl.h>
-#include <rtems/score/isrlevel.h>
#if defined(RTEMS_SCORE_COREMSG_ENABLE_MESSAGE_PRIORITY)
static bool _CORE_message_queue_Order(
@@ -45,7 +44,6 @@ void _CORE_message_queue_Insert_message(
)
{
Chain_Control *pending_messages;
- ISR_Level level;
#if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION)
bool notify;
#endif
@@ -53,8 +51,6 @@ void _CORE_message_queue_Insert_message(
_CORE_message_queue_Set_message_priority( the_message, submit_type );
pending_messages = &the_message_queue->Pending_messages;
- _ISR_Disable( level );
-
#if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION)
notify = ( the_message_queue->number_of_pending_messages == 0 );
#endif
@@ -74,8 +70,6 @@ void _CORE_message_queue_Insert_message(
_Chain_Prepend_unprotected( pending_messages, &the_message->Node );
}
- _ISR_Enable( level );
-
#if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION)
/*
* According to POSIX, does this happen before or after the message
diff --git a/cpukit/score/src/coremsgseize.c b/cpukit/score/src/coremsgseize.c
index ec6cf8c..0d1c36f 100644
--- a/cpukit/score/src/coremsgseize.c
+++ b/cpukit/score/src/coremsgseize.c
@@ -33,18 +33,17 @@ void _CORE_message_queue_Seize(
void *buffer,
size_t *size_p,
bool wait,
- Watchdog_Interval timeout
+ Watchdog_Interval timeout,
+ ISR_lock_Context *lock_context
)
{
- ISR_lock_Context lock_context;
CORE_message_queue_Buffer_control *the_message;
executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
- _Thread_queue_Acquire( &the_message_queue->Wait_queue, &lock_context );
+ _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
the_message = _CORE_message_queue_Get_pending_message( the_message_queue );
if ( the_message != NULL ) {
the_message_queue->number_of_pending_messages -= 1;
- _Thread_queue_Release( &the_message_queue->Wait_queue, &lock_context );
*size_p = the_message->Contents.size;
executing->Wait.count =
@@ -61,6 +60,7 @@ void _CORE_message_queue_Seize(
* So return immediately.
*/
_CORE_message_queue_Free_message_buffer(the_message_queue, the_message);
+ _CORE_message_queue_Release( the_message_queue, lock_context );
return;
#else
{
@@ -73,12 +73,15 @@ void _CORE_message_queue_Seize(
* NOTE: If we note that the queue was not full before this receive,
* then we can avoid this dequeue.
*/
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( !the_thread ) {
+ the_thread = _Thread_queue_First_locked(
+ &the_message_queue->Wait_queue
+ );
+ if ( the_thread == NULL ) {
_CORE_message_queue_Free_message_buffer(
the_message_queue,
the_message
);
+ _CORE_message_queue_Release( the_message_queue, lock_context );
return;
}
@@ -103,13 +106,21 @@ void _CORE_message_queue_Seize(
the_message,
_CORE_message_queue_Get_message_priority( the_message )
);
+ _Thread_queue_Extract_critical(
+ &the_message_queue->Wait_queue,
+ the_thread,
+ lock_context
+ );
+ #if defined(RTEMS_MULTIPROCESSING)
+ _Thread_Dispatch_enable( _Per_CPU_Get() );
+ #endif
return;
}
#endif
}
if ( !wait ) {
- _Thread_queue_Release( &the_message_queue->Wait_queue, &lock_context );
+ _CORE_message_queue_Release( the_message_queue, lock_context );
executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_NOWAIT;
return;
}
@@ -125,6 +136,9 @@ void _CORE_message_queue_Seize(
STATES_WAITING_FOR_MESSAGE,
timeout,
CORE_MESSAGE_QUEUE_STATUS_TIMEOUT,
- &lock_context
+ lock_context
);
+ #if defined(RTEMS_MULTIPROCESSING)
+ _Thread_Dispatch_enable( _Per_CPU_Get() );
+ #endif
}
diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c
index 0d0965f..90b7eb0 100644
--- a/cpukit/score/src/coremsgsubmit.c
+++ b/cpukit/score/src/coremsgsubmit.c
@@ -38,36 +38,42 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
#endif
CORE_message_queue_Submit_types submit_type,
bool wait,
- Watchdog_Interval timeout
+ Watchdog_Interval timeout,
+ ISR_lock_Context *lock_context
)
{
- CORE_message_queue_Buffer_control *the_message;
- Thread_Control *the_thread;
+ CORE_message_queue_Buffer_control *the_message;
+ Thread_Control *the_thread;
+ #if defined(RTEMS_MULTIPROCESSING)
+ Per_CPU_Control *cpu_self;
+ #endif
if ( size > the_message_queue->maximum_message_size ) {
+ _ISR_lock_ISR_enable( lock_context );
return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
}
+ _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
+
/*
* Is there a thread currently waiting on this message queue?
*/
- if ( the_message_queue->number_of_pending_messages == 0 ) {
- the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
- if ( the_thread ) {
- _CORE_message_queue_Copy_buffer(
- buffer,
- the_thread->Wait.return_argument_second.mutable_object,
- size
- );
- *(size_t *) the_thread->Wait.return_argument = size;
- the_thread->Wait.count = (uint32_t) submit_type;
-
- #if defined(RTEMS_MULTIPROCESSING)
- if ( !_Objects_Is_local_id( the_thread->Object.id ) )
- (*api_message_queue_mp_support) ( the_thread, id );
- #endif
- return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
- }
+
+ the_thread = _CORE_message_queue_Dequeue_receiver(
+ the_message_queue,
+ buffer,
+ size,
+ submit_type,
+ lock_context
+ );
+ if ( the_thread != NULL ) {
+ #if defined(RTEMS_MULTIPROCESSING)
+ if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+ (*api_message_queue_mp_support) ( the_thread, id );
+
+ _Thread_Enable_dispatch( _Per_CPU_Get() );
+ #endif
+ return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
}
/*
@@ -77,23 +83,25 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
the_message =
_CORE_message_queue_Allocate_message_buffer( the_message_queue );
if ( the_message ) {
+ the_message->Contents.size = size;
+ _CORE_message_queue_Set_message_priority( the_message, submit_type );
_CORE_message_queue_Copy_buffer(
buffer,
the_message->Contents.buffer,
size
);
- the_message->Contents.size = size;
- _CORE_message_queue_Set_message_priority( the_message, submit_type );
_CORE_message_queue_Insert_message(
the_message_queue,
the_message,
submit_type
);
+ _CORE_message_queue_Release( the_message_queue, lock_context );
return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
}
#if !defined(RTEMS_SCORE_COREMSG_ENABLE_BLOCKING_SEND)
+ _CORE_message_queue_Release( the_message_queue, lock_context );
return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
#else
/*
@@ -102,6 +110,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* on the queue.
*/
if ( !wait ) {
+ _CORE_message_queue_Release( the_message_queue, lock_context );
return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
}
@@ -110,6 +119,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* deadly to block in an ISR.
*/
if ( _ISR_Is_in_progress() ) {
+ _CORE_message_queue_Release( the_message_queue, lock_context );
return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
}
@@ -119,20 +129,22 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
* it as a variable. Doing this emphasizes how dangerous it
* would be to use this variable prior to here.
*/
- {
- executing->Wait.id = id;
- executing->Wait.return_argument_second.immutable_object = buffer;
- executing->Wait.option = (uint32_t) size;
- executing->Wait.count = submit_type;
-
- _Thread_queue_Enqueue(
- &the_message_queue->Wait_queue,
- executing,
- STATES_WAITING_FOR_MESSAGE,
- timeout,
- CORE_MESSAGE_QUEUE_STATUS_TIMEOUT
- );
- }
+ executing->Wait.id = id;
+ executing->Wait.return_argument_second.immutable_object = buffer;
+ executing->Wait.option = (uint32_t) size;
+ executing->Wait.count = submit_type;
+
+ _Thread_queue_Enqueue_critical(
+ &the_message_queue->Wait_queue,
+ executing,
+ STATES_WAITING_FOR_MESSAGE,
+ timeout,
+ CORE_MESSAGE_QUEUE_STATUS_TIMEOUT,
+ lock_context
+ );
+ #if defined(RTEMS_MULTIPROCESSING)
+ _Thread_Dispatch_enable( _Per_CPU_Get() );
+ #endif
return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_WAIT;
#endif
diff --git a/doc/user/msg.t b/doc/user/msg.t
index f58d677..4ca5611 100644
--- a/doc/user/msg.t
+++ b/doc/user/msg.t
@@ -50,7 +50,10 @@ wait for a message to arrive at a queue. Also, a task may poll
a queue for the arrival of a message.
The maximum length message which can be sent is set
-on a per message queue basis.
+on a per message queue basis. In general, the message content must be copied
+to/from an internal buffer of the message queue or, in certain cases, directly
+to a peer. This copy operation is performed with interrupts disabled, so it
+is advisable to keep the messages as short as possible.
@subsection Building a Message Queue Attribute Set
--
1.8.4.5
More information about the devel
mailing list