[rtems commit] score: Introduce _Thread_queue_Flush_critical()

Sebastian Huber sebh at rtems.org
Thu Apr 21 05:34:43 UTC 2016


Module:    rtems
Branch:    master
Commit:    adbedd10cfe5259018b1682d903ab40f6005b3f0
Changeset: http://git.rtems.org/rtems/commit/?id=adbedd10cfe5259018b1682d903ab40f6005b3f0

Author:    Sebastian Huber <sebastian.huber at embedded-brains.de>
Date:      Fri Apr 15 21:18:26 2016 +0200

score: Introduce _Thread_queue_Flush_critical()

Replace _Thread_queue_Flush() with _Thread_queue_Flush_critical() and
add a filter function for customization of the thread queue flush
operation.

Update #2555.

---

 cpukit/rtems/src/semdelete.c                       |  2 +-
 cpukit/rtems/src/semflush.c                        |  2 +-
 cpukit/score/include/rtems/score/corebarrierimpl.h | 28 +++++--
 cpukit/score/include/rtems/score/coremuteximpl.h   | 36 +++++++--
 cpukit/score/include/rtems/score/coresemimpl.h     | 46 ++++++++---
 cpukit/score/include/rtems/score/threadqimpl.h     | 87 ++++++++++++++------
 cpukit/score/src/corebarrier.c                     | 12 ++-
 cpukit/score/src/coremsgclose.c                    | 23 +++++-
 cpukit/score/src/coremsgflush.c                    |  2 +-
 cpukit/score/src/coremutex.c                       | 22 +++++
 cpukit/score/src/coresem.c                         | 22 +++++
 cpukit/score/src/threadqflush.c                    | 94 +++++++++++++++-------
 12 files changed, 290 insertions(+), 86 deletions(-)

diff --git a/cpukit/rtems/src/semdelete.c b/cpukit/rtems/src/semdelete.c
index f503cd9..48a9055 100644
--- a/cpukit/rtems/src/semdelete.c
+++ b/cpukit/rtems/src/semdelete.c
@@ -92,7 +92,7 @@ rtems_status_code rtems_semaphore_delete(
       if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
         _CORE_mutex_Flush(
           &the_semaphore->Core_control.mutex,
-          CORE_MUTEX_WAS_DELETED,
+          _CORE_mutex_Was_deleted,
           _Semaphore_MP_Send_object_was_deleted,
           id
         );
diff --git a/cpukit/rtems/src/semflush.c b/cpukit/rtems/src/semflush.c
index 64386b0..01c5c0d 100644
--- a/cpukit/rtems/src/semflush.c
+++ b/cpukit/rtems/src/semflush.c
@@ -53,7 +53,7 @@ rtems_status_code rtems_semaphore_flush(
       if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
         _CORE_mutex_Flush(
           &the_semaphore->Core_control.mutex,
-          CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT,
+          _CORE_mutex_Unsatisfied_nowait,
           _Semaphore_MP_Send_object_was_deleted,
           id
         );
diff --git a/cpukit/score/include/rtems/score/corebarrierimpl.h b/cpukit/score/include/rtems/score/corebarrierimpl.h
index 1d77405..03abecf 100644
--- a/cpukit/score/include/rtems/score/corebarrierimpl.h
+++ b/cpukit/score/include/rtems/score/corebarrierimpl.h
@@ -193,19 +193,33 @@ uint32_t _CORE_barrier_Do_release(
     )
 #endif
 
+Thread_Control *_CORE_barrier_Was_deleted(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+);
+
 /* Must be a macro due to the multiprocessing dependent parameters */
 #define _CORE_barrier_Flush( \
   the_barrier, \
   mp_callout, \
   mp_id \
 ) \
-  _Thread_queue_Flush( \
-    &( the_barrier )->Wait_queue, \
-    CORE_BARRIER_TQ_OPERATIONS, \
-    CORE_BARRIER_WAS_DELETED, \
-    mp_callout, \
-    mp_id \
-  )
+  do { \
+    ISR_lock_Context _core_barrier_flush_lock_context; \
+    _Thread_queue_Acquire( \
+      &( the_barrier )->Wait_queue, \
+      &_core_barrier_flush_lock_context \
+    ); \
+    _Thread_queue_Flush_critical( \
+      &( the_barrier )->Wait_queue.Queue, \
+      CORE_BARRIER_TQ_OPERATIONS, \
+      _CORE_barrier_Was_deleted, \
+      mp_callout, \
+      mp_id, \
+      &_core_barrier_flush_lock_context \
+    ); \
+  } while ( 0 )
 
 /**
  * This function returns true if the automatic release attribute is
diff --git a/cpukit/score/include/rtems/score/coremuteximpl.h b/cpukit/score/include/rtems/score/coremuteximpl.h
index eae6ef1..73331a5 100644
--- a/cpukit/score/include/rtems/score/coremuteximpl.h
+++ b/cpukit/score/include/rtems/score/coremuteximpl.h
@@ -338,20 +338,40 @@ CORE_mutex_Status _CORE_mutex_Do_surrender(
     )
 #endif
 
+Thread_Control *_CORE_mutex_Was_deleted(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+);
+
+Thread_Control *_CORE_mutex_Unsatisfied_nowait(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+);
+
 /* Must be a macro due to the multiprocessing dependent parameters */
 #define _CORE_mutex_Flush( \
   the_mutex, \
-  status, \
+  filter, \
   mp_callout, \
   mp_id \
 ) \
-  _Thread_queue_Flush( \
-    &( the_mutex )->Wait_queue, \
-    ( the_mutex )->operations, \
-    status, \
-    mp_callout, \
-    mp_id \
-  )
+  do { \
+    ISR_lock_Context _core_mutex_flush_lock_context; \
+    _Thread_queue_Acquire( \
+      &( the_mutex )->Wait_queue, \
+      &_core_mutex_flush_lock_context \
+    ); \
+    _Thread_queue_Flush_critical( \
+      &( the_mutex )->Wait_queue.Queue, \
+      ( the_mutex )->operations, \
+      filter, \
+      mp_callout, \
+      mp_id, \
+      &_core_mutex_flush_lock_context \
+    ); \
+  } while ( 0 )
 
 /**
  * @brief Is mutex locked.
diff --git a/cpukit/score/include/rtems/score/coresemimpl.h b/cpukit/score/include/rtems/score/coresemimpl.h
index e0e2788..fd01f93 100644
--- a/cpukit/score/include/rtems/score/coresemimpl.h
+++ b/cpukit/score/include/rtems/score/coresemimpl.h
@@ -86,18 +86,36 @@ void _CORE_semaphore_Initialize(
   uint32_t                    initial_value
 );
 
+Thread_Control *_CORE_semaphore_Was_deleted(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+);
+
+Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+);
+
 #define _CORE_semaphore_Destroy( \
   the_semaphore, \
   mp_callout, \
   mp_id \
 ) \
   do { \
-    _Thread_queue_Flush( \
+    ISR_lock_Context _core_semaphore_destroy_lock_context; \
+    _Thread_queue_Acquire( \
       &( the_semaphore )->Wait_queue, \
+      &_core_semaphore_destroy_lock_context \
+    ); \
+    _Thread_queue_Flush_critical( \
+      &( the_semaphore )->Wait_queue.Queue, \
       ( the_semaphore )->operations, \
-      CORE_SEMAPHORE_WAS_DELETED, \
+      _CORE_semaphore_Was_deleted, \
       mp_callout, \
-      mp_id \
+      mp_id, \
+      &_core_semaphore_destroy_lock_context \
     ); \
     _Thread_queue_Destroy( &( the_semaphore )->Wait_queue ); \
   } while ( 0 )
@@ -192,13 +210,21 @@ RTEMS_INLINE_ROUTINE CORE_semaphore_Status _CORE_semaphore_Do_surrender(
   mp_callout, \
   mp_id \
 ) \
-  _Thread_queue_Flush( \
-    &( the_semaphore )->Wait_queue, \
-    ( the_semaphore )->operations, \
-    CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \
-    mp_callout, \
-    mp_id \
-  )
+  do { \
+    ISR_lock_Context _core_semaphore_flush_lock_context; \
+    _Thread_queue_Acquire( \
+      &( the_semaphore )->Wait_queue, \
+      &_core_semaphore_flush_lock_context \
+    ); \
+    _Thread_queue_Flush_critical( \
+      &( the_semaphore )->Wait_queue.Queue, \
+      ( the_semaphore )->operations, \
+      _CORE_semaphore_Unsatisfied_nowait, \
+      mp_callout, \
+      mp_id, \
+      &_core_semaphore_flush_lock_context \
+    ); \
+  } while ( 0 )
 
 /**
  * This routine returns the current count associated with the semaphore.
diff --git a/cpukit/score/include/rtems/score/threadqimpl.h b/cpukit/score/include/rtems/score/threadqimpl.h
index d56be79..7b1c896 100644
--- a/cpukit/score/include/rtems/score/threadqimpl.h
+++ b/cpukit/score/include/rtems/score/threadqimpl.h
@@ -590,57 +590,96 @@ Thread_Control *_Thread_queue_First(
   const Thread_queue_Operations *operations
 );
 
-void _Thread_queue_Do_flush(
-  Thread_queue_Control          *the_thread_queue,
+/**
+ * @brief Thread queue flush filter function.
+ *
+ * Called under protection of the thread queue lock by
+ * _Thread_queue_Flush_critical() to optionally alter the thread wait
+ * information and control the iteration.
+ *
+ * @param the_thread The thread to extract.  This is the first parameter to
+ *   optimize for architectures that use the same register for the first
+ *   parameter and the return value.
+ * @param queue The actual thread queue.
+ * @param lock_context The lock context of the lock acquire.  May be used to
+ *   pass additional data to the filter function via an overlay structure.  The
+ *   filter function should not release or acquire the thread queue lock.
+ *
+ * @retval the_thread Extract this thread.
+ * @retval NULL Do not extract this thread and stop the thread queue flush
+ *   operation.  Threads that are already extracted will complete the flush
+ *   operation.
+ */
+typedef Thread_Control *( *Thread_queue_Flush_filter )(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+);
+
+size_t _Thread_queue_Do_flush_critical(
+  Thread_queue_Queue            *queue,
   const Thread_queue_Operations *operations,
-  uint32_t                       status
+  Thread_queue_Flush_filter      filter,
 #if defined(RTEMS_MULTIPROCESSING)
-  ,
   Thread_queue_MP_callout        mp_callout,
-  Objects_Id                     mp_id
+  Objects_Id                     mp_id,
 #endif
+  ISR_lock_Context              *lock_context
 );
 
 /**
- * @brief Unblocks all threads blocked on the thread queue.
+ * @brief Unblocks all threads enqueued on the thread queue.
  *
- * The thread timers of the threads are cancelled.
+ * This function iteratively extracts the first enqueued thread of the thread
+ * queue until the thread queue is empty or the filter function indicates a
+ * stop.  The thread timers of the extracted threads are cancelled.  The
+ * extracted threads are unblocked.
  *
- * @param the_thread_queue The thread queue.
+ * @param queue The actual thread queue.
  * @param operations The thread queue operations.
- * @param status The return status for the threads.
+ * @param filter The filter functions is called for each thread to extract from
+ *   the thread queue.  It may be used to alter the thread under protection of
+ *   the thread queue lock, for example to set the thread wait return code.
+ *   The return value of the filter function controls if the thread queue flush
+ *   operation should stop or continue.
  * @param mp_callout Callout to extract the proxy of a remote thread.  This
  *   parameter is only used on multiprocessing configurations.
  * @param mp_id Object identifier of the object containing the thread queue.
  *   This parameter is only used on multiprocessing configurations.
+ *
+ * @return The count of extracted threads.
  */
 #if defined(RTEMS_MULTIPROCESSING)
-  #define _Thread_queue_Flush( \
-    the_thread_queue, \
+  #define _Thread_queue_Flush_critical( \
+    queue, \
     operations, \
-    status, \
+    filter, \
     mp_callout, \
-    mp_id \
+    mp_id, \
+    lock_context \
   ) \
-    _Thread_queue_Do_flush( \
-      the_thread_queue, \
+    _Thread_queue_Do_flush_critical( \
+      queue, \
       operations, \
-      status, \
+      filter, \
       mp_callout, \
-      mp_id \
+      mp_id, \
+      lock_context \
     )
 #else
-  #define _Thread_queue_Flush( \
-    the_thread_queue, \
+  #define _Thread_queue_Flush_critical( \
+    queue, \
     operations, \
-    status, \
+    filter, \
     mp_callout, \
-    mp_id \
+    mp_id, \
+    lock_context \
   ) \
-    _Thread_queue_Do_flush( \
-      the_thread_queue, \
+    _Thread_queue_Do_flush_critical( \
+      queue, \
       operations, \
-      status \
+      filter, \
+      lock_context \
     )
 #endif
 
diff --git a/cpukit/score/src/corebarrier.c b/cpukit/score/src/corebarrier.c
index 5313a0f..3cb7906 100644
--- a/cpukit/score/src/corebarrier.c
+++ b/cpukit/score/src/corebarrier.c
@@ -19,7 +19,6 @@
 #endif
 
 #include <rtems/score/corebarrierimpl.h>
-#include <rtems/score/threadqimpl.h>
 
 void _CORE_barrier_Initialize(
   CORE_barrier_Control       *the_barrier,
@@ -32,3 +31,14 @@ void _CORE_barrier_Initialize(
 
   _Thread_queue_Initialize( &the_barrier->Wait_queue );
 }
+
+Thread_Control *_CORE_barrier_Was_deleted(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+)
+{
+  the_thread->Wait.return_code = CORE_BARRIER_WAS_DELETED;
+
+  return the_thread;
+}
diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c
index 1511f83..7184b11 100644
--- a/cpukit/score/src/coremsgclose.c
+++ b/cpukit/score/src/coremsgclose.c
@@ -21,6 +21,17 @@
 #include <rtems/score/coremsgimpl.h>
 #include <rtems/score/wkspace.h>
 
+static Thread_Control *_CORE_message_queue_Was_deleted(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+)
+{
+  the_thread->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED;
+
+  return the_thread;
+}
+
 void _CORE_message_queue_Do_close(
   CORE_message_queue_Control *the_message_queue
 #if defined(RTEMS_MULTIPROCESSING)
@@ -30,17 +41,21 @@ void _CORE_message_queue_Do_close(
 #endif
 )
 {
+  ISR_lock_Context lock_context;
+
   /*
    *  This will flush blocked threads whether they were blocked on
    *  a send or receive.
    */
 
-  _Thread_queue_Flush(
-    &the_message_queue->Wait_queue,
+  _CORE_message_queue_Acquire( the_message_queue, &lock_context );
+  _Thread_queue_Flush_critical(
+    &the_message_queue->Wait_queue.Queue,
     the_message_queue->operations,
-    CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED,
+    _CORE_message_queue_Was_deleted,
     mp_callout,
-    mp_id
+    mp_id,
+    &lock_context
   );
 
   (void) _Workspace_Free( the_message_queue->message_buffers );
diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c
index f67dcf2..38f26b7 100644
--- a/cpukit/score/src/coremsgflush.c
+++ b/cpukit/score/src/coremsgflush.c
@@ -41,7 +41,7 @@ uint32_t   _CORE_message_queue_Flush(
    *
    *  (1) The thread queue of pending senders is a logical extension
    *  of the pending message queue.  In this case, it should be
-   *  flushed using the _Thread_queue_Flush() service with a status
+   *  flushed using the _Thread_queue_Flush_critical() service with a status
    *  such as CORE_MESSAGE_QUEUE_SENDER_FLUSHED (which currently does
    *  not exist).  This can be implemented without changing the "big-O"
    *  of the message flushing part of the routine.
diff --git a/cpukit/score/src/coremutex.c b/cpukit/score/src/coremutex.c
index 88d487c..75e0c49 100644
--- a/cpukit/score/src/coremutex.c
+++ b/cpukit/score/src/coremutex.c
@@ -96,3 +96,25 @@ CORE_mutex_Status _CORE_mutex_Initialize(
 
   return CORE_MUTEX_STATUS_SUCCESSFUL;
 }
+
+Thread_Control *_CORE_mutex_Was_deleted(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+)
+{
+  the_thread->Wait.return_code = CORE_MUTEX_WAS_DELETED;
+
+  return the_thread;
+}
+
+Thread_Control *_CORE_mutex_Unsatisfied_nowait(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+)
+{
+  the_thread->Wait.return_code = CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT;
+
+  return the_thread;
+}
diff --git a/cpukit/score/src/coresem.c b/cpukit/score/src/coresem.c
index 2bdd81c..02a3837 100644
--- a/cpukit/score/src/coresem.c
+++ b/cpukit/score/src/coresem.c
@@ -36,3 +36,25 @@ void _CORE_semaphore_Initialize(
     the_semaphore->operations = &_Thread_queue_Operations_FIFO;
   }
 }
+
+Thread_Control *_CORE_semaphore_Was_deleted(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+)
+{
+  the_thread->Wait.return_code = CORE_SEMAPHORE_WAS_DELETED;
+
+  return the_thread;
+}
+
+Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
+  Thread_Control     *the_thread,
+  Thread_queue_Queue *queue,
+  ISR_lock_Context   *lock_context
+)
+{
+  the_thread->Wait.return_code = CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT;
+
+  return the_thread;
+}
diff --git a/cpukit/score/src/threadqflush.c b/cpukit/score/src/threadqflush.c
index c508314..0ce639e 100644
--- a/cpukit/score/src/threadqflush.c
+++ b/cpukit/score/src/threadqflush.c
@@ -18,46 +18,82 @@
 #include "config.h"
 #endif
 
-#include <rtems/score/threadqimpl.h>
-#include <rtems/score/objectimpl.h>
+#include <rtems/score/threadimpl.h>
 
-void _Thread_queue_Do_flush(
-  Thread_queue_Control          *the_thread_queue,
+size_t _Thread_queue_Do_flush_critical(
+  Thread_queue_Queue            *queue,
   const Thread_queue_Operations *operations,
-  uint32_t                       status
+  Thread_queue_Flush_filter      filter,
 #if defined(RTEMS_MULTIPROCESSING)
-  ,
   Thread_queue_MP_callout        mp_callout,
-  Objects_Id                     mp_id
+  Objects_Id                     mp_id,
 #endif
+  ISR_lock_Context              *lock_context
 )
 {
-  ISR_lock_Context  lock_context;
-  Thread_Control   *the_thread;
-
-  _Thread_queue_Acquire( the_thread_queue, &lock_context );
-
-  while (
-    (
-      the_thread = _Thread_queue_First_locked(
-        the_thread_queue,
-        operations
-      )
-    )
-  ) {
-    the_thread->Wait.return_code = status;
-
-    _Thread_queue_Extract_critical(
-      &the_thread_queue->Queue,
+  size_t         flushed;
+  Chain_Control  unblock;
+  Chain_Node    *node;
+  Chain_Node    *tail;
+
+  flushed = 0;
+  _Chain_Initialize_empty( &unblock );
+
+  while ( true ) {
+    Thread_queue_Heads *heads;
+    Thread_Control     *first;
+    bool                do_unblock;
+
+    heads = queue->heads;
+    if ( heads == NULL ) {
+      break;
+    }
+
+    first = ( *operations->first )( heads );
+    first = ( *filter )( first, queue, lock_context );
+    if ( first == NULL ) {
+      break;
+    }
+
+    do_unblock = _Thread_queue_Extract_locked(
+      queue,
       operations,
-      the_thread,
+      first,
       mp_callout,
-      mp_id,
-      &lock_context
+      mp_id
     );
+    if ( do_unblock ) {
+      _Chain_Append_unprotected( &unblock, &first->Wait.Node.Chain );
+    }
+
+    ++flushed;
+  }
+
+  node = _Chain_First( &unblock );
+  tail = _Chain_Tail( &unblock );
+
+  if ( node != tail ) {
+    Per_CPU_Control *cpu_self;
+
+    cpu_self = _Thread_Dispatch_disable_critical( lock_context );
+    _Thread_queue_Queue_release( queue, lock_context );
+
+    do {
+      Thread_Control *the_thread;
+      Chain_Node     *next;
+
+      next = _Chain_Next( node );
+      the_thread = THREAD_CHAIN_NODE_TO_THREAD( node );
+      _Thread_Timer_remove( the_thread );
+      _Thread_Unblock( the_thread );
+
+      node = next;
+    } while ( node != tail );
 
-    _Thread_queue_Acquire( the_thread_queue, &lock_context );
+    _Thread_Dispatch_enable( cpu_self );
+  } else {
+    _Thread_queue_Queue_release( queue, lock_context );
   }
 
-  _Thread_queue_Release( the_thread_queue, &lock_context );
+  return flushed;
 }




More information about the vc mailing list