LCOV - code coverage report
Current view: top level - lib/ring - soring.c (source / functions) Hit Total Coverage
Test: Code coverage Lines: 117 144 81.2 %
Date: 2025-02-01 18:54:23 Functions: 14 16 87.5 %
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: 112 400 28.0 %

           Branch data     Line data    Source code
       1                 :            : /* SPDX-License-Identifier: BSD-3-Clause
       2                 :            :  * Copyright(c) 2024 Huawei Technologies Co., Ltd
       3                 :            :  */
       4                 :            : 
       5                 :            : /**
       6                 :            :  * @file
       7                 :            :  * This file contains implementation of SORING 'datapath' functions.
       8                 :            :  *
       9                 :            :  * Brief description:
      10                 :            :  * ==================
      11                 :            :  * enqueue/dequeue works the same as for conventional rte_ring:
      12                 :            :  * any rte_ring sync types can be used, etc.
      13                 :            :  * Plus there could be multiple 'stages'.
      14                 :            :  * For each stage there is an acquire (start) and release (finish) operation.
      15                 :            :  * After some elems are 'acquired' - user can safely assume that he has
      16                 :            :  * exclusive possession of these elems till 'release' for them is done.
      17                 :            :  * Note that right now user has to release exactly the same number of elems
      18                 :            :  * he acquired before.
      19                 :            :  * After 'release', elems can be 'acquired' by next stage and/or dequeued
      20                 :            :  * (in case of last stage).
      21                 :            :  *
      22                 :            :  * Internal structure:
      23                 :            :  * ===================
      24                 :            :  * In addition to 'normal' ring of elems, it also has a ring of states of the
      25                 :            :  * same size. Each state[] corresponds to exactly one elem[].
      26                 :            :  * state[] will be used by acquire/release/dequeue functions to store internal
      27                 :            :  * information and should not be accessed by the user directly.
      28                 :            :  *
      29                 :            :  * How it works:
      30                 :            :  * =============
      31                 :            :  * 'acquire()' just moves stage's head (same as rte_ring move_head does),
      32                 :            :  * plus it saves in state[stage.cur_head] information about how many elems
      33                 :            :  * were acquired, current head position and special flag value to indicate
      34                 :            :  * that elems are acquired (SORING_ST_START).
      35                 :            :  * Note that 'acquire()' returns to the user a special 'ftoken' that user has
      36                 :            :  * to provide for 'release()' (in fact it is just a position for current head
      37                 :            :  * plus current stage index).
      38                 :            :  * 'release()' extracts old head value from provided ftoken and checks that
      39                 :            :  * corresponding 'state[]' contains expected values(mostly for sanity
      40                 :            :  * purposes).
      41                 :            :  * Then it marks this state[] with 'SORING_ST_FINISH' flag to indicate
      42                 :            :  * that given subset of objects was released.
      43                 :            :  * After that, it checks does old head value equals to current tail value?
      44                 :            :  * If yes, then it performs  'finalize()' operation, otherwise 'release()'
      45                 :            :  * just returns (without spinning on stage tail value).
      46                 :            :  * As updated state[] is shared by all threads, some other thread can do
      47                 :            :  * 'finalize()' for given stage.
      48                 :            :  * That allows 'release()' to avoid excessive waits on the tail value.
      49                 :            :  * Main purpose of 'finalize()' operation is to walk through 'state[]'
      50                 :            :  * from current stage tail up to its head, check state[] and move stage tail
      51                 :            :  * through elements that already are in SORING_ST_FINISH state.
      52                 :            :  * Along with that, corresponding state[] values are reset to zero.
      53                 :            :  * Note that 'finalize()' for given stage can be done from multiple places:
      54                 :            :  * 'release()' for that stage or from 'acquire()' for next stage
      55                 :            :  * even from consumer's 'dequeue()' - in case given stage is the last one.
      56                 :            :  * So 'finalize()' has to be MT-safe and inside it we have to
      57                 :            :  * guarantee that only one thread will update state[] and stage's tail values.
      58                 :            :  */
      59                 :            : 
      60                 :            : #include "soring.h"
      61                 :            : 
      62                 :            : /*
      63                 :            :  * Inline functions (fastpath) start here.
      64                 :            :  */
      65                 :            : static __rte_always_inline uint32_t
      66                 :            : __rte_soring_stage_finalize(struct soring_stage_headtail *sht, uint32_t stage,
      67                 :            :         union soring_state *rstate, uint32_t rmask, uint32_t maxn)
      68                 :            : {
      69                 :            :         int32_t rc;
      70                 :            :         uint32_t ftkn, head, i, idx, k, n, tail;
      71                 :            :         union soring_stage_tail nt, ot;
      72                 :            :         union soring_state st;
      73                 :            : 
      74                 :            :         /* try to grab exclusive right to update tail value */
      75                 :      20010 :         ot.raw = rte_atomic_load_explicit(&sht->tail.raw,
      76                 :            :                         rte_memory_order_acquire);
      77                 :            : 
      78                 :            :         /* other thread already finalizing it for us */
      79                 :      20010 :         if (ot.sync != 0)
      80                 :            :                 return 0;
      81                 :            : 
      82                 :      20010 :         nt.pos = ot.pos;
      83                 :      20010 :         nt.sync = 1;
      84                 :      20010 :         rc = rte_atomic_compare_exchange_strong_explicit(&sht->tail.raw,
      85                 :            :                 (uint64_t *)(uintptr_t)&ot.raw, nt.raw,
      86                 :            :                 rte_memory_order_release, rte_memory_order_relaxed);
      87                 :            : 
      88                 :            :         /* other thread won the race */
      89   [ +  -  +  -  :      20010 :         if (rc == 0)
          -  -  -  -  -  
          -  -  -  -  -  
          +  -  -  -  +  
                      - ]
      90                 :            :                 return 0;
      91                 :            : 
      92                 :            :         /* Ensure the head is read before rstate[] */
      93                 :      20010 :         head = rte_atomic_load_explicit(&sht->head, rte_memory_order_relaxed);
      94                 :            :         rte_atomic_thread_fence(rte_memory_order_acquire);
      95                 :            : 
      96                 :            :         /*
      97                 :            :          * start with current tail and walk through states that are
      98                 :            :          * already finished.
      99                 :            :          */
     100                 :            : 
     101                 :      20010 :         n = RTE_MIN(head - ot.pos, maxn);
     102   [ +  +  +  +  :      30015 :         for (i = 0, tail = ot.pos; i < n; i += k, tail += k) {
          -  -  -  -  -  
          -  -  -  -  -  
          -  +  -  -  -  
                      + ]
     103                 :            : 
     104                 :      10005 :                 idx = tail & rmask;
     105                 :      10005 :                 ftkn = SORING_FTKN_MAKE(tail, stage);
     106                 :            : 
     107                 :      10005 :                 st.raw = rte_atomic_load_explicit(&rstate[idx].raw,
     108                 :            :                         rte_memory_order_relaxed);
     109   [ +  -  +  -  :      10005 :                 if ((st.stnum & SORING_ST_MASK) != SORING_ST_FINISH ||
          +  -  +  -  -  
          -  -  -  -  -  
          -  -  -  -  -  
          -  -  -  -  -  
          -  -  -  -  -  
          -  -  -  -  -  
          -  -  -  -  -  
                      - ]
     110                 :            :                                 st.ftoken != ftkn)
     111                 :            :                         break;
     112                 :            : 
     113                 :      10005 :                 k = st.stnum & ~SORING_ST_MASK;
     114                 :      10005 :                 rte_atomic_store_explicit(&rstate[idx].raw, 0,
     115                 :            :                                 rte_memory_order_relaxed);
     116                 :            :         }
     117                 :            : 
     118                 :            : 
     119                 :            :         /* release exclusive right to update along with new tail value */
     120                 :      20010 :         ot.pos = tail;
     121                 :      20010 :         rte_atomic_store_explicit(&sht->tail.raw, ot.raw,
     122                 :            :                         rte_memory_order_release);
     123                 :            : 
     124                 :      20010 :         return i;
     125                 :            : }
     126                 :            : 
     127                 :            : static __rte_always_inline uint32_t
     128                 :            : __rte_soring_move_prod_head(struct rte_soring *r, uint32_t num,
     129                 :            :         enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
     130                 :            :         uint32_t *head, uint32_t *next, uint32_t *free)
     131                 :            : {
     132                 :            :         uint32_t n;
     133                 :            : 
     134                 :          6 :         switch (st) {
     135                 :          6 :         case RTE_RING_SYNC_ST:
     136                 :            :         case RTE_RING_SYNC_MT:
     137                 :          6 :                 n = __rte_ring_headtail_move_head(&r->prod.ht, &r->cons.ht,
     138                 :            :                         r->capacity, st, num, behavior, head, next, free);
     139                 :            :                 break;
     140                 :          0 :         case RTE_RING_SYNC_MT_RTS:
     141                 :          0 :                 n = __rte_ring_rts_move_head(&r->prod.rts, &r->cons.ht,
     142                 :            :                         r->capacity, num, behavior, head, free);
     143                 :            :                 *next = *head + n;
     144                 :          0 :                 break;
     145                 :          0 :         case RTE_RING_SYNC_MT_HTS:
     146                 :          0 :                 n = __rte_ring_hts_move_head(&r->prod.hts, &r->cons.ht,
     147                 :            :                         r->capacity, num, behavior, head, free);
     148                 :          0 :                 *next = *head + n;
     149                 :          0 :                 break;
     150                 :            :         default:
     151                 :            :                 /* unsupported mode, shouldn't be here */
     152                 :            :                 RTE_ASSERT(0);
     153                 :            :                 *free = 0;
     154                 :            :                 n = 0;
     155                 :            :         }
     156                 :            : 
     157                 :            :         return n;
     158                 :            : }
     159                 :            : 
     160                 :            : static __rte_always_inline uint32_t
     161                 :            : __rte_soring_move_cons_head(struct rte_soring *r, uint32_t stage, uint32_t num,
     162                 :            :         enum rte_ring_queue_behavior behavior, enum rte_ring_sync_type st,
     163                 :            :         uint32_t *head, uint32_t *next, uint32_t *avail)
     164                 :            : {
     165                 :            :         uint32_t n;
     166                 :            : 
     167   [ -  -  -  -  :          3 :         switch (st) {
          +  -  -  -  -  
          -  -  -  -  -  
                   -  - ]
     168                 :      10011 :         case RTE_RING_SYNC_ST:
     169                 :            :         case RTE_RING_SYNC_MT:
     170                 :            :                 n = __rte_ring_headtail_move_head(&r->cons.ht,
     171                 :      10011 :                         &r->stage[stage].ht, 0, st, num, behavior,
     172                 :            :                         head, next, avail);
     173                 :            :                 break;
     174                 :          0 :         case RTE_RING_SYNC_MT_RTS:
     175                 :          0 :                 n = __rte_ring_rts_move_head(&r->cons.rts, &r->stage[stage].ht,
     176                 :            :                         0, num, behavior, head, avail);
     177                 :          0 :                 *next = *head + n;
     178                 :          0 :                 break;
     179                 :          0 :         case RTE_RING_SYNC_MT_HTS:
     180                 :          0 :                 n = __rte_ring_hts_move_head(&r->cons.hts, &r->stage[stage].ht,
     181                 :            :                         0, num, behavior, head, avail);
     182                 :          0 :                 *next = *head + n;
     183                 :          0 :                 break;
     184                 :            :         default:
     185                 :            :                 /* unsupported mode, shouldn't be here */
     186                 :            :                 RTE_ASSERT(0);
     187                 :            :                 *avail = 0;
     188                 :            :                 n = 0;
     189                 :            :         }
     190                 :            : 
     191                 :            :         return n;
     192                 :            : }
     193                 :            : 
     194                 :            : static __rte_always_inline void
     195                 :            : __rte_soring_update_tail(struct __rte_ring_headtail *rht,
     196                 :            :         enum rte_ring_sync_type st, uint32_t head, uint32_t next, uint32_t enq)
     197                 :            : {
     198                 :            :         uint32_t n;
     199                 :            : 
     200   [ +  -  -  -  :          9 :         switch (st) {
          +  -  -  -  +  
          -  -  -  -  -  
          -  -  +  -  -  
          -  +  -  -  -  
          -  -  -  -  -  
                -  -  - ]
     201                 :            :         case RTE_RING_SYNC_ST:
     202                 :            :         case RTE_RING_SYNC_MT:
     203                 :            :                 __rte_ring_update_tail(&rht->ht, head, next, st, enq);
     204                 :            :                 break;
     205                 :          0 :         case RTE_RING_SYNC_MT_RTS:
     206                 :            :                 __rte_ring_rts_update_tail(&rht->rts);
     207                 :            :                 break;
     208                 :          0 :         case RTE_RING_SYNC_MT_HTS:
     209                 :            :                 n = next - head;
     210                 :            :                 __rte_ring_hts_update_tail(&rht->hts, head, n, enq);
     211                 :            :                 break;
     212                 :            :         default:
     213                 :            :                 /* unsupported mode, shouldn't be here */
     214                 :            :                 RTE_ASSERT(0);
     215                 :            :         }
     216                 :            : }
     217                 :            : 
     218                 :            : static __rte_always_inline uint32_t
     219                 :            : __rte_soring_stage_move_head(struct soring_stage_headtail *d,
     220                 :            :         const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
     221                 :            :         enum rte_ring_queue_behavior behavior,
     222                 :            :         uint32_t *old_head, uint32_t *new_head, uint32_t *avail)
     223                 :            : {
     224                 :            :         uint32_t n, tail;
     225                 :            : 
     226                 :          5 :         *old_head = rte_atomic_load_explicit(&d->head,
     227                 :            :                         rte_memory_order_relaxed);
     228                 :            : 
     229                 :            :         do {
     230                 :            :                 n = num;
     231                 :            : 
     232                 :            :                 /* Ensure the head is read before tail */
     233                 :            :                 rte_atomic_thread_fence(rte_memory_order_acquire);
     234                 :            : 
     235                 :      10005 :                 tail = rte_atomic_load_explicit(&s->tail,
     236                 :            :                                 rte_memory_order_acquire);
     237                 :      10005 :                 *avail = capacity + tail - *old_head;
     238   [ +  -  -  -  :      10003 :                 if (n > *avail)
          +  -  +  -  -  
          -  +  -  -  -  
                   -  - ]
     239                 :            :                         n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *avail;
     240   [ +  -  +  -  :      10005 :                 if (n == 0)
          -  -  -  -  -  
          -  -  -  +  -  
          +  -  -  -  +  
             -  -  -  -  
                      - ]
     241                 :            :                         return 0;
     242                 :      10005 :                 *new_head = *old_head + n;
     243                 :      10005 :         } while (rte_atomic_compare_exchange_strong_explicit(&d->head,
     244                 :            :                         old_head, *new_head, rte_memory_order_acq_rel,
     245   [ -  +  -  +  :      10005 :                         rte_memory_order_relaxed) == 0);
          -  -  -  -  -  
          -  -  -  -  +  
          -  +  -  -  -  
             +  -  -  -  
                      - ]
     246                 :            : 
     247                 :            :         return n;
     248                 :            : }
     249                 :            : 
     250                 :            : static __rte_always_inline uint32_t
     251                 :            : soring_enqueue(struct rte_soring *r, const void *objs,
     252                 :            :         const void *meta, uint32_t n, enum rte_ring_queue_behavior behavior,
     253                 :            :         uint32_t *free_space)
     254                 :            : {
     255                 :            :         enum rte_ring_sync_type st;
     256                 :            :         uint32_t nb_free, prod_head, prod_next;
     257                 :            : 
     258                 :            :         RTE_ASSERT(r != NULL && r->nb_stage > 0);
     259                 :            :         RTE_ASSERT(meta == NULL || r->meta != NULL);
     260                 :            : 
     261                 :          6 :         st = r->prod.ht.sync_type;
     262                 :            : 
     263                 :            :         n = __rte_soring_move_prod_head(r, n, behavior, st,
     264                 :            :                         &prod_head, &prod_next, &nb_free);
     265   [ +  -  +  -  :          6 :         if (n != 0) {
             -  -  -  + ]
     266                 :          5 :                 __rte_ring_do_enqueue_elems(&r[1], objs, r->size,
     267   [ -  +  +  +  :          5 :                         prod_head & r->mask, r->esize, n);
             -  -  -  - ]
     268   [ +  -  -  - ]:          2 :                 if (meta != NULL)
     269                 :          2 :                         __rte_ring_do_enqueue_elems(r->meta, meta, r->size,
     270   [ -  +  -  - ]:          2 :                                 prod_head & r->mask, r->msize, n);
     271                 :            :                 __rte_soring_update_tail(&r->prod, st, prod_head, prod_next, 1);
     272                 :            :         }
     273                 :            : 
     274   [ +  +  +  -  :          6 :         if (free_space != NULL)
             -  -  +  - ]
     275                 :          5 :                 *free_space = nb_free - n;
     276                 :            :         return n;
     277                 :            : }
     278                 :            : 
     279                 :            : static __rte_always_inline uint32_t
     280                 :            : soring_dequeue(struct rte_soring *r, void *objs, void *meta,
     281                 :            :         uint32_t num, enum rte_ring_queue_behavior behavior,
     282                 :            :         uint32_t *available)
     283                 :            : {
     284                 :            :         enum rte_ring_sync_type st;
     285                 :            :         uint32_t entries, cons_head, cons_next, n, ns, reqn;
     286                 :            : 
     287                 :            :         RTE_ASSERT(r != NULL && r->nb_stage > 0);
     288                 :            :         RTE_ASSERT(meta == NULL || r->meta != NULL);
     289                 :            : 
     290                 :      10008 :         ns = r->nb_stage - 1;
     291                 :      10008 :         st = r->cons.ht.sync_type;
     292                 :            : 
     293                 :            :         /* try to grab exactly @num elems first */
     294                 :            :         n = __rte_soring_move_cons_head(r, ns, num, RTE_RING_QUEUE_FIXED, st,
     295                 :            :                         &cons_head, &cons_next, &entries);
     296   [ -  +  +  +  :      10008 :         if (n == 0) {
             -  +  +  - ]
     297                 :            :                 /* try to finalize some elems from previous stage */
     298   [ -  -  +  -  :      10005 :                 n = __rte_soring_stage_finalize(&r->stage[ns].sht, ns,
             -  -  +  - ]
     299                 :            :                         r->state, r->mask, 2 * num);
     300                 :      10002 :                 entries += n;
     301                 :            : 
     302                 :            :                 /* repeat attempt to grab elems */
     303                 :            :                 reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
     304   [ -  -  -  + ]:      10002 :                 if (entries >= reqn)
     305                 :            :                         n = __rte_soring_move_cons_head(r, ns, num, behavior,
     306                 :            :                                 st, &cons_head, &cons_next, &entries);
     307                 :            :                 else
     308                 :            :                         n = 0;
     309                 :            :         }
     310                 :            : 
     311                 :            :         /* we have some elems to consume */
     312   [ +  -  +  +  :      10008 :         if (n != 0) {
             +  -  -  + ]
     313                 :          4 :                 __rte_ring_do_dequeue_elems(objs, &r[1], r->size,
     314   [ -  +  +  -  :          4 :                         cons_head & r->mask, r->esize, n);
             -  +  -  - ]
     315   [ +  -  +  - ]:          2 :                 if (meta != NULL)
     316                 :          2 :                         __rte_ring_do_dequeue_elems(meta, r->meta, r->size,
     317   [ -  +  -  + ]:          2 :                                 cons_head & r->mask, r->msize, n);
     318                 :            :                 __rte_soring_update_tail(&r->cons, st, cons_head, cons_next, 0);
     319                 :            :         }
     320                 :            : 
     321   [ -  +  -  +  :      10008 :         if (available != NULL)
             -  +  -  + ]
     322                 :          0 :                 *available = entries - n;
     323                 :            :         return n;
     324                 :            : }
     325                 :            : 
     326                 :            : /*
     327                 :            :  * Verify internal SORING state.
     328                 :            :  * WARNING: if expected value is not equal to actual one, it means that for
     329                 :            :  * whatever reason SORING data constancy is broken. That is a very serious
     330                 :            :  * problem that most likely will cause race-conditions, memory corruption,
     331                 :            :  * program crash.
     332                 :            :  * To ease debugging it user might rebuild ring library with
     333                 :            :  * RTE_SORING_DEBUG enabled.
     334                 :            :  */
     335                 :            : static __rte_always_inline void
     336                 :            : soring_verify_state(const struct rte_soring *r, uint32_t stage, uint32_t idx,
     337                 :            :         const char *msg, union soring_state val,  union soring_state exp)
     338                 :            : {
     339                 :      20010 :         if (val.raw != exp.raw) {
     340                 :            : #ifdef RTE_SORING_DEBUG
     341                 :            :                 rte_soring_dump(stderr, r);
     342                 :            :                 rte_panic("line:%d from:%s: soring=%p, stage=%#x, idx=%#x, "
     343                 :            :                         "expected={.stnum=%#x, .ftoken=%#x}, "
     344                 :            :                         "actual={.stnum=%#x, .ftoken=%#x};\n",
     345                 :            :                         __LINE__, msg, r, stage, idx,
     346                 :            :                         exp.stnum, exp.ftoken,
     347                 :            :                         val.stnum, val.ftoken);
     348                 :            : #else
     349                 :          0 :                 SORING_LOG(EMERG, "from:%s: soring=%p, stage=%#x, idx=%#x, "
     350                 :            :                         "expected={.stnum=%#x, .ftoken=%#x}, "
     351                 :            :                         "actual={.stnum=%#x, .ftoken=%#x};",
     352                 :            :                         msg, r, stage, idx,
     353                 :            :                         exp.stnum, exp.ftoken,
     354                 :            :                         val.stnum, val.ftoken);
     355                 :            : #endif
     356                 :            :         }
     357                 :            : }
     358                 :            : 
     359                 :            : /* check and update state ring at acquire op*/
     360                 :            : static __rte_always_inline void
     361                 :            : acquire_state_update(const struct rte_soring *r, uint32_t stage, uint32_t idx,
     362                 :            :         uint32_t ftoken, uint32_t num)
     363                 :            : {
     364                 :            :         union soring_state st;
     365                 :            :         const union soring_state est = {.raw = 0};
     366                 :            : 
     367                 :      10005 :         st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
     368                 :            :                         rte_memory_order_relaxed);
     369                 :            :         soring_verify_state(r, stage, idx, __func__, st, est);
     370                 :            : 
     371                 :      10005 :         st.ftoken = ftoken;
     372                 :      10005 :         st.stnum = (SORING_ST_START | num);
     373                 :            : 
     374                 :      10005 :         rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
     375                 :            :                         rte_memory_order_relaxed);
     376                 :            : }
     377                 :            : 
     378                 :            : static __rte_always_inline uint32_t
     379                 :            : soring_acquire(struct rte_soring *r, void *objs, void *meta,
     380                 :            :         uint32_t stage, uint32_t num, enum rte_ring_queue_behavior behavior,
     381                 :            :         uint32_t *ftoken, uint32_t *available)
     382                 :            : {
     383                 :            :         uint32_t avail, head, idx, n, next, reqn;
     384                 :            :         struct soring_stage *pstg;
     385                 :            :         struct soring_stage_headtail *cons;
     386                 :            : 
     387                 :            :         RTE_ASSERT(r != NULL && stage < r->nb_stage);
     388                 :            :         RTE_ASSERT(meta == NULL || r->meta != NULL);
     389                 :            : 
     390                 :      10005 :         cons = &r->stage[stage].sht;
     391                 :            : 
     392                 :      10005 :         if (stage == 0)
     393                 :            :                 n = __rte_soring_stage_move_head(cons, &r->prod.ht, 0, num,
     394                 :            :                         behavior, &head, &next, &avail);
     395                 :            :         else {
     396                 :      10000 :                 pstg = r->stage + stage - 1;
     397                 :            : 
     398                 :            :                 /* try to grab exactly @num elems */
     399                 :            :                 n = __rte_soring_stage_move_head(cons, &pstg->ht, 0, num,
     400                 :            :                         RTE_RING_QUEUE_FIXED, &head, &next, &avail);
     401   [ -  +  -  -  :      10000 :                 if (n == 0) {
             -  +  -  - ]
     402                 :            :                         /* try to finalize some elems from previous stage */
     403   [ #  #  #  #  :          0 :                         n = __rte_soring_stage_finalize(&pstg->sht, stage - 1,
             #  #  #  # ]
     404                 :            :                                 r->state, r->mask, 2 * num);
     405                 :          0 :                         avail += n;
     406                 :            : 
     407                 :            :                         /* repeat attempt to grab elems */
     408                 :            :                         reqn = (behavior == RTE_RING_QUEUE_FIXED) ? num : 0;
     409   [ #  #  #  # ]:          0 :                         if (avail >= reqn)
     410                 :            :                                 n = __rte_soring_stage_move_head(cons,
     411                 :            :                                         &pstg->ht, 0, num, behavior, &head,
     412                 :            :                                         &next, &avail);
     413                 :            :                         else
     414                 :            :                                 n = 0;
     415                 :            :                 }
     416                 :            :         }
     417                 :            : 
     418   [ +  -  -  -  :      10005 :         if (n != 0) {
             +  -  +  - ]
     419                 :            : 
     420                 :      10005 :                 idx = head & r->mask;
     421   [ -  +  -  -  :      10005 :                 *ftoken = SORING_FTKN_MAKE(head, stage);
             -  +  -  + ]
     422                 :            : 
     423                 :            :                 /* check and update state value */
     424                 :            :                 acquire_state_update(r, stage, idx, *ftoken, n);
     425                 :            : 
     426                 :            :                 /* copy elems that are ready for given stage */
     427   [ -  +  -  -  :      10005 :                 __rte_ring_do_dequeue_elems(objs, &r[1], r->size, idx,
             -  +  +  - ]
     428                 :            :                                 r->esize, n);
     429   [ +  -  +  - ]:      10003 :                 if (meta != NULL)
     430   [ -  +  -  + ]:      10003 :                         __rte_ring_do_dequeue_elems(meta, r->meta,
     431                 :            :                                 r->size, idx, r->msize, n);
     432                 :            :         }
     433                 :            : 
     434   [ -  +  -  -  :      10005 :         if (available != NULL)
             -  +  -  + ]
     435                 :          0 :                 *available = avail - n;
     436                 :            :         return n;
     437                 :            : }
     438                 :            : 
     439                 :            : static __rte_always_inline void
     440                 :            : soring_release(struct rte_soring *r, const void *objs,
     441                 :            :         const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
     442                 :            : {
     443                 :            :         uint32_t idx, pos, tail;
     444                 :            :         struct soring_stage *stg;
     445                 :            :         union soring_state st;
     446                 :            : 
     447                 :            :         const union soring_state est = {
     448                 :      10005 :                 .stnum = (SORING_ST_START | n),
     449                 :            :                 .ftoken = ftoken,
     450                 :            :         };
     451                 :            : 
     452                 :            :         RTE_ASSERT(r != NULL && stage < r->nb_stage);
     453                 :            :         RTE_ASSERT(meta == NULL || r->meta != NULL);
     454                 :            : 
     455                 :      10005 :         stg = r->stage + stage;
     456                 :            : 
     457                 :      10005 :         pos = SORING_FTKN_POS(ftoken, stage);
     458                 :      10005 :         idx = pos & r->mask;
     459                 :      10005 :         st.raw = rte_atomic_load_explicit(&r->state[idx].raw,
     460                 :            :                         rte_memory_order_relaxed);
     461                 :            : 
     462                 :            :         /* check state ring contents */
     463                 :      10005 :         soring_verify_state(r, stage, idx, __func__, st, est);
     464                 :            : 
     465                 :            :         /* update contents of the ring, if necessary */
     466   [ +  +  -  + ]:      10005 :         if (objs != NULL)
     467   [ -  +  -  - ]:      10000 :                 __rte_ring_do_enqueue_elems(&r[1], objs, r->size, idx,
     468                 :            :                         r->esize, n);
     469         [ +  - ]:      10001 :         if (meta != NULL)
     470         [ -  + ]:      10001 :                 __rte_ring_do_enqueue_elems(r->meta, meta, r->size, idx,
     471                 :            :                         r->msize, n);
     472                 :            : 
     473                 :            :         /* set state to FINISH, make sure it is not reordered */
     474                 :            :         rte_atomic_thread_fence(rte_memory_order_release);
     475                 :            : 
     476                 :      10005 :         st.stnum = SORING_ST_FINISH | n;
     477                 :      10005 :         rte_atomic_store_explicit(&r->state[idx].raw, st.raw,
     478                 :            :                         rte_memory_order_relaxed);
     479                 :            : 
     480                 :            :         /* try to do finalize(), if appropriate */
     481                 :      10005 :         tail = rte_atomic_load_explicit(&stg->sht.tail.pos,
     482                 :            :                         rte_memory_order_relaxed);
     483   [ +  -  +  - ]:      10005 :         if (tail == pos)
     484   [ +  -  +  - ]:      10005 :                 __rte_soring_stage_finalize(&stg->sht, stage, r->state, r->mask,
     485                 :            :                                 r->capacity);
     486                 :            : }
     487                 :            : 
     488                 :            : /*
     489                 :            :  * Public functions (data-path) start here.
     490                 :            :  */
     491                 :            : 
     492                 :            : void
     493         [ -  + ]:          4 : rte_soring_release(struct rte_soring *r, const void *objs,
     494                 :            :         uint32_t stage, uint32_t n, uint32_t ftoken)
     495                 :            : {
     496                 :            :         soring_release(r, objs, NULL, stage, n, ftoken);
     497                 :          4 : }
     498                 :            : 
     499                 :            : 
     500                 :            : void
     501         [ -  + ]:      10001 : rte_soring_releasx(struct rte_soring *r, const void *objs,
     502                 :            :         const void *meta, uint32_t stage, uint32_t n, uint32_t ftoken)
     503                 :            : {
     504                 :            :         soring_release(r, objs, meta, stage, n, ftoken);
     505                 :      10001 : }
     506                 :            : 
     507                 :            : uint32_t
     508   [ +  -  -  - ]:          1 : rte_soring_enqueue_bulk(struct rte_soring *r, const void *objs, uint32_t n,
     509                 :            :         uint32_t *free_space)
     510                 :            : {
     511                 :          1 :         return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_FIXED,
     512                 :            :                         free_space);
     513                 :            : }
     514                 :            : 
     515                 :            : uint32_t
     516   [ #  #  #  # ]:          0 : rte_soring_enqueux_bulk(struct rte_soring *r, const void *objs,
     517                 :            :         const void *meta, uint32_t n, uint32_t *free_space)
     518                 :            : {
     519                 :          0 :         return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_FIXED,
     520                 :            :                         free_space);
     521                 :            : }
     522                 :            : 
     523                 :            : uint32_t
     524   [ +  -  -  - ]:          3 : rte_soring_enqueue_burst(struct rte_soring *r, const void *objs, uint32_t n,
     525                 :            :         uint32_t *free_space)
     526                 :            : {
     527                 :          3 :         return soring_enqueue(r, objs, NULL, n, RTE_RING_QUEUE_VARIABLE,
     528                 :            :                         free_space);
     529                 :            : }
     530                 :            : 
     531                 :            : uint32_t
     532   [ +  -  -  - ]:          2 : rte_soring_enqueux_burst(struct rte_soring *r, const void *objs,
     533                 :            :         const void *meta, uint32_t n, uint32_t *free_space)
     534                 :            : {
     535                 :          2 :         return soring_enqueue(r, objs, meta, n, RTE_RING_QUEUE_VARIABLE,
     536                 :            :                         free_space);
     537                 :            : }
     538                 :            : 
     539                 :            : uint32_t
     540   [ +  -  -  - ]:      10002 : rte_soring_dequeue_bulk(struct rte_soring *r, void *objs, uint32_t num,
     541                 :            :         uint32_t *available)
     542                 :            : {
     543                 :      10002 :         return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_FIXED,
     544                 :            :                         available);
     545                 :            : }
     546                 :            : 
     547                 :            : uint32_t
     548   [ +  -  -  - ]:          1 : rte_soring_dequeux_bulk(struct rte_soring *r, void *objs, void *meta,
     549                 :            :         uint32_t num, uint32_t *available)
     550                 :            : {
     551                 :          1 :         return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_FIXED,
     552                 :            :                         available);
     553                 :            : }
     554                 :            : 
     555                 :            : uint32_t
     556   [ +  -  -  - ]:          4 : rte_soring_dequeue_burst(struct rte_soring *r, void *objs, uint32_t num,
     557                 :            :         uint32_t *available)
     558                 :            : {
     559                 :          4 :         return soring_dequeue(r, objs, NULL, num, RTE_RING_QUEUE_VARIABLE,
     560                 :            :                         available);
     561                 :            : }
     562                 :            : 
     563                 :            : uint32_t
     564   [ +  -  -  - ]:          1 : rte_soring_dequeux_burst(struct rte_soring *r, void *objs, void *meta,
     565                 :            :         uint32_t num, uint32_t *available)
     566                 :            : {
     567                 :          1 :         return soring_dequeue(r, objs, meta, num, RTE_RING_QUEUE_VARIABLE,
     568                 :            :                         available);
     569                 :            : }
     570                 :            : 
     571                 :            : uint32_t
     572         [ +  - ]:          2 : rte_soring_acquire_bulk(struct rte_soring *r, void *objs,
     573                 :            :         uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
     574                 :            : {
     575                 :          2 :         return soring_acquire(r, objs, NULL, stage, num,
     576                 :            :                         RTE_RING_QUEUE_FIXED, ftoken, available);
     577                 :            : }
     578                 :            : 
     579                 :            : uint32_t
     580         [ +  + ]:      10000 : rte_soring_acquirx_bulk(struct rte_soring *r, void *objs, void *meta,
     581                 :            :         uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
     582                 :            : {
     583                 :      10000 :         return soring_acquire(r, objs, meta, stage, num,
     584                 :            :                         RTE_RING_QUEUE_FIXED, ftoken, available);
     585                 :            : }
     586                 :            : 
     587                 :            : uint32_t
     588         [ #  # ]:          0 : rte_soring_acquire_burst(struct rte_soring *r, void *objs,
     589                 :            :         uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
     590                 :            : {
     591                 :          0 :         return soring_acquire(r, objs, NULL, stage, num,
     592                 :            :                         RTE_RING_QUEUE_VARIABLE, ftoken, available);
     593                 :            : }
     594                 :            : 
     595                 :            : uint32_t
     596         [ +  + ]:          3 : rte_soring_acquirx_burst(struct rte_soring *r, void *objs, void *meta,
     597                 :            :         uint32_t stage, uint32_t num, uint32_t *ftoken, uint32_t *available)
     598                 :            : {
     599                 :          3 :         return soring_acquire(r, objs, meta, stage, num,
     600                 :            :                         RTE_RING_QUEUE_VARIABLE, ftoken, available);
     601                 :            : }
     602                 :            : 
     603                 :            : unsigned int
     604                 :          4 : rte_soring_count(const struct rte_soring *r)
     605                 :            : {
     606                 :          4 :         uint32_t prod_tail = r->prod.ht.tail;
     607                 :          4 :         uint32_t cons_tail = r->cons.ht.tail;
     608                 :          4 :         uint32_t count = (prod_tail - cons_tail) & r->mask;
     609                 :          4 :         return (count > r->capacity) ? r->capacity : count;
     610                 :            : }
     611                 :            : 
     612                 :            : unsigned int
     613                 :          2 : rte_soring_free_count(const struct rte_soring *r)
     614                 :            : {
     615                 :          2 :         return r->capacity - rte_soring_count(r);
     616                 :            : }

Generated by: LCOV version 1.14