[PATCH 24/45] score: Fine grained locking for message queues

Sebastian Huber sebastian.huber at embedded-brains.de
Fri May 15 11:41:24 UTC 2015


Aggregate several critical sections into a bigger one.  Sending and
receiving messages is now protected by an ISR lock.  Thread dispatching
is only disabled in case a blocking operation is necessary.  The message
copy procedure is done inside the critical section (interrupts
disabled).  Thus this change may have a negative impact on the interrupt
latency in case very large messages are transferred.

Update #2273.
---
 cpukit/posix/include/rtems/posix/mqueueimpl.h  |  15 ++++
 cpukit/posix/src/mqueuerecvsupp.c              |  15 ++--
 cpukit/posix/src/mqueuesendsupp.c              |  14 +--
 cpukit/rtems/include/rtems/rtems/messageimpl.h |  15 ++++
 cpukit/rtems/src/msgqbroadcast.c               |  12 ++-
 cpukit/rtems/src/msgqflush.c                   |  13 ++-
 cpukit/rtems/src/msgqreceive.c                 |  11 ++-
 cpukit/rtems/src/msgqsend.c                    |  12 ++-
 cpukit/rtems/src/msgqurgent.c                  |  11 ++-
 cpukit/score/include/rtems/score/coremsgimpl.h | 120 ++++++++++++++++++++++---
 cpukit/score/src/coremsgbroadcast.c            |  62 ++++++-------
 cpukit/score/src/coremsgclose.c                |   8 +-
 cpukit/score/src/coremsgflush.c                |   9 +-
 cpukit/score/src/coremsginsert.c               |   6 --
 cpukit/score/src/coremsgseize.c                |  30 +++++--
 cpukit/score/src/coremsgsubmit.c               |  84 +++++++++--------
 doc/user/msg.t                                 |   5 +-
 17 files changed, 305 insertions(+), 137 deletions(-)

diff --git a/cpukit/posix/include/rtems/posix/mqueueimpl.h b/cpukit/posix/include/rtems/posix/mqueueimpl.h
index bfc850d..90269bf 100644
--- a/cpukit/posix/include/rtems/posix/mqueueimpl.h
+++ b/cpukit/posix/include/rtems/posix/mqueueimpl.h
@@ -250,6 +250,21 @@ RTEMS_INLINE_ROUTINE POSIX_Message_queue_Control_fd *_POSIX_Message_queue_Get_fd
     location
   );
 }
+
+RTEMS_INLINE_ROUTINE POSIX_Message_queue_Control_fd *
+_POSIX_Message_queue_Get_fd_interrupt_disable(
+  mqd_t              id,
+  Objects_Locations *location,
+  ISR_lock_Context  *lock_context
+)
+{
+  return (POSIX_Message_queue_Control_fd *) _Objects_Get_isr_disable(
+    &_POSIX_Message_queue_Information_fds,
+    (Objects_Id)id,
+    location,
+    lock_context
+  );
+}
  
 /**
  * @see _POSIX_Name_to_id().
diff --git a/cpukit/posix/src/mqueuerecvsupp.c b/cpukit/posix/src/mqueuerecvsupp.c
index bea19fe..2f9bb2d 100644
--- a/cpukit/posix/src/mqueuerecvsupp.c
+++ b/cpukit/posix/src/mqueuerecvsupp.c
@@ -54,20 +54,25 @@ ssize_t _POSIX_Message_queue_Receive_support(
   size_t                           length_out;
   bool                             do_wait;
   Thread_Control                  *executing;
+  ISR_lock_Context                 lock_context;
 
-  the_mq_fd = _POSIX_Message_queue_Get_fd( mqdes, &location );
+  the_mq_fd = _POSIX_Message_queue_Get_fd_interrupt_disable(
+    mqdes,
+    &location,
+    &lock_context
+  );
   switch ( location ) {
 
     case OBJECTS_LOCAL:
       if ( (the_mq_fd->oflag & O_ACCMODE) == O_WRONLY ) {
-        _Objects_Put( &the_mq_fd->Object );
+        _ISR_lock_ISR_enable( &lock_context );
         rtems_set_errno_and_return_minus_one( EBADF );
       }
 
       the_mq = the_mq_fd->Queue;
 
       if ( msg_len < the_mq->Message_queue.maximum_message_size ) {
-        _Objects_Put( &the_mq_fd->Object );
+        _ISR_lock_ISR_enable( &lock_context );
         rtems_set_errno_and_return_minus_one( EMSGSIZE );
       }
 
@@ -97,10 +102,10 @@ ssize_t _POSIX_Message_queue_Receive_support(
         msg_ptr,
         &length_out,
         do_wait,
-        timeout
+        timeout,
+        &lock_context
       );
 
-      _Objects_Put( &the_mq_fd->Object );
       if (msg_prio) {
         *msg_prio = _POSIX_Message_queue_Priority_from_core(
              executing->Wait.count
diff --git a/cpukit/posix/src/mqueuesendsupp.c b/cpukit/posix/src/mqueuesendsupp.c
index 2d6ddae..d73538a 100644
--- a/cpukit/posix/src/mqueuesendsupp.c
+++ b/cpukit/posix/src/mqueuesendsupp.c
@@ -64,6 +64,7 @@ int _POSIX_Message_queue_Send_support(
   CORE_message_queue_Status       msg_status;
   bool                            do_wait;
   Thread_Control                 *executing;
+  ISR_lock_Context                lock_context;
 
   /*
    * Validate the priority.
@@ -73,12 +74,16 @@ int _POSIX_Message_queue_Send_support(
   if ( msg_prio > MQ_PRIO_MAX )
     rtems_set_errno_and_return_minus_one( EINVAL );
 
-  the_mq_fd = _POSIX_Message_queue_Get_fd( mqdes, &location );
+  the_mq_fd = _POSIX_Message_queue_Get_fd_interrupt_disable(
+    mqdes,
+    &location,
+    &lock_context
+  );
   switch ( location ) {
 
     case OBJECTS_LOCAL:
       if ( (the_mq_fd->oflag & O_ACCMODE) == O_RDONLY ) {
-        _Objects_Put( &the_mq_fd->Object );
+        _ISR_lock_ISR_enable( &lock_context );
         rtems_set_errno_and_return_minus_one( EBADF );
       }
 
@@ -105,11 +110,10 @@ int _POSIX_Message_queue_Send_support(
         NULL,
         _POSIX_Message_queue_Priority_to_core( msg_prio ),
         do_wait,
-        timeout    /* no timeout */
+        timeout,   /* no timeout */
+        &lock_context
       );
 
-      _Objects_Put( &the_mq_fd->Object );
-
       /*
        *  If we had to block, then this is where the task returns
        *  after it wakes up.  The returned status is correct for
diff --git a/cpukit/rtems/include/rtems/rtems/messageimpl.h b/cpukit/rtems/include/rtems/rtems/messageimpl.h
index fa9e573..2399d65 100644
--- a/cpukit/rtems/include/rtems/rtems/messageimpl.h
+++ b/cpukit/rtems/include/rtems/rtems/messageimpl.h
@@ -139,6 +139,21 @@ RTEMS_INLINE_ROUTINE Message_queue_Control *_Message_queue_Get (
      _Objects_Get( &_Message_queue_Information, id, location );
 }
 
+RTEMS_INLINE_ROUTINE Message_queue_Control *
+_Message_queue_Get_interrupt_disable(
+  Objects_Id         id,
+  Objects_Locations *location,
+  ISR_lock_Context  *lock_context
+)
+{
+  return (Message_queue_Control *) _Objects_Get_isr_disable(
+    &_Message_queue_Information,
+    id,
+    location,
+    lock_context
+  );
+}
+
 RTEMS_INLINE_ROUTINE Message_queue_Control *_Message_queue_Allocate( void )
 {
   return (Message_queue_Control *)
diff --git a/cpukit/rtems/src/msgqbroadcast.c b/cpukit/rtems/src/msgqbroadcast.c
index 64ea80e..aabbf3f 100644
--- a/cpukit/rtems/src/msgqbroadcast.c
+++ b/cpukit/rtems/src/msgqbroadcast.c
@@ -40,6 +40,7 @@ rtems_status_code rtems_message_queue_broadcast(
   Message_queue_Control          *the_message_queue;
   Objects_Locations               location;
   CORE_message_queue_Status       core_status;
+  ISR_lock_Context                lock_context;
 
   if ( !buffer )
     return RTEMS_INVALID_ADDRESS;
@@ -47,7 +48,11 @@ rtems_status_code rtems_message_queue_broadcast(
   if ( !count )
     return RTEMS_INVALID_ADDRESS;
 
-  the_message_queue = _Message_queue_Get( id, &location );
+  the_message_queue = _Message_queue_Get_interrupt_disable(
+    id,
+    &location,
+    &lock_context
+  );
   switch ( location ) {
 
     case OBJECTS_LOCAL:
@@ -61,10 +66,9 @@ rtems_status_code rtems_message_queue_broadcast(
                       #else
                         NULL,
                       #endif
-                      count
+                      count,
+                      &lock_context
                     );
-
-      _Objects_Put( &the_message_queue->Object );
       return
         _Message_queue_Translate_core_message_queue_return_code( core_status );
 
diff --git a/cpukit/rtems/src/msgqflush.c b/cpukit/rtems/src/msgqflush.c
index 7ae7ef4..809c243 100644
--- a/cpukit/rtems/src/msgqflush.c
+++ b/cpukit/rtems/src/msgqflush.c
@@ -54,16 +54,23 @@ rtems_status_code rtems_message_queue_flush(
 {
   Message_queue_Control          *the_message_queue;
   Objects_Locations               location;
+  ISR_lock_Context                lock_context;
 
   if ( !count )
     return RTEMS_INVALID_ADDRESS;
 
-  the_message_queue = _Message_queue_Get( id, &location );
+  the_message_queue = _Message_queue_Get_interrupt_disable(
+    id,
+    &location,
+    &lock_context
+  );
   switch ( location ) {
 
     case OBJECTS_LOCAL:
-      *count = _CORE_message_queue_Flush( &the_message_queue->message_queue );
-      _Objects_Put( &the_message_queue->Object );
+      *count = _CORE_message_queue_Flush(
+        &the_message_queue->message_queue,
+        &lock_context
+      );
       return RTEMS_SUCCESSFUL;
 
 #if defined(RTEMS_MULTIPROCESSING)
diff --git a/cpukit/rtems/src/msgqreceive.c b/cpukit/rtems/src/msgqreceive.c
index db09cfe..2b9a4e7 100644
--- a/cpukit/rtems/src/msgqreceive.c
+++ b/cpukit/rtems/src/msgqreceive.c
@@ -42,6 +42,7 @@ rtems_status_code rtems_message_queue_receive(
   Objects_Locations               location;
   bool                            wait;
   Thread_Control                 *executing;
+  ISR_lock_Context                lock_context;
 
   if ( !buffer )
     return RTEMS_INVALID_ADDRESS;
@@ -49,7 +50,11 @@ rtems_status_code rtems_message_queue_receive(
   if ( !size )
     return RTEMS_INVALID_ADDRESS;
 
-  the_message_queue = _Message_queue_Get( id, &location );
+  the_message_queue = _Message_queue_Get_interrupt_disable(
+    id,
+    &location,
+    &lock_context
+  );
   switch ( location ) {
 
     case OBJECTS_LOCAL:
@@ -66,9 +71,9 @@ rtems_status_code rtems_message_queue_receive(
         buffer,
         size,
         wait,
-        timeout
+        timeout,
+        &lock_context
       );
-      _Objects_Put( &the_message_queue->Object );
       return _Message_queue_Translate_core_message_queue_return_code(
         executing->Wait.return_code
       );
diff --git a/cpukit/rtems/src/msgqsend.c b/cpukit/rtems/src/msgqsend.c
index 34b7c29..fb3979e 100644
--- a/cpukit/rtems/src/msgqsend.c
+++ b/cpukit/rtems/src/msgqsend.c
@@ -62,11 +62,16 @@ rtems_status_code rtems_message_queue_send(
   Message_queue_Control           *the_message_queue;
   Objects_Locations                location;
   CORE_message_queue_Status        status;
+  ISR_lock_Context                 lock_context;
 
   if ( !buffer )
     return RTEMS_INVALID_ADDRESS;
 
-  the_message_queue = _Message_queue_Get( id, &location );
+  the_message_queue = _Message_queue_Get_interrupt_disable(
+    id,
+    &location,
+    &lock_context
+  );
   switch ( location ) {
 
     case OBJECTS_LOCAL:
@@ -77,11 +82,10 @@ rtems_status_code rtems_message_queue_send(
         id,
         MESSAGE_QUEUE_MP_HANDLER,
         false,   /* sender does not block */
-        0        /* no timeout */
+        0,       /* no timeout */
+        &lock_context
       );
 
-      _Objects_Put( &the_message_queue->Object );
-
       /*
        *  Since this API does not allow for blocking sends, we can directly
        *  return the returned status.
diff --git a/cpukit/rtems/src/msgqurgent.c b/cpukit/rtems/src/msgqurgent.c
index 85a9d4f..e6ae5ef 100644
--- a/cpukit/rtems/src/msgqurgent.c
+++ b/cpukit/rtems/src/msgqurgent.c
@@ -45,11 +45,16 @@ rtems_status_code rtems_message_queue_urgent(
   Message_queue_Control           *the_message_queue;
   Objects_Locations                location;
   CORE_message_queue_Status        status;
+  ISR_lock_Context                 lock_context;
 
   if ( !buffer )
     return RTEMS_INVALID_ADDRESS;
 
-  the_message_queue = _Message_queue_Get( id, &location );
+  the_message_queue = _Message_queue_Get_interrupt_disable(
+    id,
+    &location,
+    &lock_context
+  );
   switch ( location ) {
 
     case OBJECTS_LOCAL:
@@ -60,9 +65,9 @@ rtems_status_code rtems_message_queue_urgent(
         id,
         MESSAGE_QUEUE_MP_HANDLER,
         false,   /* sender does not block */
-        0        /* no timeout */
+        0,       /* no timeout */
+        &lock_context
       );
-      _Objects_Put( &the_message_queue->Object );
 
       /*
        *  Since this API does not allow for blocking sends, we can directly
diff --git a/cpukit/score/include/rtems/score/coremsgimpl.h b/cpukit/score/include/rtems/score/coremsgimpl.h
index 1f67969..ab9ea71 100644
--- a/cpukit/score/include/rtems/score/coremsgimpl.h
+++ b/cpukit/score/include/rtems/score/coremsgimpl.h
@@ -172,11 +172,13 @@ void _CORE_message_queue_Close(
  *  number of messages flushed from the queue is returned.
  *
  *  @param[in] the_message_queue points to the message queue to flush
+ *  @param[in] lock_context The lock context of the interrupt disable.
  *
  *  @retval This method returns the number of message pending messages flushed.
  */
 uint32_t   _CORE_message_queue_Flush(
-  CORE_message_queue_Control *the_message_queue
+  CORE_message_queue_Control *the_message_queue,
+  ISR_lock_Context           *lock_context
 );
 
 #if defined(FUNCTIONALITY_NOT_CURRENTLY_USED_BY_ANY_API)
@@ -215,6 +217,7 @@ uint32_t   _CORE_message_queue_Flush(
  *         a thread that is unblocked is actually a remote thread.
  *  @param[out] count points to the variable that will contain the
  *         number of tasks that are sent this message
+ *  @param[in] lock_context The lock context of the interrupt disable.
  *  @retval @a *count will contain the number of messages sent
  *  @retval indication of the successful completion or reason for failure
  */
@@ -224,7 +227,8 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
   size_t                                     size,
   Objects_Id                                 id,
   CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
-  uint32_t                                  *count
+  uint32_t                                  *count,
+  ISR_lock_Context                          *lock_context
 );
 
 /**
@@ -250,6 +254,7 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
  *         if the message queue is full.
  *  @param[in] timeout is the maximum number of clock ticks that the calling
  *         thread is willing to block if the message queue is full.
+ *  @param[in] lock_context The lock context of the interrupt disable.
  *  @retval indication of the successful completion or reason for failure
  */
 CORE_message_queue_Status _CORE_message_queue_Submit(
@@ -261,7 +266,8 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
   CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
   CORE_message_queue_Submit_types            submit_type,
   bool                                       wait,
-  Watchdog_Interval                          timeout
+  Watchdog_Interval                          timeout,
+  ISR_lock_Context                          *lock_context
 );
 
 /**
@@ -287,6 +293,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
  *         if the message queue is empty.
  *  @param[in] timeout is the maximum number of clock ticks that the calling
  *         thread is willing to block if the message queue is empty.
+ *  @param[in] lock_context The lock context of the interrupt disable.
  *
  *  @retval indication of the successful completion or reason for failure.
  *          On success, the location pointed to @a size_p will contain the
@@ -305,7 +312,8 @@ void _CORE_message_queue_Seize(
   void                            *buffer,
   size_t                          *size_p,
   bool                             wait,
-  Watchdog_Interval                timeout
+  Watchdog_Interval                timeout,
+  ISR_lock_Context                *lock_context
 );
 
 /**
@@ -338,8 +346,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
   size_t                                     size,
   Objects_Id                                 id,
   CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
-  bool                                    wait,
-  Watchdog_Interval                          timeout
+  bool                                       wait,
+  Watchdog_Interval                          timeout,
+  ISR_lock_Context                          *lock_context
 )
 {
   return _CORE_message_queue_Submit(
@@ -351,7 +360,8 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Send(
     api_message_queue_mp_support,
     CORE_MESSAGE_QUEUE_SEND_REQUEST,
     wait,     /* sender may block */
-    timeout   /* timeout interval */
+    timeout,  /* timeout interval */
+    lock_context
   );
 }
 
@@ -364,8 +374,9 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
   size_t                                     size,
   Objects_Id                                 id,
   CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support,
-  bool                                    wait,
-  Watchdog_Interval                          timeout
+  bool                                       wait,
+  Watchdog_Interval                          timeout,
+  ISR_lock_Context                          *lock_context
 )
 {
   return _CORE_message_queue_Submit(
@@ -377,10 +388,46 @@ RTEMS_INLINE_ROUTINE CORE_message_queue_Status _CORE_message_queue_Urgent(
     api_message_queue_mp_support,
     CORE_MESSAGE_QUEUE_URGENT_REQUEST,
     wait,     /* sender may block */
-    timeout   /* timeout interval */
+    timeout,  /* timeout interval */
+    lock_context
  );
 }
 
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Acquire(
+  CORE_message_queue_Control *the_message_queue,
+  ISR_lock_Context           *lock_context
+)
+{
+  _Thread_queue_Acquire( &the_message_queue->Wait_queue, lock_context );
+}
+
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Acquire_critical(
+  CORE_message_queue_Control *the_message_queue,
+  ISR_lock_Context           *lock_context
+)
+{
+  _Thread_queue_Acquire_critical( &the_message_queue->Wait_queue, lock_context );
+
+  #if defined(RTEMS_MULTIPROCESSING)
+    /*
+     * In case RTEMS_MULTIPROCESSING is enabled, then we have to prevent
+     * deletion of the executing thread after the thread queue operations.
+     */
+    _Thread_Dispatch_disable_critical();
+  #endif
+}
+
+RTEMS_INLINE_ROUTINE void _CORE_message_queue_Release(
+  CORE_message_queue_Control *the_message_queue,
+  ISR_lock_Context           *lock_context
+)
+{
+  _Thread_queue_Release( &the_message_queue->Wait_queue, lock_context );
+  #if defined(RTEMS_MULTIPROCESSING)
+    _Thread_Dispatch_enable( _Per_CPU_Get() );
+  #endif
+}
+
 /**
  * This routine copies the contents of the source message buffer
  * to the destination message buffer.
@@ -404,7 +451,7 @@ _CORE_message_queue_Allocate_message_buffer (
 )
 {
    return (CORE_message_queue_Buffer_control *)
-     _Chain_Get( &the_message_queue->Inactive_messages );
+     _Chain_Get_unprotected( &the_message_queue->Inactive_messages );
 }
 
 /**
@@ -416,7 +463,7 @@ RTEMS_INLINE_ROUTINE void _CORE_message_queue_Free_message_buffer (
   CORE_message_queue_Buffer_control *the_message
 )
 {
-  _Chain_Append( &the_message_queue->Inactive_messages, &the_message->Node );
+  _Chain_Append_unprotected( &the_message_queue->Inactive_messages, &the_message->Node );
 }
 
 /**
@@ -510,6 +557,55 @@ RTEMS_INLINE_ROUTINE bool _CORE_message_queue_Is_priority(
            the_message_queue, the_handler, the_argument )
 #endif
 
+RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
+  CORE_message_queue_Control      *the_message_queue,
+  const void                      *buffer,
+  size_t                           size,
+  CORE_message_queue_Submit_types  submit_type,
+  ISR_lock_Context                *lock_context
+)
+{
+  Thread_Control *the_thread;
+
+  /*
+   *  If there are pending messages, then there can't be threads
+   *  waiting for us to send them a message.
+   *
+   *  NOTE: This check is critical because threads can block on
+   *        send and receive and this ensures that we are broadcasting
+   *        the message to threads waiting to receive -- not to send.
+   */
+  if ( the_message_queue->number_of_pending_messages != 0 ) {
+    return NULL;
+  }
+
+  /*
+   *  There must be no pending messages if there is a thread waiting to
+   *  receive a message.
+   */
+  the_thread = _Thread_queue_First_locked( &the_message_queue->Wait_queue );
+  if ( the_thread == NULL ) {
+    return NULL;
+  }
+
+   *(size_t *) the_thread->Wait.return_argument = size;
+   the_thread->Wait.count = (uint32_t) submit_type;
+
+  _CORE_message_queue_Copy_buffer(
+    buffer,
+    the_thread->Wait.return_argument_second.mutable_object,
+    size
+  );
+
+  _Thread_queue_Extract_critical(
+    &the_message_queue->Wait_queue,
+    the_thread,
+    lock_context
+  );
+
+  return the_thread;
+}
+
 /** @} */
 
 #ifdef __cplusplus
diff --git a/cpukit/score/src/coremsgbroadcast.c b/cpukit/score/src/coremsgbroadcast.c
index ff9f3ec..6659170 100644
--- a/cpukit/score/src/coremsgbroadcast.c
+++ b/cpukit/score/src/coremsgbroadcast.c
@@ -19,8 +19,6 @@
 #endif
 
 #include <rtems/score/coremsgimpl.h>
-#include <rtems/score/objectimpl.h>
-#include <rtems/score/thread.h>
 
 CORE_message_queue_Status _CORE_message_queue_Broadcast(
   CORE_message_queue_Control                *the_message_queue,
@@ -33,55 +31,45 @@ CORE_message_queue_Status _CORE_message_queue_Broadcast(
     Objects_Id                                 id __attribute__((unused)),
     CORE_message_queue_API_mp_support_callout  api_message_queue_mp_support __attribute__((unused)),
   #endif
-  uint32_t                                  *count
+  uint32_t                                  *count,
+  ISR_lock_Context                          *lock_context
 )
 {
-  Thread_Control          *the_thread;
-  uint32_t                 number_broadcasted;
-  Thread_Wait_information *waitp;
+  Thread_Control             *the_thread;
+  uint32_t                    number_broadcasted;
 
   if ( size > the_message_queue->maximum_message_size ) {
+    _ISR_lock_ISR_enable( lock_context );
     return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
   }
 
-  /*
-   *  If there are pending messages, then there can't be threads
-   *  waiting for us to send them a message.
-   *
-   *  NOTE: This check is critical because threads can block on
-   *        send and receive and this ensures that we are broadcasting
-   *        the message to threads waiting to receive -- not to send.
-   */
+  number_broadcasted = 0;
 
-  if ( the_message_queue->number_of_pending_messages != 0 ) {
-    *count = 0;
-    return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
-  }
+  _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
 
-  /*
-   *  There must be no pending messages if there is a thread waiting to
-   *  receive a message.
-   */
-  number_broadcasted = 0;
-  while ((the_thread =
-          _Thread_queue_Dequeue(&the_message_queue->Wait_queue))) {
-    waitp = &the_thread->Wait;
+  while (
+    ( the_thread =
+      _CORE_message_queue_Dequeue_receiver(
+        the_message_queue,
+        buffer,
+        size,
+        0,
+        lock_context
+      )
+    )
+  ) {
     number_broadcasted += 1;
 
-    _CORE_message_queue_Copy_buffer(
-      buffer,
-      waitp->return_argument_second.mutable_object,
-      size
-    );
+#if defined(RTEMS_MULTIPROCESSING)
+    if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+      (*api_message_queue_mp_support) ( the_thread, id );
+#endif
 
-    *(size_t *) the_thread->Wait.return_argument = size;
+    _CORE_message_queue_Acquire( the_message_queue, lock_context );
+  }
 
-    #if defined(RTEMS_MULTIPROCESSING)
-      if ( !_Objects_Is_local_id( the_thread->Object.id ) )
-        (*api_message_queue_mp_support) ( the_thread, id );
-    #endif
+  _CORE_message_queue_Release( the_message_queue, lock_context );
 
-  }
   *count = number_broadcasted;
   return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
 }
diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c
index 437bf33..6d907c9 100644
--- a/cpukit/score/src/coremsgclose.c
+++ b/cpukit/score/src/coremsgclose.c
@@ -18,11 +18,7 @@
 #include "config.h"
 #endif
 
-#include <rtems/system.h>
-#include <rtems/score/chain.h>
-#include <rtems/score/isr.h>
 #include <rtems/score/coremsgimpl.h>
-#include <rtems/score/thread.h>
 #include <rtems/score/wkspace.h>
 
 void _CORE_message_queue_Close(
@@ -31,6 +27,7 @@ void _CORE_message_queue_Close(
   uint32_t                    status
 )
 {
+  ISR_lock_Context lock_context;
 
   /*
    *  This will flush blocked threads whether they were blocked on
@@ -49,7 +46,8 @@ void _CORE_message_queue_Close(
    *  the flush satisfying any blocked senders as a side-effect.
    */
 
-  (void) _CORE_message_queue_Flush( the_message_queue );
+  _ISR_lock_ISR_disable( &lock_context );
+  (void) _CORE_message_queue_Flush( the_message_queue, &lock_context );
 
   (void) _Workspace_Free( the_message_queue->message_buffers );
 
diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c
index 05683f0..f67dcf2 100644
--- a/cpukit/score/src/coremsgflush.c
+++ b/cpukit/score/src/coremsgflush.c
@@ -20,13 +20,12 @@
 #endif
 
 #include <rtems/score/coremsgimpl.h>
-#include <rtems/score/isr.h>
 
 uint32_t   _CORE_message_queue_Flush(
-  CORE_message_queue_Control *the_message_queue
+  CORE_message_queue_Control *the_message_queue,
+  ISR_lock_Context           *lock_context
 )
 {
-  ISR_Level   level;
   Chain_Node *inactive_head;
   Chain_Node *inactive_first;
   Chain_Node *message_queue_first;
@@ -60,7 +59,7 @@ uint32_t   _CORE_message_queue_Flush(
    *  fixed execution time that only deals with pending messages.
    */
 
-  _ISR_Disable( level );
+  _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
 
   count = the_message_queue->number_of_pending_messages;
   if ( count != 0 ) {
@@ -79,6 +78,6 @@ uint32_t   _CORE_message_queue_Flush(
     _Chain_Initialize_empty( &the_message_queue->Pending_messages );
   }
 
-  _ISR_Enable( level );
+  _CORE_message_queue_Release( the_message_queue, lock_context );
   return count;
 }
diff --git a/cpukit/score/src/coremsginsert.c b/cpukit/score/src/coremsginsert.c
index 28407ba..0a73af8 100644
--- a/cpukit/score/src/coremsginsert.c
+++ b/cpukit/score/src/coremsginsert.c
@@ -19,7 +19,6 @@
 #endif
 
 #include <rtems/score/coremsgimpl.h>
-#include <rtems/score/isrlevel.h>
 
 #if defined(RTEMS_SCORE_COREMSG_ENABLE_MESSAGE_PRIORITY)
 static bool _CORE_message_queue_Order(
@@ -45,7 +44,6 @@ void _CORE_message_queue_Insert_message(
 )
 {
   Chain_Control *pending_messages;
-  ISR_Level      level;
 #if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION)
   bool           notify;
 #endif
@@ -53,8 +51,6 @@ void _CORE_message_queue_Insert_message(
   _CORE_message_queue_Set_message_priority( the_message, submit_type );
   pending_messages = &the_message_queue->Pending_messages;
 
-  _ISR_Disable( level );
-
 #if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION)
   notify = ( the_message_queue->number_of_pending_messages == 0 );
 #endif
@@ -74,8 +70,6 @@ void _CORE_message_queue_Insert_message(
     _Chain_Prepend_unprotected( pending_messages, &the_message->Node );
   }
 
-  _ISR_Enable( level );
-
   #if defined(RTEMS_SCORE_COREMSG_ENABLE_NOTIFICATION)
     /*
      *  According to POSIX, does this happen before or after the message
diff --git a/cpukit/score/src/coremsgseize.c b/cpukit/score/src/coremsgseize.c
index ec6cf8c..0d1c36f 100644
--- a/cpukit/score/src/coremsgseize.c
+++ b/cpukit/score/src/coremsgseize.c
@@ -33,18 +33,17 @@ void _CORE_message_queue_Seize(
   void                            *buffer,
   size_t                          *size_p,
   bool                             wait,
-  Watchdog_Interval                timeout
+  Watchdog_Interval                timeout,
+  ISR_lock_Context                *lock_context
 )
 {
-  ISR_lock_Context                   lock_context;
   CORE_message_queue_Buffer_control *the_message;
 
   executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
-  _Thread_queue_Acquire( &the_message_queue->Wait_queue, &lock_context );
+  _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
   the_message = _CORE_message_queue_Get_pending_message( the_message_queue );
   if ( the_message != NULL ) {
     the_message_queue->number_of_pending_messages -= 1;
-    _Thread_queue_Release( &the_message_queue->Wait_queue, &lock_context );
 
     *size_p = the_message->Contents.size;
     executing->Wait.count =
@@ -61,6 +60,7 @@ void _CORE_message_queue_Seize(
        *  So return immediately.
        */
       _CORE_message_queue_Free_message_buffer(the_message_queue, the_message);
+      _CORE_message_queue_Release( the_message_queue, lock_context );
       return;
     #else
     {
@@ -73,12 +73,15 @@ void _CORE_message_queue_Seize(
        *  NOTE: If we note that the queue was not full before this receive,
        *  then we can avoid this dequeue.
        */
-      the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
-      if ( !the_thread ) {
+      the_thread = _Thread_queue_First_locked(
+        &the_message_queue->Wait_queue
+      );
+      if ( the_thread == NULL ) {
         _CORE_message_queue_Free_message_buffer(
           the_message_queue,
           the_message
         );
+        _CORE_message_queue_Release( the_message_queue, lock_context );
         return;
       }
 
@@ -103,13 +106,21 @@ void _CORE_message_queue_Seize(
          the_message,
          _CORE_message_queue_Get_message_priority( the_message )
       );
+      _Thread_queue_Extract_critical(
+        &the_message_queue->Wait_queue,
+        the_thread,
+        lock_context
+      );
+      #if defined(RTEMS_MULTIPROCESSING)
+        _Thread_Dispatch_enable( _Per_CPU_Get() );
+      #endif
       return;
     }
     #endif
   }
 
   if ( !wait ) {
-    _Thread_queue_Release( &the_message_queue->Wait_queue, &lock_context );
+    _CORE_message_queue_Release( the_message_queue, lock_context );
     executing->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_NOWAIT;
     return;
   }
@@ -125,6 +136,9 @@ void _CORE_message_queue_Seize(
     STATES_WAITING_FOR_MESSAGE,
     timeout,
     CORE_MESSAGE_QUEUE_STATUS_TIMEOUT,
-    &lock_context
+    lock_context
   );
+  #if defined(RTEMS_MULTIPROCESSING)
+    _Thread_Dispatch_enable( _Per_CPU_Get() );
+  #endif
 }
diff --git a/cpukit/score/src/coremsgsubmit.c b/cpukit/score/src/coremsgsubmit.c
index 0d0965f..90b7eb0 100644
--- a/cpukit/score/src/coremsgsubmit.c
+++ b/cpukit/score/src/coremsgsubmit.c
@@ -38,36 +38,42 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
   #endif
   CORE_message_queue_Submit_types            submit_type,
   bool                                       wait,
-  Watchdog_Interval                          timeout
+  Watchdog_Interval                          timeout,
+  ISR_lock_Context                          *lock_context
 )
 {
-  CORE_message_queue_Buffer_control   *the_message;
-  Thread_Control                      *the_thread;
+  CORE_message_queue_Buffer_control *the_message;
+  Thread_Control                    *the_thread;
+  #if defined(RTEMS_MULTIPROCESSING)
+    Per_CPU_Control                 *cpu_self;
+  #endif
 
   if ( size > the_message_queue->maximum_message_size ) {
+    _ISR_lock_ISR_enable( lock_context );
     return CORE_MESSAGE_QUEUE_STATUS_INVALID_SIZE;
   }
 
+  _CORE_message_queue_Acquire_critical( the_message_queue, lock_context );
+
   /*
    *  Is there a thread currently waiting on this message queue?
    */
-  if ( the_message_queue->number_of_pending_messages == 0 ) {
-    the_thread = _Thread_queue_Dequeue( &the_message_queue->Wait_queue );
-    if ( the_thread ) {
-      _CORE_message_queue_Copy_buffer(
-        buffer,
-        the_thread->Wait.return_argument_second.mutable_object,
-        size
-      );
-      *(size_t *) the_thread->Wait.return_argument = size;
-      the_thread->Wait.count = (uint32_t) submit_type;
-
-      #if defined(RTEMS_MULTIPROCESSING)
-        if ( !_Objects_Is_local_id( the_thread->Object.id ) )
-          (*api_message_queue_mp_support) ( the_thread, id );
-      #endif
-      return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
-    }
+
+  the_thread = _CORE_message_queue_Dequeue_receiver(
+    the_message_queue,
+    buffer,
+    size,
+    submit_type,
+    lock_context
+  );
+  if ( the_thread != NULL ) {
+    #if defined(RTEMS_MULTIPROCESSING)
+      if ( !_Objects_Is_local_id( the_thread->Object.id ) )
+        (*api_message_queue_mp_support) ( the_thread, id );
+
+      _Thread_Enable_dispatch( _Per_CPU_Get() );
+    #endif
+    return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
   }
 
   /*
@@ -77,23 +83,25 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
   the_message =
       _CORE_message_queue_Allocate_message_buffer( the_message_queue );
   if ( the_message ) {
+    the_message->Contents.size = size;
+    _CORE_message_queue_Set_message_priority( the_message, submit_type );
     _CORE_message_queue_Copy_buffer(
       buffer,
       the_message->Contents.buffer,
       size
     );
-    the_message->Contents.size = size;
-    _CORE_message_queue_Set_message_priority( the_message, submit_type );
 
     _CORE_message_queue_Insert_message(
        the_message_queue,
        the_message,
        submit_type
     );
+    _CORE_message_queue_Release( the_message_queue, lock_context );
     return CORE_MESSAGE_QUEUE_STATUS_SUCCESSFUL;
   }
 
   #if !defined(RTEMS_SCORE_COREMSG_ENABLE_BLOCKING_SEND)
+    _CORE_message_queue_Release( the_message_queue, lock_context );
     return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
   #else
     /*
@@ -102,6 +110,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
      *  on the queue.
      */
     if ( !wait ) {
+      _CORE_message_queue_Release( the_message_queue, lock_context );
       return CORE_MESSAGE_QUEUE_STATUS_TOO_MANY;
     }
 
@@ -110,6 +119,7 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
      *  deadly to block in an ISR.
      */
     if ( _ISR_Is_in_progress() ) {
+      _CORE_message_queue_Release( the_message_queue, lock_context );
       return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED;
     }
 
@@ -119,20 +129,22 @@ CORE_message_queue_Status _CORE_message_queue_Submit(
      *  it as a variable.  Doing this emphasizes how dangerous it
      *  would be to use this variable prior to here.
      */
-    {
-      executing->Wait.id = id;
-      executing->Wait.return_argument_second.immutable_object = buffer;
-      executing->Wait.option = (uint32_t) size;
-      executing->Wait.count = submit_type;
-
-      _Thread_queue_Enqueue(
-        &the_message_queue->Wait_queue,
-        executing,
-        STATES_WAITING_FOR_MESSAGE,
-        timeout,
-        CORE_MESSAGE_QUEUE_STATUS_TIMEOUT
-      );
-    }
+    executing->Wait.id = id;
+    executing->Wait.return_argument_second.immutable_object = buffer;
+    executing->Wait.option = (uint32_t) size;
+    executing->Wait.count = submit_type;
+
+    _Thread_queue_Enqueue_critical(
+      &the_message_queue->Wait_queue,
+      executing,
+      STATES_WAITING_FOR_MESSAGE,
+      timeout,
+      CORE_MESSAGE_QUEUE_STATUS_TIMEOUT,
+      lock_context
+    );
+    #if defined(RTEMS_MULTIPROCESSING)
+      _Thread_Dispatch_enable( _Per_CPU_Get() );
+    #endif
 
     return CORE_MESSAGE_QUEUE_STATUS_UNSATISFIED_WAIT;
   #endif
diff --git a/doc/user/msg.t b/doc/user/msg.t
index f58d677..4ca5611 100644
--- a/doc/user/msg.t
+++ b/doc/user/msg.t
@@ -50,7 +50,10 @@ wait for a message to arrive at a queue.  Also, a task may poll
 a queue for the arrival of a message.
 
 The maximum length message which can be sent is set
-on a per message queue basis.
+on a per message queue basis.  The message content must be copied in general
+to/from an internal buffer of the message queue or directly to a peer in
+certain cases.  This copy operation is performed with interrupts disabled.  So
+it is advisible to keep the messages as short as possible.
 
 @subsection Building a Message Queue Attribute Set
 
-- 
1.8.4.5




More information about the devel mailing list