[PATCH 13/20] score: Introduce _Thread_queue_Flush_critical()
Sebastian Huber
sebastian.huber at embedded-brains.de
Tue Apr 19 13:12:41 UTC 2016
Replace _Thread_queue_Flush() with _Thread_queue_Flush_critical() and
add a filter function for customization of the thread queue flush
operation.
Update #2555.
---
cpukit/rtems/src/semdelete.c | 2 +-
cpukit/rtems/src/semflush.c | 2 +-
cpukit/score/include/rtems/score/corebarrierimpl.h | 28 +++++--
cpukit/score/include/rtems/score/coremuteximpl.h | 36 +++++++--
cpukit/score/include/rtems/score/coresemimpl.h | 46 ++++++++---
cpukit/score/include/rtems/score/threadqimpl.h | 59 ++++++++++----
cpukit/score/src/corebarrier.c | 12 ++-
cpukit/score/src/coremsgclose.c | 21 ++++-
cpukit/score/src/coremsgflush.c | 2 +-
cpukit/score/src/coremutex.c | 22 ++++++
cpukit/score/src/coresem.c | 22 ++++++
cpukit/score/src/threadqflush.c | 90 +++++++++++++++-------
12 files changed, 266 insertions(+), 76 deletions(-)
diff --git a/cpukit/rtems/src/semdelete.c b/cpukit/rtems/src/semdelete.c
index f503cd9..48a9055 100644
--- a/cpukit/rtems/src/semdelete.c
+++ b/cpukit/rtems/src/semdelete.c
@@ -92,7 +92,7 @@ rtems_status_code rtems_semaphore_delete(
if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
_CORE_mutex_Flush(
&the_semaphore->Core_control.mutex,
- CORE_MUTEX_WAS_DELETED,
+ _CORE_mutex_Was_deleted,
_Semaphore_MP_Send_object_was_deleted,
id
);
diff --git a/cpukit/rtems/src/semflush.c b/cpukit/rtems/src/semflush.c
index 64386b0..01c5c0d 100644
--- a/cpukit/rtems/src/semflush.c
+++ b/cpukit/rtems/src/semflush.c
@@ -53,7 +53,7 @@ rtems_status_code rtems_semaphore_flush(
if ( !_Attributes_Is_counting_semaphore( attribute_set ) ) {
_CORE_mutex_Flush(
&the_semaphore->Core_control.mutex,
- CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT,
+ _CORE_mutex_Unsatisfied_nowait,
_Semaphore_MP_Send_object_was_deleted,
id
);
diff --git a/cpukit/score/include/rtems/score/corebarrierimpl.h b/cpukit/score/include/rtems/score/corebarrierimpl.h
index 1d77405..8e923df 100644
--- a/cpukit/score/include/rtems/score/corebarrierimpl.h
+++ b/cpukit/score/include/rtems/score/corebarrierimpl.h
@@ -193,19 +193,33 @@ uint32_t _CORE_barrier_Do_release(
)
#endif
+Thread_Control *_CORE_barrier_Was_deleted(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+);
+
/* Must be a macro due to the multiprocessing dependent parameters */
#define _CORE_barrier_Flush( \
the_barrier, \
mp_callout, \
mp_id \
) \
- _Thread_queue_Flush( \
- &( the_barrier )->Wait_queue, \
- CORE_BARRIER_TQ_OPERATIONS, \
- CORE_BARRIER_WAS_DELETED, \
- mp_callout, \
- mp_id \
- )
+ do { \
+ ISR_lock_Context _core_barrier_flush_lock_context; \
+ _Thread_queue_Acquire( \
+ &( the_barrier )->Wait_queue, \
+ &_core_barrier_flush_lock_context \
+ ); \
+ _Thread_queue_Flush_critical( \
+ &( the_barrier )->Wait_queue, \
+ CORE_BARRIER_TQ_OPERATIONS, \
+ _CORE_barrier_Was_deleted, \
+ mp_callout, \
+ mp_id, \
+ &_core_barrier_flush_lock_context \
+ ); \
+ } while ( 0 )
/**
* This function returns true if the automatic release attribute is
diff --git a/cpukit/score/include/rtems/score/coremuteximpl.h b/cpukit/score/include/rtems/score/coremuteximpl.h
index eae6ef1..0f79923 100644
--- a/cpukit/score/include/rtems/score/coremuteximpl.h
+++ b/cpukit/score/include/rtems/score/coremuteximpl.h
@@ -338,20 +338,40 @@ CORE_mutex_Status _CORE_mutex_Do_surrender(
)
#endif
+Thread_Control *_CORE_mutex_Was_deleted(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+);
+
+Thread_Control *_CORE_mutex_Unsatisfied_nowait(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+);
+
/* Must be a macro due to the multiprocessing dependent parameters */
#define _CORE_mutex_Flush( \
the_mutex, \
- status, \
+ filter, \
mp_callout, \
mp_id \
) \
- _Thread_queue_Flush( \
- &( the_mutex )->Wait_queue, \
- ( the_mutex )->operations, \
- status, \
- mp_callout, \
- mp_id \
- )
+ do { \
+ ISR_lock_Context _core_mutex_flush_lock_context; \
+ _Thread_queue_Acquire( \
+ &( the_mutex )->Wait_queue, \
+ &_core_mutex_flush_lock_context \
+ ); \
+ _Thread_queue_Flush_critical( \
+ &( the_mutex )->Wait_queue, \
+ ( the_mutex )->operations, \
+ filter, \
+ mp_callout, \
+ mp_id, \
+ &_core_mutex_flush_lock_context \
+ ); \
+ } while ( 0 )
/**
* @brief Is mutex locked.
diff --git a/cpukit/score/include/rtems/score/coresemimpl.h b/cpukit/score/include/rtems/score/coresemimpl.h
index d68c3d7..a341f28 100644
--- a/cpukit/score/include/rtems/score/coresemimpl.h
+++ b/cpukit/score/include/rtems/score/coresemimpl.h
@@ -86,19 +86,36 @@ void _CORE_semaphore_Initialize(
uint32_t initial_value
);
+Thread_Control *_CORE_semaphore_Was_deleted(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+);
+
+Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+);
+
#define _CORE_semaphore_Destroy( \
the_semaphore, \
mp_callout, \
mp_id \
) \
do { \
- _Thread_queue_Flush( \
+ ISR_lock_Context _core_semaphore_destroy_lock_context; \
+ _Thread_queue_Acquire( \
+ &( the_semaphore )->Wait_queue, \
+ &_core_semaphore_destroy_lock_context \
+ ); \
+ _Thread_queue_Flush_critical( \
&( the_semaphore )->Wait_queue, \
( the_semaphore )->operations, \
- CORE_SEMAPHORE_WAS_DELETED, \
+ _CORE_semaphore_Was_deleted, \
mp_callout, \
mp_id, \
- &_core_semaphore_flush_lock_context \
+ &_core_semaphore_destroy_lock_context \
); \
_Thread_queue_Destroy( &( the_semaphore )->Wait_queue ); \
} while ( 0 )
@@ -190,17 +207,24 @@ RTEMS_INLINE_ROUTINE CORE_semaphore_Status _CORE_semaphore_Do_surrender(
/* Must be a macro due to the multiprocessing dependent parameters */
#define _CORE_semaphore_Flush( \
the_semaphore, \
- CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \
mp_callout, \
mp_id \
) \
- _Thread_queue_Flush( \
- &( the_semaphore )->Wait_queue, \
- ( the_semaphore )->operations, \
- CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT, \
- mp_callout, \
- mp_id \
- )
+ do { \
+ ISR_lock_Context _core_semaphore_flush_lock_context; \
+ _Thread_queue_Acquire( \
+ &( the_semaphore )->Wait_queue, \
+ &_core_semaphore_flush_lock_context \
+ ); \
+ _Thread_queue_Flush_critical( \
+ &( the_semaphore )->Wait_queue, \
+ ( the_semaphore )->operations, \
+ _CORE_semaphore_Unsatisfied_nowait, \
+ mp_callout, \
+ mp_id, \
+ &_core_semaphore_flush_lock_context \
+ ); \
+ } while ( 0 )
/**
* This routine returns the current count associated with the semaphore.
diff --git a/cpukit/score/include/rtems/score/threadqimpl.h b/cpukit/score/include/rtems/score/threadqimpl.h
index d56be79..3462b41 100644
--- a/cpukit/score/include/rtems/score/threadqimpl.h
+++ b/cpukit/score/include/rtems/score/threadqimpl.h
@@ -590,15 +590,37 @@ Thread_Control *_Thread_queue_First(
const Thread_queue_Operations *operations
);
-void _Thread_queue_Do_flush(
+/**
+ * @brief Thread queue flush filter function.
+ *
+ * @param the_thread The thread to extract. This is the first parameter to
+ * optimize for architectures that use the same register for the first
+ * parameter and the return value.
+ * @param the_thread_queue The thread queue.
+ * @param lock_context The lock context of the lock acquire. May be used to
+ * pass additional data to the filter function via an overlay structure. The
+ * filter function should not release or acquire the thread queue lock.
+ *
+ * @retval the_thread Extract this thread.
+ * @retval NULL Do not extract this thread and stop the thread queue flush
+ * operation. Threads that are already extracted will complete the flush
+ * operation.
+ */
+typedef Thread_Control *( *Thread_queue_Flush_filter )(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+);
+
+size_t _Thread_queue_Do_flush_critical(
Thread_queue_Control *the_thread_queue,
const Thread_queue_Operations *operations,
- uint32_t status
+ Thread_queue_Flush_filter filter,
#if defined(RTEMS_MULTIPROCESSING)
- ,
Thread_queue_MP_callout mp_callout,
- Objects_Id mp_id
+ Objects_Id mp_id,
#endif
+ ISR_lock_Context *lock_context
);
/**
@@ -608,39 +630,44 @@ void _Thread_queue_Do_flush(
*
* @param the_thread_queue The thread queue.
* @param operations The thread queue operations.
- * @param status The return status for the threads.
+ * @param filter The filter function called for each thread to extract from
+ * the thread queue.
* @param mp_callout Callout to extract the proxy of a remote thread. This
* parameter is only used on multiprocessing configurations.
* @param mp_id Object identifier of the object containing the thread queue.
* This parameter is only used on multiprocessing configurations.
*/
#if defined(RTEMS_MULTIPROCESSING)
- #define _Thread_queue_Flush( \
+ #define _Thread_queue_Flush_critical( \
the_thread_queue, \
operations, \
- status, \
+ filter, \
mp_callout, \
- mp_id \
+ mp_id, \
+ lock_context \
) \
- _Thread_queue_Do_flush( \
+ _Thread_queue_Do_flush_critical( \
the_thread_queue, \
operations, \
- status, \
+ filter, \
mp_callout, \
- mp_id \
+ mp_id, \
+ lock_context \
)
#else
- #define _Thread_queue_Flush( \
+ #define _Thread_queue_Flush_critical( \
the_thread_queue, \
operations, \
- status, \
+ filter, \
mp_callout, \
- mp_id \
+ mp_id, \
+ lock_context \
) \
- _Thread_queue_Do_flush( \
+ _Thread_queue_Do_flush_critical( \
the_thread_queue, \
operations, \
- status \
+ filter, \
+ lock_context \
)
#endif
diff --git a/cpukit/score/src/corebarrier.c b/cpukit/score/src/corebarrier.c
index 5313a0f..555e3c9 100644
--- a/cpukit/score/src/corebarrier.c
+++ b/cpukit/score/src/corebarrier.c
@@ -19,7 +19,6 @@
#endif
#include <rtems/score/corebarrierimpl.h>
-#include <rtems/score/threadqimpl.h>
void _CORE_barrier_Initialize(
CORE_barrier_Control *the_barrier,
@@ -32,3 +31,14 @@ void _CORE_barrier_Initialize(
_Thread_queue_Initialize( &the_barrier->Wait_queue );
}
+
+Thread_Control *_CORE_barrier_Was_deleted(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ the_thread->Wait.return_code = CORE_BARRIER_WAS_DELETED;
+
+ return the_thread;
+}
diff --git a/cpukit/score/src/coremsgclose.c b/cpukit/score/src/coremsgclose.c
index 1511f83..acdc9d7 100644
--- a/cpukit/score/src/coremsgclose.c
+++ b/cpukit/score/src/coremsgclose.c
@@ -21,6 +21,17 @@
#include <rtems/score/coremsgimpl.h>
#include <rtems/score/wkspace.h>
+static Thread_Control *_CORE_message_queue_Was_deleted(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ the_thread->Wait.return_code = CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED;
+
+ return the_thread;
+}
+
void _CORE_message_queue_Do_close(
CORE_message_queue_Control *the_message_queue
#if defined(RTEMS_MULTIPROCESSING)
@@ -30,17 +41,21 @@ void _CORE_message_queue_Do_close(
#endif
)
{
+ ISR_lock_Context lock_context;
+
/*
* This will flush blocked threads whether they were blocked on
* a send or receive.
*/
- _Thread_queue_Flush(
+ _CORE_message_queue_Acquire( the_message_queue, &lock_context );
+ _Thread_queue_Flush_critical(
&the_message_queue->Wait_queue,
the_message_queue->operations,
- CORE_MESSAGE_QUEUE_STATUS_WAS_DELETED,
+ _CORE_message_queue_Was_deleted,
mp_callout,
- mp_id
+ mp_id,
+ &lock_context
);
(void) _Workspace_Free( the_message_queue->message_buffers );
diff --git a/cpukit/score/src/coremsgflush.c b/cpukit/score/src/coremsgflush.c
index f67dcf2..38f26b7 100644
--- a/cpukit/score/src/coremsgflush.c
+++ b/cpukit/score/src/coremsgflush.c
@@ -41,7 +41,7 @@ uint32_t _CORE_message_queue_Flush(
*
* (1) The thread queue of pending senders is a logical extension
* of the pending message queue. In this case, it should be
- * flushed using the _Thread_queue_Flush() service with a status
+ * flushed using the _Thread_queue_Flush_critical() service with a status
* such as CORE_MESSAGE_QUEUE_SENDER_FLUSHED (which currently does
* not exist). This can be implemented without changing the "big-O"
* of the message flushing part of the routine.
diff --git a/cpukit/score/src/coremutex.c b/cpukit/score/src/coremutex.c
index 88d487c..0b07c89 100644
--- a/cpukit/score/src/coremutex.c
+++ b/cpukit/score/src/coremutex.c
@@ -96,3 +96,25 @@ CORE_mutex_Status _CORE_mutex_Initialize(
return CORE_MUTEX_STATUS_SUCCESSFUL;
}
+
+Thread_Control *_CORE_mutex_Was_deleted(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ the_thread->Wait.return_code = CORE_MUTEX_WAS_DELETED;
+
+ return the_thread;
+}
+
+Thread_Control *_CORE_mutex_Unsatisfied_nowait(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ the_thread->Wait.return_code = CORE_MUTEX_STATUS_UNSATISFIED_NOWAIT;
+
+ return the_thread;
+}
diff --git a/cpukit/score/src/coresem.c b/cpukit/score/src/coresem.c
index 2bdd81c..a36a4f7 100644
--- a/cpukit/score/src/coresem.c
+++ b/cpukit/score/src/coresem.c
@@ -36,3 +36,25 @@ void _CORE_semaphore_Initialize(
the_semaphore->operations = &_Thread_queue_Operations_FIFO;
}
}
+
+Thread_Control *_CORE_semaphore_Was_deleted(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ the_thread->Wait.return_code = CORE_SEMAPHORE_WAS_DELETED;
+
+ return the_thread;
+}
+
+Thread_Control *_CORE_semaphore_Unsatisfied_nowait(
+ Thread_Control *the_thread,
+ Thread_queue_Control *the_thread_queue,
+ ISR_lock_Context *lock_context
+)
+{
+ the_thread->Wait.return_code = CORE_SEMAPHORE_STATUS_UNSATISFIED_NOWAIT;
+
+ return the_thread;
+}
diff --git a/cpukit/score/src/threadqflush.c b/cpukit/score/src/threadqflush.c
index c508314..de64a49 100644
--- a/cpukit/score/src/threadqflush.c
+++ b/cpukit/score/src/threadqflush.c
@@ -18,46 +18,82 @@
#include "config.h"
#endif
-#include <rtems/score/threadqimpl.h>
-#include <rtems/score/objectimpl.h>
+#include <rtems/score/threadimpl.h>
-void _Thread_queue_Do_flush(
+size_t _Thread_queue_Do_flush_critical(
Thread_queue_Control *the_thread_queue,
const Thread_queue_Operations *operations,
- uint32_t status
+ Thread_queue_Flush_filter filter,
#if defined(RTEMS_MULTIPROCESSING)
- ,
Thread_queue_MP_callout mp_callout,
- Objects_Id mp_id
+ Objects_Id mp_id,
#endif
+ ISR_lock_Context *lock_context
)
{
- ISR_lock_Context lock_context;
- Thread_Control *the_thread;
-
- _Thread_queue_Acquire( the_thread_queue, &lock_context );
-
- while (
- (
- the_thread = _Thread_queue_First_locked(
- the_thread_queue,
- operations
- )
- )
- ) {
- the_thread->Wait.return_code = status;
-
- _Thread_queue_Extract_critical(
+ size_t flushed;
+ Chain_Control unblock;
+ Chain_Node *node;
+ Chain_Node *tail;
+
+ flushed = 0;
+ _Chain_Initialize_empty( &unblock );
+
+ while ( true ) {
+ Thread_queue_Heads *heads;
+ Thread_Control *first;
+ bool do_unblock;
+
+ heads = the_thread_queue->Queue.heads;
+ if ( heads == NULL ) {
+ break;
+ }
+
+ first = ( *operations->first )( heads );
+ first = ( *filter )( first, the_thread_queue, lock_context );
+ if ( first == NULL ) {
+ break;
+ }
+
+ do_unblock = _Thread_queue_Extract_locked(
&the_thread_queue->Queue,
operations,
- the_thread,
+ first,
mp_callout,
- mp_id,
- &lock_context
+ mp_id
);
+ if ( do_unblock ) {
+ _Chain_Append_unprotected( &unblock, &first->Wait.Node.Chain );
+ }
+
+ ++flushed;
+ }
+
+ node = _Chain_First( &unblock );
+ tail = _Chain_Tail( &unblock );
+
+ if ( node != tail ) {
+ Per_CPU_Control *cpu_self;
+
+ cpu_self = _Thread_Dispatch_disable_critical( lock_context );
+ _Thread_queue_Release( the_thread_queue, lock_context );
+
+ do {
+ Thread_Control *the_thread;
+ Chain_Node *next;
+
+ next = _Chain_Next( node );
+ the_thread = THREAD_CHAIN_NODE_TO_THREAD( node );
+ _Thread_Timer_remove( the_thread );
+ _Thread_Unblock( the_thread );
+
+ node = next;
+ } while ( node != tail );
- _Thread_queue_Acquire( the_thread_queue, &lock_context );
+ _Thread_Dispatch_enable( cpu_self );
+ } else {
+ _Thread_queue_Release( the_thread_queue, lock_context );
}
- _Thread_queue_Release( the_thread_queue, &lock_context );
+ return flushed;
}
--
1.8.4.5
More information about the devel
mailing list