[PATCH 11/12] cpukit/libblock: Workaround SMP problem in bdbuf

Gedare Bloom gedare at rtems.org
Tue May 27 15:21:44 UTC 2014


On Tue, May 27, 2014 at 10:48 AM, Ralf Kirchner
<ralf.kirchner at embedded-brains.de> wrote:
> Enabling and disabling preemption as done for single core will not work for SMP.
> Thus as a temporary workaround use POSIX mutexes and POSIX condition variables for SMP instead of the combination of semaphores and preemption handling used for single core.
> ---
>  cpukit/libblock/src/bdbuf.c |  348 +++++++++++++++++++++++++++++++++++--------
>  1 Datei geändert, 282 Zeilen hinzugefügt(+), 66 Zeilen entfernt(-)
>
> diff --git a/cpukit/libblock/src/bdbuf.c b/cpukit/libblock/src/bdbuf.c
> index 1e4887c..0caf8f5 100644
> --- a/cpukit/libblock/src/bdbuf.c
> +++ b/cpukit/libblock/src/bdbuf.c
> @@ -43,6 +43,23 @@
>
>  #include "rtems/bdbuf.h"
>
> +#if defined( RTEMS_SMP )
> +  #if defined( RTEMS_POSIX_API )
> +    /* The single core implementation with enabling and disabling preemption
> +     * will not work under SMP. Thus we need to use POSIX mutexes and POSIX
> +     * condition variables as a workaround
> +     * Required POSIX mutexes and POSIX condition variables will be allocated
> +     * automatically in confdefs.h if RTEMS_SMP and RTEMS_POSIX_API
> +     * are #defined
> +     */
> +    #define RTEMS_BDBUF_SMP_WORKAROUND
Workaround implies a temporary solution. Is there a longer-term
solution in the works? If not, should we just condition this on
RTEMS_SMP?

What if RTEMS_SMP is defined, but RTEMS_POSIX_API is not? This would
be an error here I suppose? Or possibly elsewhere in the bdbuf
workaround code? Probably it should be accounted for in confdefs.h and
an error emitted there?

> +  #endif /* defined( RTEMS_POSIX_API ) */
> +#endif /* defined( RTEMS_SMP ) */
> +
> +#if defined( RTEMS_BDBUF_SMP_WORKAROUND )
> +  #include <rtems/posix/priorityimpl.h>
> +#endif /* defined( RTEMS_BDBUF_SMP_WORKAROUND ) */
> +
>  #define BDBUF_INVALID_DEV NULL
>
>  /*
> @@ -77,12 +94,22 @@ typedef struct rtems_bdbuf_swapout_worker
>                                            * thread. */
>  } rtems_bdbuf_swapout_worker;
>
> +#if defined( RTEMS_BDBUF_SMP_WORKAROUND )
> +  typedef pthread_mutex_t rtems_bdbuf_lock_type;
> +#else
> +  typedef rtems_id rtems_bdbuf_lock_type;
> +#endif /* defined( BDBUF_SMP_WORKAROUND ) */
> +
>  /**
>   * Buffer waiters synchronization.
>   */
>  typedef struct rtems_bdbuf_waiters {
> -  unsigned count;
> -  rtems_id sema;
> +  unsigned       count;
> +#if defined( RTEMS_BDBUF_SMP_WORKAROUND )
> +  pthread_cond_t cond_var;
> +#else
> +  rtems_id       sema;
> +#endif /* defined( RTEMS_BDBUF_SMP_WORKAROUND ) */
>  } rtems_bdbuf_waiters;
>
>  /**
> @@ -106,9 +133,9 @@ typedef struct rtems_bdbuf_cache
>                                            * buffer size that fit in a group. */
>    uint32_t            flags;             /**< Configuration flags. */
>
> -  rtems_id            lock;              /**< The cache lock. It locks all
> +  rtems_bdbuf_lock_type lock;            /**< The cache lock. It locks all
>                                            * cache data, BD and lists. */
> -  rtems_id            sync_lock;         /**< Sync calls block writes. */
> +  rtems_bdbuf_lock_type sync_lock;       /**< Sync calls block writes. */
>    bool                sync_active;       /**< True if a sync is active. */
>    rtems_id            sync_requester;    /**< The sync requester. */
>    rtems_disk_device  *sync_device;       /**< The device to sync and
> @@ -231,6 +258,138 @@ static rtems_task rtems_bdbuf_read_ahead_task(rtems_task_argument arg);
>   */
>  static rtems_bdbuf_cache bdbuf_cache;
>
> +static rtems_status_code rtems_bdbuf_lock_create(
> +  rtems_name             name,
> +  uint32_t               count,
> +  rtems_attribute        attribute_set,
> +  rtems_task_priority    priority_ceiling,
> +  rtems_bdbuf_lock_type *lock
> +)
> +{
> +  rtems_status_code sc = RTEMS_SUCCESSFUL;
> +#if defined( RTEMS_BDBUF_SMP_WORKAROUND )
> +  {
We don't usually add arbitrary scoping blocks like this, so I would
omit the extra {} unless suitable justification to do otherwise is
provided, in which case we should consider it for our coding
conventions. This is done a few more times below.

> +    int eno;
> +    pthread_mutexattr_t attr;
> +
> +    (void)name;
> +    (void)priority_ceiling;
> +
> +    eno = pthread_mutexattr_init (&attr);
> +    if (eno == 0) {
> +      if ((attribute_set & RTEMS_INHERIT_PRIORITY) != 0) {
> +        eno = pthread_mutexattr_setprotocol (
> +          &attr,
> +          PTHREAD_PRIO_INHERIT
> +        );
> +      }
> +      if (eno == 0) {
> +        eno = pthread_mutexattr_settype (
> +          &attr,
> +          PTHREAD_MUTEX_RECURSIVE
> +        );
> +      }
> +      if (eno == 0) {
> +        eno = pthread_mutex_init (lock, &attr);
> +      }
> +      if (eno == 0) {
> +        if (count == 0) {
> +          eno = pthread_mutex_lock (lock);
> +          if (eno == EBUSY) {
> +            eno = 0;
> +          }
> +        }
> +      }
> +      (void)pthread_mutexattr_destroy (&attr);
> +    }
> +
> +    if (eno != 0) {
> +      sc = RTEMS_UNSATISFIED;
> +    }
> +  }
> +#else
> +  sc = rtems_semaphore_create(
> +    name,
> +    count,
> +    attribute_set,
> +    priority_ceiling,
> +    lock
> +  );
> +#endif /* defined( RTEMS_BDBUF_SMP_WORKAROUND ) */
> +  return sc;
> +}
> +
> +
Delete extra newline here.

> +static rtems_status_code rtems_bdbuf_lock_delete(
> +  rtems_bdbuf_lock_type *lock
> +)
> +{
> +#if defined( RTEMS_BDBUF_SMP_WORKAROUND )
> +  {
> +    int eno = pthread_mutex_destroy (lock);
> +    if (eno != 0) {
> +      return RTEMS_UNSATISFIED;
> +    } else {
> +      return RTEMS_SUCCESSFUL;
> +    }
> +  }
> +#else
> +  return rtems_semaphore_delete( *lock );
> +#endif /* defined( RTEMS_BDBUF_SMP_WORKAROUND ) */
> +}
> +
> +static rtems_status_code rtems_bdbuf_waiter_create(
> +  rtems_name           name,
> +  uint32_t             count,
> +  rtems_attribute      attribute_set,
> +  rtems_task_priority  priority_ceiling,
> +  rtems_bdbuf_waiters *waiter
> +)
> +{
> +  rtems_status_code sc = RTEMS_SUCCESSFUL;
> +#if defined( RTEMS_BDBUF_SMP_WORKAROUND )
> +  {
> +    int eno = pthread_cond_init (&waiter->cond_var, NULL);
> +
> +    if (eno != 0) {
> +      sc = RTEMS_UNSATISFIED;
> +    }
> +  }
> +#else
> +  sc = rtems_semaphore_create(
> +    name,
> +    count,
> +    attribute_set,
> +    priority_ceiling,
> +    &waiter->sema
> +  );
> +#endif /* defined( RTEMS_BDBUF_SMP_WORKAROUND ) */
> +  return sc;
> +}
> +
> +static rtems_status_code rtems_bdbuf_waiter_delete(
> +  rtems_bdbuf_waiters *waiter
> +)
> +{
> +  rtems_status_code sc = RTEMS_SUCCESSFUL;
> +
> +#if defined( RTEMS_BDBUF_SMP_WORKAROUND )
> +  {
> +    int eno = pthread_cond_destroy (&waiter->cond_var);
> +
> +    if (eno != 0) {
> +      sc = RTEMS_UNSATISFIED;
> +    }
> +  }
> +#else
> +  sc = rtems_semaphore_delete(
> +    waiter->sema
> +  );
> +#endif /* defined( RTEMS_BDBUF_SMP_WORKAROUND ) */
> +
> +  return sc;
> +}
> +
>  #if RTEMS_BDBUF_TRACE
>  /**
>   * If true output the trace message.
> @@ -845,13 +1004,28 @@ rtems_bdbuf_media_block (const rtems_disk_device *dd, rtems_blkdev_bnum block)
>   * @param fatal_error_code The error code if the call fails.
>   */
>  static void
> -rtems_bdbuf_lock (rtems_id lock, uint32_t fatal_error_code)
> +rtems_bdbuf_lock (rtems_bdbuf_lock_type *lock, uint32_t fatal_error_code)
>  {
> -  rtems_status_code sc = rtems_semaphore_obtain (lock,
> -                                                 RTEMS_WAIT,
> -                                                 RTEMS_NO_TIMEOUT);
> -  if (sc != RTEMS_SUCCESSFUL)
> -    rtems_bdbuf_fatal (fatal_error_code);
> +#ifndef RTEMS_BDBUF_SMP_WORKAROUND
It is a little easier to read the code if the pattern of #ifdef ...
#else ... #endif uses the same condition throughout, e.g. always use
#ifdef RTEMS_BDBUF_SMP_WORKAROUND rather than switching between #ifdef
and #ifndef.

> +  {
> +    rtems_status_code sc = rtems_semaphore_obtain (*lock,
> +                                                   RTEMS_WAIT,
> +                                                   RTEMS_NO_TIMEOUT);
> +    if (sc != RTEMS_SUCCESSFUL)
> +      rtems_bdbuf_fatal (fatal_error_code);
> +  }
> +#else
> +  {
> +    int eno = pthread_mutex_lock (lock);
> +    if (eno == EBUSY) {
> +      eno = 0;
> +    }
> +
> +    if (eno != 0) {
> +      rtems_bdbuf_fatal (fatal_error_code);
> +    }
> +  }
> +#endif /* #ifndef RTEMS_BDBUF_SMP_WORKAROUND */
>  }
>
>  /**
> @@ -861,11 +1035,23 @@ rtems_bdbuf_lock (rtems_id lock, uint32_t fatal_error_code)
>   * @param fatal_error_code The error code if the call fails.
>   */
>  static void
> -rtems_bdbuf_unlock (rtems_id lock, uint32_t fatal_error_code)
> +rtems_bdbuf_unlock (rtems_bdbuf_lock_type *lock, uint32_t fatal_error_code)
>  {
> -  rtems_status_code sc = rtems_semaphore_release (lock);
> -  if (sc != RTEMS_SUCCESSFUL)
> -    rtems_bdbuf_fatal (fatal_error_code);
> +#ifndef RTEMS_BDBUF_SMP_WORKAROUND
> +  {
> +    rtems_status_code sc = rtems_semaphore_release (*lock);
> +    if (sc != RTEMS_SUCCESSFUL)
> +      rtems_bdbuf_fatal (fatal_error_code);
> +  }
> +#else
> +  {
> +    int eno = pthread_mutex_unlock (lock);
> +
> +    if (eno != 0) {
> +      rtems_bdbuf_fatal (fatal_error_code);
> +    }
> +  }
> +#endif /* #ifndef RTEMS_BDBUF_SMP_WORKAROUND */
>  }
>
>  /**
> @@ -874,7 +1060,7 @@ rtems_bdbuf_unlock (rtems_id lock, uint32_t fatal_error_code)
>  static void
>  rtems_bdbuf_lock_cache (void)
>  {
> -  rtems_bdbuf_lock (bdbuf_cache.lock, RTEMS_BDBUF_FATAL_CACHE_LOCK);
> +  rtems_bdbuf_lock (&bdbuf_cache.lock, RTEMS_BDBUF_FATAL_CACHE_LOCK);
>  }
>
>  /**
> @@ -883,7 +1069,7 @@ rtems_bdbuf_lock_cache (void)
>  static void
>  rtems_bdbuf_unlock_cache (void)
>  {
> -  rtems_bdbuf_unlock (bdbuf_cache.lock, RTEMS_BDBUF_FATAL_CACHE_UNLOCK);
> +  rtems_bdbuf_unlock (&bdbuf_cache.lock, RTEMS_BDBUF_FATAL_CACHE_UNLOCK);
>  }
>
>  /**
> @@ -892,7 +1078,7 @@ rtems_bdbuf_unlock_cache (void)
>  static void
>  rtems_bdbuf_lock_sync (void)
>  {
> -  rtems_bdbuf_lock (bdbuf_cache.sync_lock, RTEMS_BDBUF_FATAL_SYNC_LOCK);
> +  rtems_bdbuf_lock (&bdbuf_cache.sync_lock, RTEMS_BDBUF_FATAL_SYNC_LOCK);
>  }
>
>  /**
> @@ -901,7 +1087,7 @@ rtems_bdbuf_lock_sync (void)
>  static void
>  rtems_bdbuf_unlock_sync (void)
>  {
> -  rtems_bdbuf_unlock (bdbuf_cache.sync_lock,
> +  rtems_bdbuf_unlock (&bdbuf_cache.sync_lock,
>                        RTEMS_BDBUF_FATAL_SYNC_UNLOCK);
>  }
>
> @@ -917,6 +1103,7 @@ rtems_bdbuf_group_release (rtems_bdbuf_buffer *bd)
>    --bd->group->users;
>  }
>
> +#ifndef RTEMS_BDBUF_SMP_WORKAROUND
>  static rtems_mode
>  rtems_bdbuf_disable_preemption (void)
>  {
> @@ -939,6 +1126,7 @@ rtems_bdbuf_restore_preemption (rtems_mode prev_mode)
>    if (sc != RTEMS_SUCCESSFUL)
>      rtems_bdbuf_fatal (RTEMS_BDBUF_FATAL_PREEMPT_RST);
>  }
> +#endif /* #ifndef RTEMS_BDBUF_SMP_WORKAROUND */
>
>  /**
>   * Wait until woken. Semaphores are used so a number of tasks can wait and can
> @@ -958,42 +1146,56 @@ rtems_bdbuf_restore_preemption (rtems_mode prev_mode)
>  static void
>  rtems_bdbuf_anonymous_wait (rtems_bdbuf_waiters *waiters)
>  {
> -  rtems_status_code sc;
> -  rtems_mode        prev_mode;
> -
>    /*
>     * Indicate we are waiting.
>     */
>    ++waiters->count;
>
> -  /*
> -   * Disable preemption then unlock the cache and block.  There is no POSIX
> -   * condition variable in the core API so this is a work around.
> -   *
> -   * The issue is a task could preempt after the cache is unlocked because it is
> -   * blocking or just hits that window, and before this task has blocked on the
> -   * semaphore. If the preempting task flushes the queue this task will not see
> -   * the flush and may block for ever or until another transaction flushes this
> -   * semaphore.
> -   */
> -  prev_mode = rtems_bdbuf_disable_preemption ();
> +#ifndef RTEMS_BDBUF_SMP_WORKAROUND
> +  {
> +    rtems_status_code sc;
> +    rtems_mode        prev_mode;
> +    /*
> +     * Disable preemption then unlock the cache and block.  There is no POSIX
> +     * condition variable in the core API so this is a work around.
> +     *
> +     * The issue is a task could preempt after the cache is unlocked because it is
> +     * blocking or just hits that window, and before this task has blocked on the
> +     * semaphore. If the preempting task flushes the queue this task will not see
> +     * the flush and may block for ever or until another transaction flushes this
> +     * semaphore.
> +     */
> +    prev_mode = rtems_bdbuf_disable_preemption();
>
> -  /*
> -   * Unlock the cache, wait, and lock the cache when we return.
> -   */
> -  rtems_bdbuf_unlock_cache ();
> +    /*
> +     * Unlock the cache, wait, and lock the cache when we return.
> +     */
> +    rtems_bdbuf_unlock_cache ();
>
> -  sc = rtems_semaphore_obtain (waiters->sema, RTEMS_WAIT, RTEMS_BDBUF_WAIT_TIMEOUT);
> +    sc = rtems_semaphore_obtain (waiters->sema, RTEMS_WAIT, RTEMS_BDBUF_WAIT_TIMEOUT);
>
> -  if (sc == RTEMS_TIMEOUT)
> -    rtems_bdbuf_fatal (RTEMS_BDBUF_FATAL_CACHE_WAIT_TO);
> +    if (sc == RTEMS_TIMEOUT)
> +      rtems_bdbuf_fatal (RTEMS_BDBUF_FATAL_CACHE_WAIT_TO);
>
> -  if (sc != RTEMS_UNSATISFIED)
> -    rtems_bdbuf_fatal (RTEMS_BDBUF_FATAL_CACHE_WAIT_2);
> +    if (sc != RTEMS_UNSATISFIED)
> +      rtems_bdbuf_fatal (RTEMS_BDBUF_FATAL_CACHE_WAIT_2);
>
> -  rtems_bdbuf_lock_cache ();
> +    rtems_bdbuf_lock_cache ();
> +
> +    rtems_bdbuf_restore_preemption (prev_mode);
> +  }
> +#else
> +  {
> +    int eno = pthread_cond_wait (
> +      &waiters->cond_var,
> +      &bdbuf_cache.lock
> +    );
>
> -  rtems_bdbuf_restore_preemption (prev_mode);
> +    if (eno != 0) {
> +      rtems_bdbuf_fatal (RTEMS_BDBUF_FATAL_CACHE_WAIT_TO);
> +    }
> +  }
> +#endif /* #ifndef RTEMS_BDBUF_SMP_WORKAROUND */
>
>    --waiters->count;
>  }
> @@ -1013,13 +1215,23 @@ rtems_bdbuf_wait (rtems_bdbuf_buffer *bd, rtems_bdbuf_waiters *waiters)
>   * there are any waiters.
>   */
>  static void
> -rtems_bdbuf_wake (const rtems_bdbuf_waiters *waiters)
> +rtems_bdbuf_wake (rtems_bdbuf_waiters *waiters)
>  {
>    rtems_status_code sc = RTEMS_SUCCESSFUL;
>
>    if (waiters->count > 0)
>    {
> +#ifndef RTEMS_BDBUF_SMP_WORKAROUND
>      sc = rtems_semaphore_flush (waiters->sema);
> +#else
> +    {
> +      int eno = pthread_cond_broadcast (&waiters->cond_var);
> +
> +      if (eno != 0) {
> +        sc = RTEMS_UNSATISFIED;
> +      }
> +    }
> +#endif /* #ifndef RTEMS_BDBUF_SMP_WORKAROUND */
>      if (sc != RTEMS_SUCCESSFUL)
>        rtems_bdbuf_fatal (RTEMS_BDBUF_FATAL_CACHE_WAKE);
>    }
> @@ -1411,6 +1623,7 @@ rtems_bdbuf_init_once (void)
>    size_t              b;
>    size_t              cache_aligment;
>    rtems_status_code   sc;
> +  rtems_status_code   sc_lock;
>
>    if (rtems_bdbuf_tracer)
>      printf ("bdbuf:init\n");
> @@ -1453,35 +1666,38 @@ rtems_bdbuf_init_once (void)
>    /*
>     * Create the locks for the cache.
>     */
> -  sc = rtems_semaphore_create (rtems_build_name ('B', 'D', 'C', 'l'),
> -                               1, RTEMS_BDBUF_CACHE_LOCK_ATTRIBS, 0,
> -                               &bdbuf_cache.lock);
> -  if (sc != RTEMS_SUCCESSFUL)
> +
> +  sc_lock = rtems_bdbuf_lock_create (rtems_build_name ('B', 'D', 'C', 'l'),
> +                                     1, RTEMS_BDBUF_CACHE_LOCK_ATTRIBS, 0,
> +                                     &bdbuf_cache.lock);
> +  if (sc_lock != RTEMS_SUCCESSFUL) {
> +    sc = sc_lock;
>      goto error;
> +  }
>
>    rtems_bdbuf_lock_cache ();
>
> -  sc = rtems_semaphore_create (rtems_build_name ('B', 'D', 'C', 's'),
> -                               1, RTEMS_BDBUF_CACHE_LOCK_ATTRIBS, 0,
> -                               &bdbuf_cache.sync_lock);
> +  sc = rtems_bdbuf_lock_create (rtems_build_name ('B', 'D', 'C', 's'),
> +                                     1, RTEMS_BDBUF_CACHE_LOCK_ATTRIBS, 0,
> +                                     &bdbuf_cache.sync_lock);
>    if (sc != RTEMS_SUCCESSFUL)
>      goto error;
>
> -  sc = rtems_semaphore_create (rtems_build_name ('B', 'D', 'C', 'a'),
> -                               0, RTEMS_BDBUF_CACHE_WAITER_ATTRIBS, 0,
> -                               &bdbuf_cache.access_waiters.sema);
> +  sc = rtems_bdbuf_waiter_create (rtems_build_name ('B', 'D', 'C', 'a'),
> +                                  0, RTEMS_BDBUF_CACHE_WAITER_ATTRIBS, 0,
> +                                  &bdbuf_cache.access_waiters);
>    if (sc != RTEMS_SUCCESSFUL)
>      goto error;
>
> -  sc = rtems_semaphore_create (rtems_build_name ('B', 'D', 'C', 't'),
> -                               0, RTEMS_BDBUF_CACHE_WAITER_ATTRIBS, 0,
> -                               &bdbuf_cache.transfer_waiters.sema);
> +  sc = rtems_bdbuf_waiter_create (rtems_build_name ('B', 'D', 'C', 't'),
> +                                  0, RTEMS_BDBUF_CACHE_WAITER_ATTRIBS, 0,
> +                                  &bdbuf_cache.transfer_waiters);
>    if (sc != RTEMS_SUCCESSFUL)
>      goto error;
>
> -  sc = rtems_semaphore_create (rtems_build_name ('B', 'D', 'C', 'b'),
> -                               0, RTEMS_BDBUF_CACHE_WAITER_ATTRIBS, 0,
> -                               &bdbuf_cache.buffer_waiters.sema);
> +  sc = rtems_bdbuf_waiter_create (rtems_build_name ('B', 'D', 'C', 'b'),
> +                                  0, RTEMS_BDBUF_CACHE_WAITER_ATTRIBS, 0,
> +                                  &bdbuf_cache.buffer_waiters);
>    if (sc != RTEMS_SUCCESSFUL)
>      goto error;
>
> @@ -1641,15 +1857,15 @@ error:
>    free (bdbuf_cache.swapout_transfer);
>    free (bdbuf_cache.swapout_workers);
>
> -  rtems_semaphore_delete (bdbuf_cache.buffer_waiters.sema);
> -  rtems_semaphore_delete (bdbuf_cache.access_waiters.sema);
> -  rtems_semaphore_delete (bdbuf_cache.transfer_waiters.sema);
> -  rtems_semaphore_delete (bdbuf_cache.sync_lock);
> +  (void)rtems_bdbuf_waiter_delete (&bdbuf_cache.buffer_waiters);
> +  (void)rtems_bdbuf_waiter_delete (&bdbuf_cache.access_waiters);
> +  (void)rtems_bdbuf_waiter_delete (&bdbuf_cache.transfer_waiters);
> +  (void)rtems_bdbuf_lock_delete (&bdbuf_cache.sync_lock);
>
> -  if (bdbuf_cache.lock != 0)
> +  if (sc_lock == RTEMS_SUCCESSFUL)
>    {
>      rtems_bdbuf_unlock_cache ();
> -    rtems_semaphore_delete (bdbuf_cache.lock);
> +    (void)rtems_bdbuf_lock_delete (&bdbuf_cache.lock);
>    }
>
>    rtems_bdbuf_data.return_status = RTEMS_UNSATISFIED;
> --
> 1.7.10.4
>
> _______________________________________________
> rtems-devel mailing list
> rtems-devel at rtems.org
> http://www.rtems.org/mailman/listinfo/rtems-devel




More information about the devel mailing list