[PATCH 2/5] score: Fix blocking message queue receive

Sebastian Huber sebastian.huber at embedded-brains.de
Tue Aug 31 11:24:22 UTC 2021


In order to ensure FIFO fairness across schedulers, the thread queue
surrender operation must be used to dequeue a thread from the thread
queue.  The thread queue extract operation is intended for timeouts.

Add _Thread_queue_Resume() which may be used to make extracted or
surrendered threads ready again.

Remove the now unused _Thread_queue_Extract_critical() function.

Close #4509.
---
 cpukit/include/rtems/score/coremsgimpl.h | 20 ++++----
 cpukit/include/rtems/score/threadqimpl.h | 59 ++++++------------------
 cpukit/score/src/coremsgseize.c          | 20 ++++----
 cpukit/score/src/threadqenqueue.c        | 45 ++++++++++--------
 4 files changed, 62 insertions(+), 82 deletions(-)

diff --git a/cpukit/include/rtems/score/coremsgimpl.h b/cpukit/include/rtems/score/coremsgimpl.h
index 161cf8f124..7f01769010 100644
--- a/cpukit/include/rtems/score/coremsgimpl.h
+++ b/cpukit/include/rtems/score/coremsgimpl.h
@@ -616,7 +616,8 @@ RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
   Thread_queue_Context            *queue_context
 )
 {
-  Thread_Control *the_thread;
+  Thread_queue_Heads *heads;
+  Thread_Control     *the_thread;
 
   /*
    *  If there are pending messages, then there can't be threads
@@ -634,14 +635,18 @@ RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
    *  There must be no pending messages if there is a thread waiting to
    *  receive a message.
    */
-  the_thread = _Thread_queue_First_locked(
-    &the_message_queue->Wait_queue,
-    the_message_queue->operations
-  );
-  if ( the_thread == NULL ) {
+  heads = the_message_queue->Wait_queue.Queue.heads;
+  if ( heads == NULL ) {
     return NULL;
   }
 
+  the_thread = ( *the_message_queue->operations->surrender )(
+    &the_message_queue->Wait_queue.Queue,
+    heads,
+    NULL,
+    queue_context
+  );
+
    *(size_t *) the_thread->Wait.return_argument = size;
    the_thread->Wait.count = (uint32_t) submit_type;
 
@@ -651,9 +656,8 @@ RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
     size
   );
 
-  _Thread_queue_Extract_critical(
+  _Thread_queue_Resume(
     &the_message_queue->Wait_queue.Queue,
-    the_message_queue->operations,
     the_thread,
     queue_context
   );
diff --git a/cpukit/include/rtems/score/threadqimpl.h b/cpukit/include/rtems/score/threadqimpl.h
index 2465fc4499..0a24d2da3b 100644
--- a/cpukit/include/rtems/score/threadqimpl.h
+++ b/cpukit/include/rtems/score/threadqimpl.h
@@ -975,56 +975,23 @@ void _Thread_queue_Unblock_critical(
 );
 
 /**
- * @brief Extracts the thread from the thread queue and unblocks it.
+ * @brief Resumes the extracted or surrendered thread.
  *
- * The caller must be the owner of the thread queue lock.  This function will
- * release the thread queue lock and restore the default thread lock.  Thread
- * dispatching is disabled before the thread queue lock is released and an
- * unblock is necessary.  Thread dispatching is enabled once the sequence to
- * unblock the thread is complete.  This makes it possible to use the thread
- * queue lock to protect the state of objects embedding the thread queue and
- * directly enter _Thread_queue_Extract_critical() to finalize an operation in
- * case a waiting thread exists.
- *
- * @code
- * #include <rtems/score/threadqimpl.h>
- *
- * typedef struct {
- *   Thread_queue_Control  Queue;
- *   Thread_Control       *owner;
- * } Mutex;
+ * This function makes the thread ready again.  If necessary, the thread is
+ * unblocked and its thread timer removed.
  *
- * void _Mutex_Release( Mutex *mutex )
- * {
- *   Thread_queue_Context  queue_context;
- *   Thread_Control       *first;
- *
- *   _Thread_queue_Context_initialize( &queue_context, NULL );
- *   _Thread_queue_Acquire( &mutex->Queue, queue_context );
- *
- *   first = _Thread_queue_First_locked( &mutex->Queue );
- *   mutex->owner = first;
- *
- *   if ( first != NULL ) {
- *     _Thread_queue_Extract_critical(
- *       &mutex->Queue.Queue,
- *       mutex->Queue.operations,
- *       first,
- *       &queue_context
- *   );
- * }
- * @endcode
+ * The thread shall have been extracted from the thread queue or surrendered by
+ * the thread queue right before the call to this function.  The caller shall
+ * be the owner of the thread queue lock.  This function releases the lock.
  *
- * @param queue The actual thread queue.
- * @param operations The thread queue operations.
- * @param[in, out] the_thread The thread to extract.
- * @param[in, out] queue_context The thread queue context of the lock acquire.
+ * @param queue is the actual thread queue.
+ * @param[in, out] the_thread is the thread to make ready and unblock.
+ * @param[in, out] queue_context is the thread queue context.
  */
-void _Thread_queue_Extract_critical(
-  Thread_queue_Queue            *queue,
-  const Thread_queue_Operations *operations,
-  Thread_Control                *the_thread,
-  Thread_queue_Context          *queue_context
+void _Thread_queue_Resume(
+  Thread_queue_Queue   *queue,
+  Thread_Control       *the_thread,
+  Thread_queue_Context *queue_context
 );
 
 /**
diff --git a/cpukit/score/src/coremsgseize.c b/cpukit/score/src/coremsgseize.c
index e44538ee17..148e9dbb3c 100644
--- a/cpukit/score/src/coremsgseize.c
+++ b/cpukit/score/src/coremsgseize.c
@@ -56,7 +56,8 @@ Status_Control _CORE_message_queue_Seize(
       return STATUS_SUCCESSFUL;
     #else
     {
-      Thread_Control   *the_thread;
+      Thread_queue_Heads *heads;
+      Thread_Control     *the_thread;
 
       /*
        *  There could be a thread waiting to send a message.  If there
@@ -65,11 +66,8 @@ Status_Control _CORE_message_queue_Seize(
        *  NOTE: If we note that the queue was not full before this receive,
        *  then we can avoid this dequeue.
        */
-      the_thread = _Thread_queue_First_locked(
-        &the_message_queue->Wait_queue,
-        the_message_queue->operations
-      );
-      if ( the_thread == NULL ) {
+      heads = the_message_queue->Wait_queue.Queue.heads;
+      if ( heads == NULL ) {
         _CORE_message_queue_Free_message_buffer(
           the_message_queue,
           the_message
@@ -78,6 +76,13 @@ Status_Control _CORE_message_queue_Seize(
         return STATUS_SUCCESSFUL;
       }
 
+      the_thread = ( *the_message_queue->operations->surrender )(
+        &the_message_queue->Wait_queue.Queue,
+        heads,
+        NULL,
+        queue_context
+      );
+
       /*
        *  There was a thread waiting to send a message.  This code
        *  puts the messages in the message queue on behalf of the
@@ -90,9 +95,8 @@ Status_Control _CORE_message_queue_Seize(
         (size_t) the_thread->Wait.option,
         (CORE_message_queue_Submit_types) the_thread->Wait.count
       );
-      _Thread_queue_Extract_critical(
+      _Thread_queue_Resume(
         &the_message_queue->Wait_queue.Queue,
-        the_message_queue->operations,
         the_thread,
         queue_context
       );
diff --git a/cpukit/score/src/threadqenqueue.c b/cpukit/score/src/threadqenqueue.c
index d165e30da7..833d37ee61 100644
--- a/cpukit/score/src/threadqenqueue.c
+++ b/cpukit/score/src/threadqenqueue.c
@@ -7,9 +7,10 @@
  *   _Thread_queue_Deadlock_fatal(), _Thread_queue_Deadlock_status(),
  *   _Thread_queue_Do_dequeue(), _Thread_queue_Enqueue(),
  *   _Thread_queue_Enqueue_do_nothing_extra(), _Thread_queue_Enqueue_sticky(),
- *   _Thread_queue_Extract(), _Thread_queue_Extract_critical(),
- *   _Thread_queue_Extract_locked(), _Thread_queue_Path_acquire_critical(),
- *   _Thread_queue_Path_release_critical(), _Thread_queue_Surrender(),
+ *   _Thread_queue_Extract(), _Thread_queue_Extract_locked(),
+ *   _Thread_queue_Path_acquire_critical(),
+ *   _Thread_queue_Path_release_critical(),
+ *   _Thread_queue_Resume(), _Thread_queue_Surrender(),
  *   _Thread_queue_Surrender_sticky(), and _Thread_queue_Unblock_critical().
  */
 
@@ -604,28 +605,32 @@ void _Thread_queue_Unblock_critical(
   }
 }
 
-void _Thread_queue_Extract_critical(
-  Thread_queue_Queue            *queue,
-  const Thread_queue_Operations *operations,
-  Thread_Control                *the_thread,
-  Thread_queue_Context          *queue_context
+void _Thread_queue_Resume(
+  Thread_queue_Queue   *queue,
+  Thread_Control       *the_thread,
+  Thread_queue_Context *queue_context
 )
 {
   bool unblock;
 
-  unblock = _Thread_queue_Extract_locked(
-    queue,
-    operations,
-    the_thread,
-    queue_context
-  );
+  unblock = _Thread_queue_Make_ready_again( the_thread );
 
-  _Thread_queue_Unblock_critical(
-    unblock,
-    queue,
-    the_thread,
-    &queue_context->Lock_context.Lock_context
-  );
+  if ( unblock ) {
+    Per_CPU_Control *cpu_self;
+
+    cpu_self = _Thread_queue_Dispatch_disable( queue_context );
+    _Thread_queue_Queue_release(
+      queue, &queue_context->Lock_context.Lock_context
+    );
+
+    _Thread_Remove_timer_and_unblock( the_thread, queue );
+
+    _Thread_Dispatch_enable( cpu_self );
+  } else {
+    _Thread_queue_Queue_release(
+      queue, &queue_context->Lock_context.Lock_context
+    );
+  }
 }
 
 void _Thread_queue_Extract( Thread_Control *the_thread )
-- 
2.26.2



More information about the devel mailing list