[PATCH 13/20] score: Introduce _Thread_queue_Flush_critical()

Gedare Bloom gedare at rtems.org
Tue Apr 19 15:54:06 UTC 2016


On Tue, Apr 19, 2016 at 9:12 AM, Sebastian Huber
<sebastian.huber at embedded-brains.de> wrote:
> Replace _Thread_queue_Flush() with _Thread_queue_Flush_critical() and
> add a filter function for customization of the thread queue flush
> operation.
>
> Update #2555.
> ---
>  cpukit/rtems/src/semdelete.c                       |  2 +-
>  cpukit/rtems/src/semflush.c                        |  2 +-
>  cpukit/score/include/rtems/score/corebarrierimpl.h | 28 +++++--
>  cpukit/score/include/rtems/score/coremuteximpl.h   | 36 +++++++--
>  cpukit/score/include/rtems/score/coresemimpl.h     | 46 ++++++++---
>  cpukit/score/include/rtems/score/threadqimpl.h     | 59 ++++++++++----
>  cpukit/score/src/corebarrier.c                     | 12 ++-
>  cpukit/score/src/coremsgclose.c                    | 21 ++++-
>  cpukit/score/src/coremsgflush.c                    |  2 +-
>  cpukit/score/src/coremutex.c                       | 22 ++++++
>  cpukit/score/src/coresem.c                         | 22 ++++++
>  cpukit/score/src/threadqflush.c                    | 90 +++++++++++++++-------
>  12 files changed, 266 insertions(+), 76 deletions(-)
>
> diff --git a/cpukit/rtems/src/semdelete.c b/cpukit/rtems/src/semdelete.c
> index f503cd9..48a9055 100644
> --- a/cpukit/rtems/src/semdelete.c
> +++ b/cpukit/rtems/src/semdelete.c
> @@ -92,7 +92,7 @@ rtems_status_code rtems_semaphore_delete(
>        if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
>          _CORE_mutex_Flush(
>            &the_semaphore->Core_control.mutex,
> -          CORE_MUTEX_WAS_DELETED,
> +          _CORE_mutex_Was_deleted,
>            _Semaphore_MP_Send_object_was_deleted,
>            id
>          );
> diff --git a/cpukit/rtems/src/semflush.c b/cpukit/rtems/src/semflush.c
> index 64386b0..01c5c0d 100644
> --- a/cpukit/rtems/src/semflush.c
> +++ b/cpukit/rtems/src/semflush.c
> @@ -53,7 +53,7 @@ rtems_status_code rtems_semaphore_flush(
>        if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
>          _CORE_mutex_Flush(
>            &the_semaphore->Core_control.mutex,
> -          CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT,
> +          _CORE_mutex_Unsatisfied_nowait,
>            _Semaphore_MP_Send_object_was_deleted,
>            id
>          );
> diff --git a/cpukit/score/include/rtems/score/corebarrierimpl.h b/cpukit/score/include/rtems/score/corebarrierimpl.h
> index 1d77405..8e923df 100644
> --- a/cpukit/score/include/rtems/score/corebarrierimpl.h
> +++ b/cpukit/score/include/rtems/score/corebarrierimpl.h
> @@ -193,19 +193,33 @@ uint32_t _CORE_barrier_Do_release(
>      )
>  #endif
>
> +Thread_Control *_CORE_barrier_Was_deleted(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +);
> +
>  /* Must be a macro due to the multiprocessing dependent parameters */
>  #define _CORE_barrier_Flush( \
>    the_barrier, \
>    mp_callout, \
>    mp_id \
>  ) \
> -  _Thread_queue_Flush( \
> -    &( the_barrier )->Wait_queue, \
> -    CORE_BARRIER_TQ_OPERATIONS, \
> -    CORE_BARRIER_WAS_DELETED, \
> -    mp_callout, \
> -    mp_id \
> -  )
> +  do { \
> +    ISR_lock_Context _core_barrier_flush_lock_context; \
> +    _Thread_queue_Acquire( \
> +      &( the_barrier )->Wait_queue, \
> +      &_core_barrier_flush_lock_context \
> +    ); \
> +    _Thread_queue_Flush_critical( \
> +      &( the_barrier )->Wait_queue, \
> +      CORE_BARRIER_TQ_OPERATIONS, \
> +      _CORE_barrier_Was_deleted, \
> +      mp_callout, \
> +      mp_id, \
> +      &_core_barrier_flush_lock_context \
> +    ); \
> +  } while ( 0 )
>
>  /**
>   * This function returns true if the automatic release attribute is
> diff --git a/cpukit/score/include/rtems/score/coremuteximpl.h b/cpukit/score/include/rtems/score/coremuteximpl.h
> index eae6ef1..0f79923 100644
> --- a/cpukit/score/include/rtems/score/coremuteximpl.h
> +++ b/cpukit/score/include/rtems/score/coremuteximpl.h
> @@ -338,20 +338,40 @@ CORE_mutex_Status _CORE_mutex_Do_surrender(
>      )
>  #endif
>
> +Thread_Control *_CORE_mutex_Was_deleted(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +);
> +
> +Thread_Control *_CORE_mutex_Unsatisfied_nowait(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +);
> +
>  /* Must be a macro due to the multiprocessing dependent parameters */
>  #define _CORE_mutex_Flush( \
>    the_mutex, \
> -  status, \
> +  filter, \
>    mp_callout, \
>    mp_id \
>  ) \
> -  _Thread_queue_Flush( \
> -    &( the_mutex )->Wait_queue, \
> -    ( the_mutex )->operations, \
> -    status, \
> -    mp_callout, \
> -    mp_id \
> -  )
> +  do { \
> +    ISR_lock_Context _core_mutex_flush_lock_context; \
> +    _Thread_queue_Acquire( \
> +      &( the_mutex )->Wait_queue, \
> +      &_core_mutex_flush_lock_context \
> +    ); \
> +    _Thread_queue_Flush_critical( \
> +      &( the_mutex )->Wait_queue, \
> +      ( the_mutex )->operations, \
> +      filter, \
> +      mp_callout, \
> +      mp_id, \
> +      &_core_mutex_flush_lock_context \
> +    ); \
> +  } while ( 0 )
>
>  /**
>   * @brief Is mutex locked.
> diff --git a/cpukit/score/include/rtems/score/coresemimpl.h b/cpukit/score/include/rtems/score/coresemimpl.h
> index d68c3d7..a341f28 100644
> --- a/cpukit/score/include/rtems/score/coresemimpl.h
> +++ b/cpukit/score/include/rtems/score/coresemimpl.h
> @@ -86,19 +86,36 @@ void _CORE_semaphore_Initialize(
>    uint32_t                    initial_value
>  );
>
> +Thread_Control *_CORE_semaphore_Was_deleted(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +);
> +
> +Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +);
> +
>  #define _CORE_semaphore_Destroy( \
>    the_semaphore, \
>    mp_callout, \
>    mp_id \
>  ) \
>    do { \
> -    _Thread_queue_Flush( \
> +    ISR_lock_Context _core_semaphore_destroy_lock_context; \
> +    _Thread_queue_Acquire( \
> +      &( the_semaphore )->Wait_queue, \
> +      &_core_semaphore_destroy_lock_context \
> +    ); \
> +    _Thread_queue_Flush_critical( \
>        &( the_semaphore )->Wait_queue, \
>        ( the_semaphore )->operations, \
> -      CORE_SEMAPHORE_WAS_DELETED, \
> +      _CORE_semaphore_Was_deleted, \
>        mp_callout, \
>        mp_id, \
> -      &_core_semaphore_flush_lock_context \
> +      &_core_semaphore_destroy_lock_context \
>      ); \
>      _Thread_queue_Destroy( &( the_semaphore )->Wait_queue ); \
>    } while ( 0 )
> @@ -190,17 +207,24 @@ RTEMS_INLINE_ROUTINE CORE_semaphore_Status _CORE_semaphore_Do_surrender(
>  /* Must be a macro due to the multiprocessing dependent parameters */
>  #define _CORE_semaphore_Flush( \
>    the_semaphore, \
> -  CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \
>    mp_callout, \
>    mp_id \
>  ) \
> -  _Thread_queue_Flush( \
> -    &( the_semaphore )->Wait_queue, \
> -    ( the_semaphore )->operations, \
> -    CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \
> -    mp_callout, \
> -    mp_id \
> -  )
> +  do { \
> +    ISR_lock_Context _core_semaphore_flush_lock_context; \
> +    _Thread_queue_Acquire( \
> +      &( the_semaphore )->Wait_queue, \
> +      &_core_semaphore_flush_lock_context \
> +    ); \
> +    _Thread_queue_Flush_critical( \
> +      &( the_semaphore )->Wait_queue, \
> +      ( the_semaphore )->operations, \
> +      _CORE_semaphore_Unsatisfied_nowait, \
> +      mp_callout, \
> +      mp_id, \
> +      &_core_semaphore_flush_lock_context \
> +    ); \
> +  } while ( 0 )
>
>  /**
>   * This routine returns the current count associated with the semaphore.
> diff --git a/cpukit/score/include/rtems/score/threadqimpl.h b/cpukit/score/include/rtems/score/threadqimpl.h
> index d56be79..3462b41 100644
> --- a/cpukit/score/include/rtems/score/threadqimpl.h
> +++ b/cpukit/score/include/rtems/score/threadqimpl.h
> @@ -590,15 +590,37 @@ Thread_Control *_Thread_queue_First(
>    const Thread_queue_Operations *operations
>  );
>
> -void _Thread_queue_Do_flush(
> +/**
> + * @brief Thread queue flush filter function.
> + *
> + * @param the_thread The thread to extract.  This is the first parameter to
> + *   optimize for architectures that use the same register for the first
> + *   parameter and the return value.
> + * @param the_thread_queue The thread queue.
> + * @param lock_context The lock context of the lock acquire.  May be used to
> + *   pass additional data to the filter function via an overlay structure.  The
> + *   filter function should not release or acquire the thread queue lock.
> + *
> + * @retval the_thread Extract this thread.
> + * @retval NULL Do not extract this thread and stop the thread queue flush
> + *   operation.  Threads that are already extracted will complete the flush
> + *   operation.
> + */
Can you please add a bit more description about the intent of the
flush filter? It was not clear to me until I looked at the example of
how the barrier is using the filter to set a flag (Wait.return_code)
in the TCB of each flushed thread. "Filter" is an unfortunately
overloaded term.

> +typedef Thread_Control *( *Thread_queue_Flush_filter )(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +);
> +
> +size_t _Thread_queue_Do_flush_critical(
>    Thread_queue_Control          *the_thread_queue,
>    const Thread_queue_Operations *operations,
> -  uint32_t                       status
> +  Thread_queue_Flush_filter      filter,
>  #if defined(RTEMS_MULTIPROCESSING)
> -  ,
>    Thread_queue_MP_callout        mp_callout,
> -  Objects_Id                     mp_id
> +  Objects_Id                     mp_id,
>  #endif
> +  ISR_lock_Context              *lock_context
>  );
>
>  /**
> @@ -608,39 +630,44 @@ void _Thread_queue_Do_flush(
>   *
>   * @param the_thread_queue The thread queue.
>   * @param operations The thread queue operations.
> - * @param status The return status for the threads.
> + * @param filter The filter functions called for each thread to extract from
> + *   the thread queue.
>   * @param mp_callout Callout to extract the proxy of a remote thread.  This
>   *   parameter is only used on multiprocessing configurations.
>   * @param mp_id Object identifier of the object containing the thread queue.
>   *   This parameter is only used on multiprocessing configurations.
>   */
>  #if defined(RTEMS_MULTIPROCESSING)
> -  #define _Thread_queue_Flush( \
> +  #define _Thread_queue_Flush_critical( \
>      the_thread_queue, \
>      operations, \
> -    status, \
> +    filter, \
>      mp_callout, \
> -    mp_id \
> +    mp_id, \
> +    lock_context \
>    ) \
> -    _Thread_queue_Do_flush( \
> +    _Thread_queue_Do_flush_critical( \
>        the_thread_queue, \
>        operations, \
> -      status, \
> +      filter, \
>        mp_callout, \
> -      mp_id \
> +      mp_id, \
> +      lock_context \
>      )
>  #else
> -  #define _Thread_queue_Flush( \
> +  #define _Thread_queue_Flush_critical( \
>      the_thread_queue, \
>      operations, \
> -    status, \
> +    filter, \
>      mp_callout, \
> -    mp_id \
> +    mp_id, \
> +    lock_context \
>    ) \
> -    _Thread_queue_Do_flush( \
> +    _Thread_queue_Do_flush_critical( \
>        the_thread_queue, \
>        operations, \
> -      status \
> +      filter, \
> +      lock_context \
>      )
>  #endif
>
> diff --git a/cpukit/score/src/corebarrier.c b/cpukit/score/src/corebarrier.c
> index 5313a0f..555e3c9 100644
> --- a/cpukit/score/src/corebarrier.c
> +++ b/cpukit/score/src/corebarrier.c
> @@ -19,7 +19,6 @@
>  #endif
>
>  #include <rtems/score/corebarrierimpl.h>
> -#include <rtems/score/threadqimpl.h>
>
>  void _CORE_barrier_Initialize(
>    CORE_barrier_Control       *the_barrier,
> @@ -32,3 +31,14 @@ void _CORE_barrier_Initialize(
>
>    _Thread_queue_Initialize( &the_barrier->Wait_queue );
>  }
> +
> +Thread_Control *_CORE_barrier_Was_deleted(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +)
> +{
> +  the_thread->Wait.return_code = CORE_BARRIER_WAS_DELETED;
> +
> +  return the_thread;
> +}
> diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c
> index 1511f83..acdc9d7 100644
> --- a/cpukit/score/src/coremsgclose.c
> +++ b/cpukit/score/src/coremsgclose.c
> @@ -21,6 +21,17 @@
>  #include <rtems/score/coremsgimpl.h>
>  #include <rtems/score/wkspace.h>
>
> +static Thread_Control *_CORE_message_queue_Was_deleted(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +)
> +{
> +  the_thread->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED;
> +
> +  return the_thread;
> +}
> +
>  void _CORE_message_queue_Do_close(
>    CORE_message_queue_Control *the_message_queue
>  #if defined(RTEMS_MULTIPROCESSING)
> @@ -30,17 +41,21 @@ void _CORE_message_queue_Do_close(
>  #endif
>  )
>  {
> +  ISR_lock_Context lock_context;
> +
>    /*
>     *  This will flush blocked threads whether they were blocked on
>     *  a send or receive.
>     */
>
> -  _Thread_queue_Flush(
> +  _CORE_message_queue_Acquire( the_message_queue, &lock_context );
> +  _Thread_queue_Flush_critical(
>      &the_message_queue->Wait_queue,
>      the_message_queue->operations,
> -    CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED,
> +    _CORE_message_queue_Was_deleted,
>      mp_callout,
> -    mp_id
> +    mp_id,
> +    &lock_context
>    );
>
>    (void) _Workspace_Free( the_message_queue->message_buffers );
> diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c
> index f67dcf2..38f26b7 100644
> --- a/cpukit/score/src/coremsgflush.c
> +++ b/cpukit/score/src/coremsgflush.c
> @@ -41,7 +41,7 @@ uint32_t   _CORE_message_queue_Flush(
>     *
>     *  (1) The thread queue of pending senders is a logical extension
>     *  of the pending message queue.  In this case, it should be
> -   *  flushed using the _Thread_queue_Flush() service with a status
> +   *  flushed using the _Thread_queue_Flush_critical() service with a status
>     *  such as CORE_MESSAGE_QUEUE_SENDER_FLUSHED (which currently does
>     *  not exist).  This can be implemented without changing the "big-O"
>     *  of the message flushing part of the routine.
> diff --git a/cpukit/score/src/coremutex.c b/cpukit/score/src/coremutex.c
> index 88d487c..0b07c89 100644
> --- a/cpukit/score/src/coremutex.c
> +++ b/cpukit/score/src/coremutex.c
> @@ -96,3 +96,25 @@ CORE_mutex_Status _CORE_mutex_Initialize(
>
>    return CORE_MUTEX_STATUS_SUCCESSFUL;
>  }
> +
> +Thread_Control *_CORE_mutex_Was_deleted(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +)
> +{
> +  the_thread->Wait.return_code = CORE_MUTEX_WAS_DELETED;
> +
> +  return the_thread;
> +}
> +
> +Thread_Control *_CORE_mutex_Unsatisfied_nowait(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +)
> +{
> +  the_thread->Wait.return_code = CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT;
> +
> +  return the_thread;
> +}
> diff --git a/cpukit/score/src/coresem.c b/cpukit/score/src/coresem.c
> index 2bdd81c..a36a4f7 100644
> --- a/cpukit/score/src/coresem.c
> +++ b/cpukit/score/src/coresem.c
> @@ -36,3 +36,25 @@ void _CORE_semaphore_Initialize(
>      the_semaphore->operations = &_Thread_queue_Operations_FIFO;
>    }
>  }
> +
> +Thread_Control *_CORE_semaphore_Was_deleted(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +)
> +{
> +  the_thread->Wait.return_code = CORE_SEMAPHORE_WAS_DELETED;
> +
> +  return the_thread;
> +}
> +
> +Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
> +  Thread_Control       *the_thread,
> +  Thread_queue_Control *the_thread_queue,
> +  ISR_lock_Context     *lock_context
> +)
> +{
> +  the_thread->Wait.return_code = CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT;
> +
> +  return the_thread;
> +}
> diff --git a/cpukit/score/src/threadqflush.c b/cpukit/score/src/threadqflush.c
> index c508314..de64a49 100644
> --- a/cpukit/score/src/threadqflush.c
> +++ b/cpukit/score/src/threadqflush.c
> @@ -18,46 +18,82 @@
>  #include "config.h"
>  #endif
>
> -#include <rtems/score/threadqimpl.h>
> -#include <rtems/score/objectimpl.h>
> +#include <rtems/score/threadimpl.h>
>
> -void _Thread_queue_Do_flush(
> +size_t _Thread_queue_Do_flush_critical(
>    Thread_queue_Control          *the_thread_queue,
>    const Thread_queue_Operations *operations,
> -  uint32_t                       status
> +  Thread_queue_Flush_filter      filter,
>  #if defined(RTEMS_MULTIPROCESSING)
> -  ,
>    Thread_queue_MP_callout        mp_callout,
> -  Objects_Id                     mp_id
> +  Objects_Id                     mp_id,
>  #endif
> +  ISR_lock_Context              *lock_context
>  )
>  {
> -  ISR_lock_Context  lock_context;
> -  Thread_Control   *the_thread;
> -
> -  _Thread_queue_Acquire( the_thread_queue, &lock_context );
> -
> -  while (
> -    (
> -      the_thread = _Thread_queue_First_locked(
> -        the_thread_queue,
> -        operations
> -      )
> -    )
> -  ) {
> -    the_thread->Wait.return_code = status;
> -
> -    _Thread_queue_Extract_critical(
> +  size_t         flushed;
> +  Chain_Control  unblock;
> +  Chain_Node    *node;
> +  Chain_Node    *tail;
> +
> +  flushed = 0;
> +  _Chain_Initialize_empty( &unblock );
> +
> +  while ( true ) {
> +    Thread_queue_Heads *heads;
> +    Thread_Control     *first;
> +    bool                do_unblock;
> +
> +    heads = the_thread_queue->Queue.heads;
> +    if ( heads == NULL ) {
> +      break;
> +    }
> +
> +    first = ( *operations->first )( heads );
> +    first = ( *filter )( first, the_thread_queue, lock_context );
> +    if ( first == NULL ) {
> +      break;
> +    }
> +
> +    do_unblock = _Thread_queue_Extract_locked(
>        &the_thread_queue->Queue,
>        operations,
> -      the_thread,
> +      first,
>        mp_callout,
> -      mp_id,
> -      &lock_context
> +      mp_id
>      );
> +    if ( do_unblock ) {
> +      _Chain_Append_unprotected( &unblock, &first->Wait.Node.Chain );
> +    }
> +
> +    ++flushed;
> +  }
> +
> +  node = _Chain_First( &unblock );
> +  tail = _Chain_Tail( &unblock );
> +
> +  if ( node != tail ) {
> +    Per_CPU_Control *cpu_self;
> +
> +    cpu_self = _Thread_Dispatch_disable_critical( lock_context );
> +    _Thread_queue_Release( the_thread_queue, lock_context );
> +
> +    do {
> +      Thread_Control *the_thread;
> +      Chain_Node     *next;
> +
> +      next = _Chain_Next( node );
> +      the_thread = THREAD_CHAIN_NODE_TO_THREAD( node );
> +      _Thread_Timer_remove( the_thread );
> +      _Thread_Unblock( the_thread );
> +
> +      node = next;
> +    } while ( node != tail );
>
> -    _Thread_queue_Acquire( the_thread_queue, &lock_context );
> +    _Thread_Dispatch_enable( cpu_self );
> +  } else {
> +    _Thread_queue_Release( the_thread_queue, lock_context );
>    }
>
> -  _Thread_queue_Release( the_thread_queue, &lock_context );
> +  return flushed;
>  }
> --
> 1.8.4.5
>
> _______________________________________________
> devel mailing list
> devel at rtems.org
> http://lists.rtems.org/mailman/listinfo/devel



More information about the devel mailing list