[PATCH 2/5] score: Fix blocking message queue receive

Gedare Bloom gedare at rtems.org
Wed Sep 1 16:58:38 UTC 2021


On Tue, Aug 31, 2021 at 5:24 AM Sebastian Huber
<sebastian.huber at embedded-brains.de> wrote:
>
> In order to ensure FIFO fairness across schedulers, the thread queue
> surrender operation must be used to dequeue a thread from the thread
> queue.  The thread queue extract operation is intended for timeouts.
>

Please add a note like this to the doxygen for _Thread_queue_Extract().

> Add _Thread_queue_Resume() which may be used to make extracted or
> surrendered threads ready again.
>
> Remove the now unused _Thread_queue_Extract_critical() function.
>
> Close #4509.
> ---
>  cpukit/include/rtems/score/coremsgimpl.h | 20 ++++----
>  cpukit/include/rtems/score/threadqimpl.h | 59 ++++++------------------
>  cpukit/score/src/coremsgseize.c          | 20 ++++----
>  cpukit/score/src/threadqenqueue.c        | 45 ++++++++++--------
>  4 files changed, 62 insertions(+), 82 deletions(-)
>
> diff --git a/cpukit/include/rtems/score/coremsgimpl.h b/cpukit/include/rtems/score/coremsgimpl.h
> index 161cf8f124..7f01769010 100644
> --- a/cpukit/include/rtems/score/coremsgimpl.h
> +++ b/cpukit/include/rtems/score/coremsgimpl.h
> @@ -616,7 +616,8 @@ RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
>    Thread_queue_Context            *queue_context
>  )
>  {
> -  Thread_Control *the_thread;
> +  Thread_queue_Heads *heads;
> +  Thread_Control     *the_thread;
>
>    /*
>     *  If there are pending messages, then there can't be threads
> @@ -634,14 +635,18 @@ RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
>     *  There must be no pending messages if there is a thread waiting to
>     *  receive a message.
>     */
> -  the_thread = _Thread_queue_First_locked(
> -    &the_message_queue->Wait_queue,
> -    the_message_queue->operations
> -  );
> -  if ( the_thread == NULL ) {
> +  heads = the_message_queue->Wait_queue.Queue.heads;
> +  if ( heads == NULL ) {
>      return NULL;
>    }
>
> +  the_thread = ( *the_message_queue->operations->surrender )(
> +    &the_message_queue->Wait_queue.Queue,
> +    heads,
> +    NULL,
> +    queue_context
> +  );
> +
>     *(size_t *) the_thread->Wait.return_argument = size;
>     the_thread->Wait.count = (uint32_t) submit_type;
>
> @@ -651,9 +656,8 @@ RTEMS_INLINE_ROUTINE Thread_Control *_CORE_message_queue_Dequeue_receiver(
>      size
>    );
>
> -  _Thread_queue_Extract_critical(
> +  _Thread_queue_Resume(
>      &the_message_queue->Wait_queue.Queue,
> -    the_message_queue->operations,
>      the_thread,
>      queue_context
>    );
> diff --git a/cpukit/include/rtems/score/threadqimpl.h b/cpukit/include/rtems/score/threadqimpl.h
> index 2465fc4499..0a24d2da3b 100644
> --- a/cpukit/include/rtems/score/threadqimpl.h
> +++ b/cpukit/include/rtems/score/threadqimpl.h
> @@ -975,56 +975,23 @@ void _Thread_queue_Unblock_critical(
>  );
>
>  /**
> - * @brief Extracts the thread from the thread queue and unblocks it.
> + * @brief Resumes the extracted or surrendered thread.
>   *
> - * The caller must be the owner of the thread queue lock.  This function will
> - * release the thread queue lock and restore the default thread lock.  Thread
> - * dispatching is disabled before the thread queue lock is released and an
> - * unblock is necessary.  Thread dispatching is enabled once the sequence to
> - * unblock the thread is complete.  This makes it possible to use the thread
> - * queue lock to protect the state of objects embedding the thread queue and
> - * directly enter _Thread_queue_Extract_critical() to finalize an operation in
> - * case a waiting thread exists.
> - *
> - * @code
> - * #include <rtems/score/threadqimpl.h>
> - *
> - * typedef struct {
> - *   Thread_queue_Control  Queue;
> - *   Thread_Control       *owner;
> - * } Mutex;
> + * This function makes the thread ready again.  If necessary, the thread is
> + * unblocked and its thread timer removed.
>   *
> - * void _Mutex_Release( Mutex *mutex )
> - * {
> - *   Thread_queue_Context  queue_context;
> - *   Thread_Control       *first;
> - *
> - *   _Thread_queue_Context_initialize( &queue_context, NULL );
> - *   _Thread_queue_Acquire( &mutex->Queue, queue_context );
> - *
> - *   first = _Thread_queue_First_locked( &mutex->Queue );
> - *   mutex->owner = first;
> - *
> - *   if ( first != NULL ) {
> - *     _Thread_queue_Extract_critical(
> - *       &mutex->Queue.Queue,
> - *       mutex->Queue.operations,
> - *       first,
> - *       &queue_context
> - *   );
> - * }
> - * @endcode
> + * The thread shall have been extracted from the thread queue or surrendered by
> + * the thread queue right before the call to this function.  The caller shall
> + * be the owner of the thread queue lock.
>   *
> - * @param queue The actual thread queue.
> - * @param operations The thread queue operations.
> - * @param[in, out] the_thread The thread to extract.
> - * @param[in, out] queue_context The thread queue context of the lock acquire.
> + * @param queue is the actual thread queue.
> + * @param[in, out] the_thread is the thread to make ready and unblock.
> + * @param[in, out] queue_context is the thread queue context.
>   */
> -void _Thread_queue_Extract_critical(
> -  Thread_queue_Queue            *queue,
> -  const Thread_queue_Operations *operations,
> -  Thread_Control                *the_thread,
> -  Thread_queue_Context          *queue_context
> +void _Thread_queue_Resume(
> +  Thread_queue_Queue   *queue,
> +  Thread_Control       *the_thread,
> +  Thread_queue_Context *queue_context
>  );
>
>  /**
> diff --git a/cpukit/score/src/coremsgseize.c b/cpukit/score/src/coremsgseize.c
> index e44538ee17..148e9dbb3c 100644
> --- a/cpukit/score/src/coremsgseize.c
> +++ b/cpukit/score/src/coremsgseize.c
> @@ -56,7 +56,8 @@ Status_Control _CORE_message_queue_Seize(
>        return STATUS_SUCCESSFUL;
>      #else
>      {
> -      Thread_Control   *the_thread;
> +      Thread_queue_Heads *heads;
> +      Thread_Control     *the_thread;
>
>        /*
>         *  There could be a thread waiting to send a message.  If there
> @@ -65,11 +66,8 @@ Status_Control _CORE_message_queue_Seize(
>         *  NOTE: If we note that the queue was not full before this receive,
>         *  then we can avoid this dequeue.
>         */
> -      the_thread = _Thread_queue_First_locked(
> -        &the_message_queue->Wait_queue,
> -        the_message_queue->operations
> -      );
> -      if ( the_thread == NULL ) {
> +      heads = the_message_queue->Wait_queue.Queue.heads;
> +      if ( heads == NULL ) {
>          _CORE_message_queue_Free_message_buffer(
>            the_message_queue,
>            the_message
> @@ -78,6 +76,13 @@ Status_Control _CORE_message_queue_Seize(
>          return STATUS_SUCCESSFUL;
>        }
>
> +      the_thread = ( *the_message_queue->operations->surrender )(
> +        &the_message_queue->Wait_queue.Queue,
> +        heads,
> +        NULL,
> +        queue_context
> +      );
> +
>        /*
>         *  There was a thread waiting to send a message.  This code
>         *  puts the messages in the message queue on behalf of the
> @@ -90,9 +95,8 @@ Status_Control _CORE_message_queue_Seize(
>          (size_t) the_thread->Wait.option,
>          (CORE_message_queue_Submit_types) the_thread->Wait.count
>        );
> -      _Thread_queue_Extract_critical(
> +      _Thread_queue_Resume(
>          &the_message_queue->Wait_queue.Queue,
> -        the_message_queue->operations,
>          the_thread,
>          queue_context
>        );
> diff --git a/cpukit/score/src/threadqenqueue.c b/cpukit/score/src/threadqenqueue.c
> index d165e30da7..833d37ee61 100644
> --- a/cpukit/score/src/threadqenqueue.c
> +++ b/cpukit/score/src/threadqenqueue.c
> @@ -7,9 +7,10 @@
>   *   _Thread_queue_Deadlock_fatal(), _Thread_queue_Deadlock_status(),
>   *   _Thread_queue_Do_dequeue(), _Thread_queue_Enqueue(),
>   *   _Thread_queue_Enqueue_do_nothing_extra(), _Thread_queue_Enqueue_sticky(),
> - *   _Thread_queue_Extract(), _Thread_queue_Extract_critical(),
> - *   _Thread_queue_Extract_locked(), _Thread_queue_Path_acquire_critical(),
> - *   _Thread_queue_Path_release_critical(), _Thread_queue_Surrender(),
> + *   _Thread_queue_Extract(), _Thread_queue_Extract_locked(),
> + *   _Thread_queue_Path_acquire_critical(),
> + *   _Thread_queue_Path_release_critical(),
> + *   _Thread_queue_Resume(),_Thread_queue_Surrender(),
>   *   _Thread_queue_Surrender_sticky(), and _Thread_queue_Unblock_critical().
>   */
>
> @@ -604,28 +605,32 @@ void _Thread_queue_Unblock_critical(
>    }
>  }
>
> -void _Thread_queue_Extract_critical(
> -  Thread_queue_Queue            *queue,
> -  const Thread_queue_Operations *operations,
> -  Thread_Control                *the_thread,
> -  Thread_queue_Context          *queue_context
> +void _Thread_queue_Resume(
> +  Thread_queue_Queue   *queue,
> +  Thread_Control       *the_thread,
> +  Thread_queue_Context *queue_context
>  )
>  {
>    bool unblock;
>
> -  unblock = _Thread_queue_Extract_locked(
> -    queue,
> -    operations,
> -    the_thread,
> -    queue_context
> -  );
> +  unblock = _Thread_queue_Make_ready_again( the_thread );
>
> -  _Thread_queue_Unblock_critical(
> -    unblock,
> -    queue,
> -    the_thread,
> -    &queue_context->Lock_context.Lock_context
> -  );
> +  if ( unblock ) {
> +    Per_CPU_Control *cpu_self;
> +
> +    cpu_self = _Thread_queue_Dispatch_disable( queue_context );
> +    _Thread_queue_Queue_release(
> +      queue, &queue_context->Lock_context.Lock_context
> +    );
> +
> +    _Thread_Remove_timer_and_unblock( the_thread, queue );
> +
> +    _Thread_Dispatch_enable( cpu_self );
> +  } else {
> +    _Thread_queue_Queue_release(
> +      queue, &queue_context->Lock_context.Lock_context
> +    );
> +  }
>  }
>
>  void _Thread_queue_Extract( Thread_Control *the_thread )
> --
> 2.26.2
>
> _______________________________________________
> devel mailing list
> devel at rtems.org
> http://lists.rtems.org/mailman/listinfo/devel


More information about the devel mailing list