[PATCH 13/20] score: Introduce _Thread_queue_Flush_critical()
Gedare Bloom
gedare at rtems.org
Tue Apr 19 15:54:06 UTC 2016
On Tue, Apr 19, 2016 at 9:12 AM, Sebastian Huber
<sebastian.huber at embedded-brains.de> wrote:
> Replace _Thread_queue_Flush() with _Thread_queue_Flush_critical() and
> add a filter function for customization of the thread queue flush
> operation.
>
> Update #2555.
> ---
> cpukit/rtems/src/semdelete.c | 2 +-
> cpukit/rtems/src/semflush.c | 2 +-
> cpukit/score/include/rtems/score/corebarrierimpl.h | 28 +++++--
> cpukit/score/include/rtems/score/coremuteximpl.h | 36 +++++++--
> cpukit/score/include/rtems/score/coresemimpl.h | 46 ++++++++---
> cpukit/score/include/rtems/score/threadqimpl.h | 59 ++++++++++----
> cpukit/score/src/corebarrier.c | 12 ++-
> cpukit/score/src/coremsgclose.c | 21 ++++-
> cpukit/score/src/coremsgflush.c | 2 +-
> cpukit/score/src/coremutex.c | 22 ++++++
> cpukit/score/src/coresem.c | 22 ++++++
> cpukit/score/src/threadqflush.c | 90 +++++++++++++++-------
> 12 files changed, 266 insertions(+), 76 deletions(-)
>
> diff --git a/cpukit/rtems/src/semdelete.c b/cpukit/rtems/src/semdelete.c
> index f503cd9..48a9055 100644
> --- a/cpukit/rtems/src/semdelete.c
> +++ b/cpukit/rtems/src/semdelete.c
> @@ -92,7 +92,7 @@ rtems_status_code rtems_semaphore_delete(
> if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
> _CORE_mutex_Flush(
> &the_semaphore->Core_control.mutex,
> - CORE_MUTEX_WAS_DELETED,
> + _CORE_mutex_Was_deleted,
> _Semaphore_MP_Send_object_was_deleted,
> id
> );
> diff --git a/cpukit/rtems/src/semflush.c b/cpukit/rtems/src/semflush.c
> index 64386b0..01c5c0d 100644
> --- a/cpukit/rtems/src/semflush.c
> +++ b/cpukit/rtems/src/semflush.c
> @@ -53,7 +53,7 @@ rtems_status_code rtems_semaphore_flush(
> if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
> _CORE_mutex_Flush(
> &the_semaphore->Core_control.mutex,
> - CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT,
> + _CORE_mutex_Unsatisfied_nowait,
> _Semaphore_MP_Send_object_was_deleted,
> id
> );
> diff --git a/cpukit/score/include/rtems/score/corebarrierimpl.h b/cpukit/score/include/rtems/score/corebarrierimpl.h
> index 1d77405..8e923df 100644
> --- a/cpukit/score/include/rtems/score/corebarrierimpl.h
> +++ b/cpukit/score/include/rtems/score/corebarrierimpl.h
> @@ -193,19 +193,33 @@ uint32_t _CORE_barrier_Do_release(
> )
> #endif
>
> +Thread_Control *_CORE_barrier_Was_deleted(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +);
> +
> /* Must be a macro due to the multiprocessing dependent parameters */
> #define _CORE_barrier_Flush( \
> the_barrier, \
> mp_callout, \
> mp_id \
> ) \
> - _Thread_queue_Flush( \
> - &( the_barrier )->Wait_queue, \
> - CORE_BARRIER_TQ_OPERATIONS, \
> - CORE_BARRIER_WAS_DELETED, \
> - mp_callout, \
> - mp_id \
> - )
> + do { \
> + ISR_lock_Context _core_barrier_flush_lock_context; \
> + _Thread_queue_Acquire( \
> + &( the_barrier )->Wait_queue, \
> + &_core_barrier_flush_lock_context \
> + ); \
> + _Thread_queue_Flush_critical( \
> + &( the_barrier )->Wait_queue, \
> + CORE_BARRIER_TQ_OPERATIONS, \
> + _CORE_barrier_Was_deleted, \
> + mp_callout, \
> + mp_id, \
> + &_core_barrier_flush_lock_context \
> + ); \
> + } while ( 0 )
>
> /**
> * This function returns true if the automatic release attribute is
> diff --git a/cpukit/score/include/rtems/score/coremuteximpl.h b/cpukit/score/include/rtems/score/coremuteximpl.h
> index eae6ef1..0f79923 100644
> --- a/cpukit/score/include/rtems/score/coremuteximpl.h
> +++ b/cpukit/score/include/rtems/score/coremuteximpl.h
> @@ -338,20 +338,40 @@ CORE_mutex_Status _CORE_mutex_Do_surrender(
> )
> #endif
>
> +Thread_Control *_CORE_mutex_Was_deleted(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +);
> +
> +Thread_Control *_CORE_mutex_Unsatisfied_nowait(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +);
> +
> /* Must be a macro due to the multiprocessing dependent parameters */
> #define _CORE_mutex_Flush( \
> the_mutex, \
> - status, \
> + filter, \
> mp_callout, \
> mp_id \
> ) \
> - _Thread_queue_Flush( \
> - &( the_mutex )->Wait_queue, \
> - ( the_mutex )->operations, \
> - status, \
> - mp_callout, \
> - mp_id \
> - )
> + do { \
> + ISR_lock_Context _core_mutex_flush_lock_context; \
> + _Thread_queue_Acquire( \
> + &( the_mutex )->Wait_queue, \
> + &_core_mutex_flush_lock_context \
> + ); \
> + _Thread_queue_Flush_critical( \
> + &( the_mutex )->Wait_queue, \
> + ( the_mutex )->operations, \
> + filter, \
> + mp_callout, \
> + mp_id, \
> + &_core_mutex_flush_lock_context \
> + ); \
> + } while ( 0 )
>
> /**
> * @brief Is mutex locked.
> diff --git a/cpukit/score/include/rtems/score/coresemimpl.h b/cpukit/score/include/rtems/score/coresemimpl.h
> index d68c3d7..a341f28 100644
> --- a/cpukit/score/include/rtems/score/coresemimpl.h
> +++ b/cpukit/score/include/rtems/score/coresemimpl.h
> @@ -86,19 +86,36 @@ void _CORE_semaphore_Initialize(
> uint32_t initial_value
> );
>
> +Thread_Control *_CORE_semaphore_Was_deleted(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +);
> +
> +Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +);
> +
> #define _CORE_semaphore_Destroy( \
> the_semaphore, \
> mp_callout, \
> mp_id \
> ) \
> do { \
> - _Thread_queue_Flush( \
> + ISR_lock_Context _core_semaphore_destroy_lock_context; \
> + _Thread_queue_Acquire( \
> + &( the_semaphore )->Wait_queue, \
> + &_core_semaphore_destroy_lock_context \
> + ); \
> + _Thread_queue_Flush_critical( \
> &( the_semaphore )->Wait_queue, \
> ( the_semaphore )->operations, \
> - CORE_SEMAPHORE_WAS_DELETED, \
> + _CORE_semaphore_Was_deleted, \
> mp_callout, \
> mp_id, \
> - &_core_semaphore_flush_lock_context \
> + &_core_semaphore_destroy_lock_context \
> ); \
> _Thread_queue_Destroy( &( the_semaphore )->Wait_queue ); \
> } while ( 0 )
> @@ -190,17 +207,24 @@ RTEMS_INLINE_ROUTINE CORE_semaphore_Status _CORE_semaphore_Do_surrender(
> /* Must be a macro due to the multiprocessing dependent parameters */
> #define _CORE_semaphore_Flush( \
> the_semaphore, \
> - CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \
> mp_callout, \
> mp_id \
> ) \
> - _Thread_queue_Flush( \
> - &( the_semaphore )->Wait_queue, \
> - ( the_semaphore )->operations, \
> - CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \
> - mp_callout, \
> - mp_id \
> - )
> + do { \
> + ISR_lock_Context _core_semaphore_flush_lock_context; \
> + _Thread_queue_Acquire( \
> + &( the_semaphore )->Wait_queue, \
> + &_core_semaphore_flush_lock_context \
> + ); \
> + _Thread_queue_Flush_critical( \
> + &( the_semaphore )->Wait_queue, \
> + ( the_semaphore )->operations, \
> + _CORE_semaphore_Unsatisfied_nowait, \
> + mp_callout, \
> + mp_id, \
> + &_core_semaphore_flush_lock_context \
> + ); \
> + } while ( 0 )
>
> /**
> * This routine returns the current count associated with the semaphore.
> diff --git a/cpukit/score/include/rtems/score/threadqimpl.h b/cpukit/score/include/rtems/score/threadqimpl.h
> index d56be79..3462b41 100644
> --- a/cpukit/score/include/rtems/score/threadqimpl.h
> +++ b/cpukit/score/include/rtems/score/threadqimpl.h
> @@ -590,15 +590,37 @@ Thread_Control *_Thread_queue_First(
> const Thread_queue_Operations *operations
> );
>
> -void _Thread_queue_Do_flush(
> +/**
> + * @brief Thread queue flush filter function.
> + *
> + * @param the_thread The thread to extract. This is the first parameter to
> + * optimize for architectures that use the same register for the first
> + * parameter and the return value.
> + * @param the_thread_queue The thread queue.
> + * @param lock_context The lock context of the lock acquire. May be used to
> + * pass additional data to the filter function via an overlay structure. The
> + * filter function should not release or acquire the thread queue lock.
> + *
> + * @retval the_thread Extract this thread.
> + * @retval NULL Do not extract this thread and stop the thread queue flush
> + * operation. Threads that are already extracted will complete the flush
> + * operation.
> + */
Can you please add a bit more description of the intent of the
flush filter? It was not clear to me until I looked at the example of
how the barrier uses the filter to set a flag in the TCB of each flushed
thread. "Filter" is an unfortunately overloaded term.
> +typedef Thread_Control *( *Thread_queue_Flush_filter )(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +);
> +
> +size_t _Thread_queue_Do_flush_critical(
> Thread_queue_Control *the_thread_queue,
> const Thread_queue_Operations *operations,
> - uint32_t status
> + Thread_queue_Flush_filter filter,
> #if defined(RTEMS_MULTIPROCESSING)
> - ,
> Thread_queue_MP_callout mp_callout,
> - Objects_Id mp_id
> + Objects_Id mp_id,
> #endif
> + ISR_lock_Context *lock_context
> );
>
> /**
> @@ -608,39 +630,44 @@ void _Thread_queue_Do_flush(
> *
> * @param the_thread_queue The thread queue.
> * @param operations The thread queue operations.
> - * @param status The return status for the threads.
> + * @param filter The filter functions called for each thread to extract from
> + * the thread queue.
> * @param mp_callout Callout to extract the proxy of a remote thread. This
> * parameter is only used on multiprocessing configurations.
> * @param mp_id Object identifier of the object containing the thread queue.
> * This parameter is only used on multiprocessing configurations.
> */
> #if defined(RTEMS_MULTIPROCESSING)
> - #define _Thread_queue_Flush( \
> + #define _Thread_queue_Flush_critical( \
> the_thread_queue, \
> operations, \
> - status, \
> + filter, \
> mp_callout, \
> - mp_id \
> + mp_id, \
> + lock_context \
> ) \
> - _Thread_queue_Do_flush( \
> + _Thread_queue_Do_flush_critical( \
> the_thread_queue, \
> operations, \
> - status, \
> + filter, \
> mp_callout, \
> - mp_id \
> + mp_id, \
> + lock_context \
> )
> #else
> - #define _Thread_queue_Flush( \
> + #define _Thread_queue_Flush_critical( \
> the_thread_queue, \
> operations, \
> - status, \
> + filter, \
> mp_callout, \
> - mp_id \
> + mp_id, \
> + lock_context \
> ) \
> - _Thread_queue_Do_flush( \
> + _Thread_queue_Do_flush_critical( \
> the_thread_queue, \
> operations, \
> - status \
> + filter, \
> + lock_context \
> )
> #endif
>
> diff --git a/cpukit/score/src/corebarrier.c b/cpukit/score/src/corebarrier.c
> index 5313a0f..555e3c9 100644
> --- a/cpukit/score/src/corebarrier.c
> +++ b/cpukit/score/src/corebarrier.c
> @@ -19,7 +19,6 @@
> #endif
>
> #include <rtems/score/corebarrierimpl.h>
> -#include <rtems/score/threadqimpl.h>
>
> void _CORE_barrier_Initialize(
> CORE_barrier_Control *the_barrier,
> @@ -32,3 +31,14 @@ void _CORE_barrier_Initialize(
>
> _Thread_queue_Initialize( &the_barrier->Wait_queue );
> }
> +
> +Thread_Control *_CORE_barrier_Was_deleted(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +)
> +{
> + the_thread->Wait.return_code = CORE_BARRIER_WAS_DELETED;
> +
> + return the_thread;
> +}
> diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c
> index 1511f83..acdc9d7 100644
> --- a/cpukit/score/src/coremsgclose.c
> +++ b/cpukit/score/src/coremsgclose.c
> @@ -21,6 +21,17 @@
> #include <rtems/score/coremsgimpl.h>
> #include <rtems/score/wkspace.h>
>
> +static Thread_Control *_CORE_message_queue_Was_deleted(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +)
> +{
> + the_thread->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED;
> +
> + return the_thread;
> +}
> +
> void _CORE_message_queue_Do_close(
> CORE_message_queue_Control *the_message_queue
> #if defined(RTEMS_MULTIPROCESSING)
> @@ -30,17 +41,21 @@ void _CORE_message_queue_Do_close(
> #endif
> )
> {
> + ISR_lock_Context lock_context;
> +
> /*
> * This will flush blocked threads whether they were blocked on
> * a send or receive.
> */
>
> - _Thread_queue_Flush(
> + _CORE_message_queue_Acquire( the_message_queue, &lock_context );
> + _Thread_queue_Flush_critical(
> &the_message_queue->Wait_queue,
> the_message_queue->operations,
> - CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED,
> + _CORE_message_queue_Was_deleted,
> mp_callout,
> - mp_id
> + mp_id,
> + &lock_context
> );
>
> (void) _Workspace_Free( the_message_queue->message_buffers );
> diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c
> index f67dcf2..38f26b7 100644
> --- a/cpukit/score/src/coremsgflush.c
> +++ b/cpukit/score/src/coremsgflush.c
> @@ -41,7 +41,7 @@ uint32_t _CORE_message_queue_Flush(
> *
> * (1) The thread queue of pending senders is a logical extension
> * of the pending message queue. In this case, it should be
> - * flushed using the _Thread_queue_Flush() service with a status
> + * flushed using the _Thread_queue_Flush_critical() service with a status
> * such as CORE_MESSAGE_QUEUE_SENDER_FLUSHED (which currently does
> * not exist). This can be implemented without changing the "big-O"
> * of the message flushing part of the routine.
> diff --git a/cpukit/score/src/coremutex.c b/cpukit/score/src/coremutex.c
> index 88d487c..0b07c89 100644
> --- a/cpukit/score/src/coremutex.c
> +++ b/cpukit/score/src/coremutex.c
> @@ -96,3 +96,25 @@ CORE_mutex_Status _CORE_mutex_Initialize(
>
> return CORE_MUTEX_STATUS_SUCCESSFUL;
> }
> +
> +Thread_Control *_CORE_mutex_Was_deleted(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +)
> +{
> + the_thread->Wait.return_code = CORE_MUTEX_WAS_DELETED;
> +
> + return the_thread;
> +}
> +
> +Thread_Control *_CORE_mutex_Unsatisfied_nowait(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +)
> +{
> + the_thread->Wait.return_code = CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT;
> +
> + return the_thread;
> +}
> diff --git a/cpukit/score/src/coresem.c b/cpukit/score/src/coresem.c
> index 2bdd81c..a36a4f7 100644
> --- a/cpukit/score/src/coresem.c
> +++ b/cpukit/score/src/coresem.c
> @@ -36,3 +36,25 @@ void _CORE_semaphore_Initialize(
> the_semaphore->operations = &_Thread_queue_Operations_FIFO;
> }
> }
> +
> +Thread_Control *_CORE_semaphore_Was_deleted(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +)
> +{
> + the_thread->Wait.return_code = CORE_SEMAPHORE_WAS_DELETED;
> +
> + return the_thread;
> +}
> +
> +Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
> + Thread_Control *the_thread,
> + Thread_queue_Control *the_thread_queue,
> + ISR_lock_Context *lock_context
> +)
> +{
> + the_thread->Wait.return_code = CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT;
> +
> + return the_thread;
> +}
> diff --git a/cpukit/score/src/threadqflush.c b/cpukit/score/src/threadqflush.c
> index c508314..de64a49 100644
> --- a/cpukit/score/src/threadqflush.c
> +++ b/cpukit/score/src/threadqflush.c
> @@ -18,46 +18,82 @@
> #include "config.h"
> #endif
>
> -#include <rtems/score/threadqimpl.h>
> -#include <rtems/score/objectimpl.h>
> +#include <rtems/score/threadimpl.h>
>
> -void _Thread_queue_Do_flush(
> +size_t _Thread_queue_Do_flush_critical(
> Thread_queue_Control *the_thread_queue,
> const Thread_queue_Operations *operations,
> - uint32_t status
> + Thread_queue_Flush_filter filter,
> #if defined(RTEMS_MULTIPROCESSING)
> - ,
> Thread_queue_MP_callout mp_callout,
> - Objects_Id mp_id
> + Objects_Id mp_id,
> #endif
> + ISR_lock_Context *lock_context
> )
> {
> - ISR_lock_Context lock_context;
> - Thread_Control *the_thread;
> -
> - _Thread_queue_Acquire( the_thread_queue, &lock_context );
> -
> - while (
> - (
> - the_thread = _Thread_queue_First_locked(
> - the_thread_queue,
> - operations
> - )
> - )
> - ) {
> - the_thread->Wait.return_code = status;
> -
> - _Thread_queue_Extract_critical(
> + size_t flushed;
> + Chain_Control unblock;
> + Chain_Node *node;
> + Chain_Node *tail;
> +
> + flushed = 0;
> + _Chain_Initialize_empty( &unblock );
> +
> + while ( true ) {
> + Thread_queue_Heads *heads;
> + Thread_Control *first;
> + bool do_unblock;
> +
> + heads = the_thread_queue->Queue.heads;
> + if ( heads == NULL ) {
> + break;
> + }
> +
> + first = ( *operations->first )( heads );
> + first = ( *filter )( first, the_thread_queue, lock_context );
> + if ( first == NULL ) {
> + break;
> + }
> +
> + do_unblock = _Thread_queue_Extract_locked(
> &the_thread_queue->Queue,
> operations,
> - the_thread,
> + first,
> mp_callout,
> - mp_id,
> - &lock_context
> + mp_id
> );
> + if ( do_unblock ) {
> + _Chain_Append_unprotected( &unblock, &first->Wait.Node.Chain );
> + }
> +
> + ++flushed;
> + }
> +
> + node = _Chain_First( &unblock );
> + tail = _Chain_Tail( &unblock );
> +
> + if ( node != tail ) {
> + Per_CPU_Control *cpu_self;
> +
> + cpu_self = _Thread_Dispatch_disable_critical( lock_context );
> + _Thread_queue_Release( the_thread_queue, lock_context );
> +
> + do {
> + Thread_Control *the_thread;
> + Chain_Node *next;
> +
> + next = _Chain_Next( node );
> + the_thread = THREAD_CHAIN_NODE_TO_THREAD( node );
> + _Thread_Timer_remove( the_thread );
> + _Thread_Unblock( the_thread );
> +
> + node = next;
> + } while ( node != tail );
>
> - _Thread_queue_Acquire( the_thread_queue, &lock_context );
> + _Thread_Dispatch_enable( cpu_self );
> + } else {
> + _Thread_queue_Release( the_thread_queue, lock_context );
> }
>
> - _Thread_queue_Release( the_thread_queue, &lock_context );
> + return flushed;
> }
> --
> 1.8.4.5
>
> _______________________________________________
> devel mailing list
> devel at rtems.org
> http://lists.rtems.org/mailman/listinfo/devel
More information about the devel
mailing list