LCOV - code coverage report
Current view: top level - lib/distributor - rte_distributor_single.c (source / functions)
Test: Code coverage
Date: 2024-01-22 15:55:54

                     Hit    Total    Coverage
        Lines:       119    130       91.5 %
        Functions:    12     12      100.0 %
        Branches:     52     64       81.2 %

Legend: Lines: hit not hit | Branches: + taken - not taken # not executed

           Branch data     Line data    Source code
       1                 :            : /* SPDX-License-Identifier: BSD-3-Clause
       2                 :            :  * Copyright(c) 2010-2014 Intel Corporation
       3                 :            :  */
       4                 :            : 
       5                 :            : #include <stdio.h>
       6                 :            : #include <sys/queue.h>
       7                 :            : #include <rte_mbuf.h>
       8                 :            : #include <rte_memzone.h>
       9                 :            : #include <rte_errno.h>
      10                 :            : #include <rte_string_fns.h>
      11                 :            : #include <rte_eal_memconfig.h>
      12                 :            : #include <rte_pause.h>
      13                 :            : #include <rte_tailq.h>
      14                 :            : 
      15                 :            : #include "rte_distributor_single.h"
      16                 :            : #include "distributor_private.h"
      17                 :            : 
      18                 :            : TAILQ_HEAD(rte_distributor_list, rte_distributor_single);
      19                 :            : 
      20                 :            : static struct rte_tailq_elem rte_distributor_tailq = {
      21                 :            :         .name = "RTE_DISTRIBUTOR",
      22                 :            : };
      23         [ -  + ]:        235 : EAL_REGISTER_TAILQ(rte_distributor_tailq)
      24                 :            : 
      25                 :            : /**** APIs called by workers ****/
      26                 :            : 
      27                 :            : void
      28                 :    1049666 : rte_distributor_request_pkt_single(struct rte_distributor_single *d,
      29                 :            :                 unsigned worker_id, struct rte_mbuf *oldpkt)
      30                 :            : {
      31                 :            :         union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
      32                 :    1049666 :         int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
      33                 :            :                         | RTE_DISTRIB_GET_BUF;
      34         [ -  + ]:    1049666 :         RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
      35                 :            :                 ==, 0, rte_memory_order_relaxed);
      36                 :            : 
      37                 :            :         /* Sync with distributor on GET_BUF flag. */
      38                 :    1049666 :         rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
      39                 :    1049666 : }
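
   Note: the request packs an mbuf pointer and a flag into the single
   64-bit word bufptr64. The pointer is shifted left by
   RTE_DISTRIB_FLAG_BITS and the freed-up low bits carry
   RTE_DISTRIB_GET_BUF (or RTE_DISTRIB_RETURN_BUF, below). The round
   trip is safe because mbuf addresses are at least cache-line aligned,
   so their low bits are zero. A minimal sketch of the encoding, using
   illustrative stand-in constants rather than the real values from
   distributor_private.h:

       #include <assert.h>
       #include <stdint.h>

       #define FLAG_BITS 2   /* stand-in for RTE_DISTRIB_FLAG_BITS */
       #define GET_BUF   1   /* stand-in for RTE_DISTRIB_GET_BUF */

       int main(void)
       {
               uintptr_t pkt = 0x7f0000401c0;  /* a 64-byte-aligned address */
               int64_t word = ((int64_t)pkt << FLAG_BITS) | GET_BUF;

               /* decode: the arithmetic shift drops the flag bits */
               assert((uintptr_t)(word >> FLAG_BITS) == pkt);
               assert(word & GET_BUF);
               return 0;
       }
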
      40                 :            : 
      41                 :            : struct rte_mbuf *
      42                 :   17588465 : rte_distributor_poll_pkt_single(struct rte_distributor_single *d,
      43                 :            :                 unsigned worker_id)
      44                 :            : {
      45                 :            :         union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
      46                 :            :         /* Sync with distributor. Acquire bufptr64. */
      47                 :   17588465 :         if (rte_atomic_load_explicit(&buf->bufptr64, rte_memory_order_acquire)
      48         [ +  + ]:   17588465 :                 & RTE_DISTRIB_GET_BUF)
      49                 :            :                 return NULL;
      50                 :            : 
      51                 :            :         /* since bufptr64 is signed, this should be an arithmetic shift */
      52                 :    1049666 :         int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
      53                 :    1049666 :         return (struct rte_mbuf *)((uintptr_t)ret);
      54                 :            : }
      55                 :            : 
      56                 :            : struct rte_mbuf *
      57                 :    1049666 : rte_distributor_get_pkt_single(struct rte_distributor_single *d,
      58                 :            :                 unsigned worker_id, struct rte_mbuf *oldpkt)
      59                 :            : {
      60                 :            :         struct rte_mbuf *ret;
      61                 :    1049666 :         rte_distributor_request_pkt_single(d, worker_id, oldpkt);
      62         [ +  + ]:   17588465 :         while ((ret = rte_distributor_poll_pkt_single(d, worker_id)) == NULL)
      63                 :            :                 rte_pause();
      64                 :    1049666 :         return ret;
      65                 :            : }
      66                 :            : 
      67                 :            : int
      68                 :          2 : rte_distributor_return_pkt_single(struct rte_distributor_single *d,
      69                 :            :                 unsigned worker_id, struct rte_mbuf *oldpkt)
      70                 :            : {
      71                 :            :         union rte_distributor_buffer_single *buf = &d->bufs[worker_id];
      72                 :          2 :         uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
      73                 :          2 :                         | RTE_DISTRIB_RETURN_BUF;
      74         [ -  + ]:          2 :         RTE_WAIT_UNTIL_MASKED(&buf->bufptr64, RTE_DISTRIB_FLAGS_MASK,
      75                 :            :                 ==, 0, rte_memory_order_relaxed);
      76                 :            : 
      77                 :            :         /* Sync with distributor on RETURN_BUF flag. */
      78                 :          2 :         rte_atomic_store_explicit(&buf->bufptr64, req, rte_memory_order_release);
      79                 :          2 :         return 0;
      80                 :            : }
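
   Taken together, the worker-side calls form a simple loop:
   rte_distributor_get_pkt_single() hands back the previous packet and
   blocks for the next one, and rte_distributor_return_pkt_single()
   hands back the final packet at shutdown without requesting more. A
   hypothetical worker body; app_running, my_worker_id() and do_work()
   are application placeholders, not part of the library:

       static int
       worker_fn(void *arg)
       {
               struct rte_distributor_single *d = arg;
               unsigned int id = my_worker_id(); /* 0 .. num_workers-1 */
               struct rte_mbuf *pkt = NULL;

               while (app_running) {
                       /* return the previous packet, wait for the next */
                       pkt = rte_distributor_get_pkt_single(d, id, pkt);
                       do_work(pkt);
               }
               /* shutting down: give back the last packet only */
               rte_distributor_return_pkt_single(d, id, pkt);
               return 0;
       }
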
      81                 :            : 
       82                 :            : /**** APIs called on distributor core ****/
      83                 :            : 
       84                 :            : /* as the name suggests, adds a packet to the backlog for a particular worker */
      85                 :            : static int
      86                 :            : add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
      87                 :            : {
      88         [ +  + ]:        454 :         if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
      89                 :            :                 return -1;
      90                 :            : 
      91                 :         31 :         bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
      92                 :         31 :                         = item;
      93                 :            :         return 0;
      94                 :            : }
      95                 :            : 
      96                 :            : /* takes the next packet for a worker off the backlog */
      97                 :            : static int64_t
      98                 :            : backlog_pop(struct rte_distributor_backlog *bl)
      99                 :            : {
     100                 :         31 :         bl->count--;
     101                 :         31 :         return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
     102                 :            : }
     103                 :            : 
     104                 :            : /* stores a packet returned from a worker inside the returns array */
     105                 :            : static inline void
     106                 :            : store_return(uintptr_t oldbuf, struct rte_distributor_single *d,
     107                 :            :                 unsigned *ret_start, unsigned *ret_count)
     108                 :            : {
     109                 :            :         /* store returns in a circular buffer - code is branch-free */
     110                 :   51828464 :         d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
     111                 :   51828464 :                         = (void *)oldbuf;
     112                 :   51828464 :         *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
     113                 :   51828464 :         *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
     114                 :            : }
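
   The two masked additions above replace an if/else: when oldbuf is
   NULL both masks are zero, so neither index moves and the NULL slot
   write is harmlessly overwritten by the next real return; when oldbuf
   is non-NULL, either the count grows or, once the ring holds
   RTE_DISTRIB_RETURNS_MASK entries, the start advances and the oldest
   return is dropped. The equivalent branching form, for illustration
   only:

       if (oldbuf != 0) {
               if (*ret_count == RTE_DISTRIB_RETURNS_MASK)
                       (*ret_start)++; /* ring full: drop the oldest */
               else
                       (*ret_count)++; /* ring not full: keep it */
       }
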
     115                 :            : 
     116                 :            : static inline void
     117                 :          2 : handle_worker_shutdown(struct rte_distributor_single *d, unsigned int wkr)
     118                 :            : {
     119                 :          2 :         d->in_flight_tags[wkr] = 0;
     120                 :          2 :         d->in_flight_bitmask &= ~(1UL << wkr);
     121                 :            :         /* Sync with worker. Release bufptr64. */
     122                 :          2 :         rte_atomic_store_explicit(&d->bufs[wkr].bufptr64, 0, rte_memory_order_release);
     123         [ -  + ]:          2 :         if (unlikely(d->backlog[wkr].count != 0)) {
     124                 :            :                 /* On return of a packet, we need to move the
     125                 :            :                  * queued packets for this core elsewhere.
      126                 :            :          * The easiest solution is to set things up for
     127                 :            :                  * a recursive call. That will cause those
     128                 :            :                  * packets to be queued up for the next free
     129                 :            :                  * core, i.e. it will return as soon as a
     130                 :            :                  * core becomes free to accept the first
     131                 :            :                  * packet, as subsequent ones will be added to
     132                 :            :                  * the backlog for that core.
     133                 :            :                  */
     134                 :            :                 struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
     135                 :            :                 unsigned i;
     136                 :            :                 struct rte_distributor_backlog *bl = &d->backlog[wkr];
     137                 :            : 
     138         [ #  # ]:          0 :                 for (i = 0; i < bl->count; i++) {
     139                 :          0 :                         unsigned idx = (bl->start + i) &
     140                 :            :                                         RTE_DISTRIB_BACKLOG_MASK;
     141                 :          0 :                         pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
     142                 :            :                                         RTE_DISTRIB_FLAG_BITS));
     143                 :            :                 }
     144                 :            :                 /* recursive call.
     145                 :            :                  * Note that the tags were set before first level call
     146                 :            :                  * to rte_distributor_process.
     147                 :            :                  */
     148                 :          0 :                 rte_distributor_process_single(d, pkts, i);
     149                 :          0 :                 bl->count = bl->start = 0;
     150                 :            :         }
     151                 :          2 : }
     152                 :            : 
      153                 :            : /* This function is called when process() is called without any new
      154                 :            :  * packets. It goes through all the workers and collects any returned packets,
      155                 :            :  * performing a partial flush.
     156                 :            :  */
     157                 :            : static int
     158                 :        378 : process_returns(struct rte_distributor_single *d)
     159                 :            : {
     160                 :            :         unsigned wkr;
     161                 :            :         unsigned flushed = 0;
     162                 :        378 :         unsigned ret_start = d->returns.start,
     163                 :        378 :                         ret_count = d->returns.count;
     164                 :            : 
     165         [ +  + ]:        756 :         for (wkr = 0; wkr < d->num_workers; wkr++) {
     166                 :            :                 uintptr_t oldbuf = 0;
     167                 :            :                 /* Sync with worker. Acquire bufptr64. */
     168                 :        378 :                 const int64_t data = rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
     169                 :            :                                                         rte_memory_order_acquire);
     170                 :            : 
     171         [ +  + ]:        378 :                 if (data & RTE_DISTRIB_GET_BUF) {
     172                 :         42 :                         flushed++;
     173         [ +  + ]:         42 :                         if (d->backlog[wkr].count)
     174                 :            :                                 /* Sync with worker. Release bufptr64. */
     175                 :          7 :                                 rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
     176                 :            :                                         backlog_pop(&d->backlog[wkr]),
     177                 :            :                                         rte_memory_order_release);
     178                 :            :                         else {
     179                 :            :                                 /* Sync with worker on GET_BUF flag. */
     180                 :         35 :                                 rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
     181                 :            :                                         RTE_DISTRIB_GET_BUF,
     182                 :            :                                         rte_memory_order_release);
     183                 :         35 :                                 d->in_flight_tags[wkr] = 0;
     184                 :         35 :                                 d->in_flight_bitmask &= ~(1UL << wkr);
     185                 :            :                         }
     186                 :         42 :                         oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
     187         [ +  + ]:        336 :                 } else if (data & RTE_DISTRIB_RETURN_BUF) {
     188                 :          2 :                         handle_worker_shutdown(d, wkr);
     189                 :          2 :                         oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
     190                 :            :                 }
     191                 :            : 
     192                 :            :                 store_return(oldbuf, d, &ret_start, &ret_count);
     193                 :            :         }
     194                 :            : 
     195                 :        378 :         d->returns.start = ret_start;
     196                 :        378 :         d->returns.count = ret_count;
     197                 :            : 
     198                 :        378 :         return flushed;
     199                 :            : }
     200                 :            : 
     201                 :            : /* process a set of packets to distribute them to workers */
     202                 :            : int
     203                 :      33182 : rte_distributor_process_single(struct rte_distributor_single *d,
     204                 :            :                 struct rte_mbuf **mbufs, unsigned num_mbufs)
     205                 :            : {
     206                 :            :         unsigned next_idx = 0;
     207                 :            :         unsigned wkr = 0;
     208                 :            :         struct rte_mbuf *next_mb = NULL;
     209                 :            :         int64_t next_value = 0;
     210                 :            :         uint32_t new_tag = 0;
     211                 :      33182 :         unsigned ret_start = d->returns.start,
     212                 :      33182 :                         ret_count = d->returns.count;
     213                 :            : 
     214         [ +  + ]:      33182 :         if (unlikely(num_mbufs == 0))
     215                 :        378 :                 return process_returns(d);
     216                 :            : 
     217         [ +  + ]:   51860889 :         while (next_idx < num_mbufs || next_mb != NULL) {
     218                 :            :                 uintptr_t oldbuf = 0;
     219                 :            :                 /* Sync with worker. Acquire bufptr64. */
     220                 :   51828085 :                 int64_t data = rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64),
     221                 :            :                                                 rte_memory_order_acquire);
     222                 :            : 
     223         [ +  + ]:   51828085 :                 if (!next_mb) {
     224                 :    1050089 :                         next_mb = mbufs[next_idx++];
     225                 :    1050089 :                         next_value = (((int64_t)(uintptr_t)next_mb)
     226                 :            :                                         << RTE_DISTRIB_FLAG_BITS);
     227                 :            :                         /*
      228                 :            :                          * The user is expected to set a tag value for each
      229                 :            :                          * mbuf before calling rte_distributor_process.
      230                 :            :                          * User-defined tags are used to identify flows
      231                 :            :                          * or sessions.
     232                 :            :                          */
     233                 :    1050089 :                         new_tag = next_mb->hash.usr;
     234                 :            : 
     235                 :            :                         /*
     236                 :            :                          * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
     237                 :            :                          * then the size of match has to be expanded.
     238                 :            :                          */
     239                 :            :                         uint64_t match = 0;
     240                 :            :                         unsigned i;
     241                 :            :                         /*
      242                 :            :                          * To scan for a match, use "xor" and "not" to get a 0/1
      243                 :            :                          * value, then use shifting to merge into a single "match"
      244                 :            :                          * variable, where a set bit indicates a match for the
      245                 :            :                          * worker given by the bit position.
     246                 :            :                          */
     247         [ +  + ]:    2100178 :                         for (i = 0; i < d->num_workers; i++)
     248                 :    1050089 :                                 match |= ((uint64_t)!(d->in_flight_tags[i] ^ new_tag) << i);
     249                 :            : 
     250                 :            :                         /* Only turned-on bits are considered as match */
     251                 :    1050089 :                         match &= d->in_flight_bitmask;
     252                 :            : 
     253         [ +  + ]:    1050089 :                         if (match) {
     254                 :            :                                 next_mb = NULL;
     255                 :            :                                 unsigned int worker = rte_ctz64(match);
     256                 :            :                                 if (add_to_backlog(&d->backlog[worker],
     257                 :            :                                                 next_value) < 0)
     258                 :            :                                         next_idx--;
     259                 :            :                         }
     260                 :            :                 }
     261                 :            : 
     262         [ +  + ]:   51828085 :                 if ((data & RTE_DISTRIB_GET_BUF) &&
     263   [ +  +  +  - ]:    1049658 :                                 (d->backlog[wkr].count || next_mb)) {
     264                 :            : 
     265         [ +  + ]:    1049658 :                         if (d->backlog[wkr].count)
     266                 :            :                                 /* Sync with worker. Release bufptr64. */
     267                 :         23 :                                 rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
     268                 :            :                                                 backlog_pop(&d->backlog[wkr]),
     269                 :            :                                                 rte_memory_order_release);
     270                 :            : 
     271                 :            :                         else {
     272                 :            :                                 /* Sync with worker. Release bufptr64.  */
     273                 :    1049635 :                                 rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
     274                 :            :                                                 next_value,
     275                 :            :                                                 rte_memory_order_release);
     276                 :    1049635 :                                 d->in_flight_tags[wkr] = new_tag;
     277                 :    1049635 :                                 d->in_flight_bitmask |= (1UL << wkr);
     278                 :            :                                 next_mb = NULL;
     279                 :            :                         }
     280                 :    1049658 :                         oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
     281         [ -  + ]:   50778427 :                 } else if (data & RTE_DISTRIB_RETURN_BUF) {
     282                 :          0 :                         handle_worker_shutdown(d, wkr);
     283                 :          0 :                         oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
     284                 :            :                 }
     285                 :            : 
     286                 :            :                 /* store returns in a circular buffer */
     287                 :            :                 store_return(oldbuf, d, &ret_start, &ret_count);
     288                 :            : 
     289         [ +  - ]:   51828085 :                 if (++wkr == d->num_workers)
     290                 :            :                         wkr = 0;
     291                 :            :         }
     292                 :            :         /* to finish, check all workers for backlog and schedule work for them
     293                 :            :          * if they are ready */
     294         [ +  + ]:      65608 :         for (wkr = 0; wkr < d->num_workers; wkr++)
     295         [ +  + ]:      32804 :                 if (d->backlog[wkr].count &&
     296                 :            :                                 /* Sync with worker. Acquire bufptr64. */
     297                 :          1 :                                 (rte_atomic_load_explicit(&d->bufs[wkr].bufptr64,
     298         [ +  - ]:          1 :                                 rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {
     299                 :            : 
     300                 :          1 :                         int64_t oldbuf = d->bufs[wkr].bufptr64 >>
     301                 :            :                                         RTE_DISTRIB_FLAG_BITS;
     302                 :            : 
     303                 :            :                         store_return(oldbuf, d, &ret_start, &ret_count);
     304                 :            : 
     305                 :            :                         /* Sync with worker. Release bufptr64. */
     306                 :          1 :                         rte_atomic_store_explicit(&d->bufs[wkr].bufptr64,
     307                 :            :                                 backlog_pop(&d->backlog[wkr]),
     308                 :            :                                 rte_memory_order_release);
     309                 :            :                 }
     310                 :            : 
     311                 :      32804 :         d->returns.start = ret_start;
     312                 :      32804 :         d->returns.count = ret_count;
     313                 :      32804 :         return num_mbufs;
     314                 :            : }
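
   On the distributor core, process() is typically paired with
   rte_distributor_returned_pkts_single() in a polling loop: tag each
   mbuf (the tag drives the flow-to-worker affinity match above), feed
   a burst in, and drain completed packets out. A hypothetical core
   loop; BURST, app_running, rx_burst(), tx_burst() and flow_tag() are
   application placeholders:

       struct rte_mbuf *bufs[BURST], *done[BURST];

       while (app_running) {
               unsigned int i, n = rx_burst(bufs, BURST);

               /* packets sharing a tag are serialized onto one worker */
               for (i = 0; i < n; i++)
                       bufs[i]->hash.usr = flow_tag(bufs[i]);

               rte_distributor_process_single(d, bufs, n);

               n = rte_distributor_returned_pkts_single(d, done, BURST);
               tx_burst(done, n);
       }
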
     315                 :            : 
     316                 :            : /* return to the caller, packets returned from workers */
     317                 :            : int
     318                 :         39 : rte_distributor_returned_pkts_single(struct rte_distributor_single *d,
     319                 :            :                 struct rte_mbuf **mbufs, unsigned max_mbufs)
     320                 :            : {
     321                 :            :         struct rte_distributor_returned_pkts *returns = &d->returns;
     322                 :         39 :         unsigned retval = (max_mbufs < returns->count) ?
     323                 :            :                         max_mbufs : returns->count;
     324                 :            :         unsigned i;
     325                 :            : 
     326         [ +  + ]:       1129 :         for (i = 0; i < retval; i++) {
     327                 :       1090 :                 unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
     328                 :       1090 :                 mbufs[i] = returns->mbufs[idx];
     329                 :            :         }
     330                 :         39 :         returns->start += i;
     331                 :         39 :         returns->count -= i;
     332                 :            : 
     333                 :         39 :         return retval;
     334                 :            : }
     335                 :            : 
     336                 :            : /* return the number of packets in-flight in a distributor, i.e. packets
     337                 :            :  * being worked on or queued up in a backlog.
     338                 :            :  */
     339                 :            : static inline unsigned
     340                 :            : total_outstanding(const struct rte_distributor_single *d)
     341                 :            : {
     342                 :            :         unsigned wkr, total_outstanding;
     343                 :            : 
     344                 :        415 :         total_outstanding = rte_popcount64(d->in_flight_bitmask);
     345                 :            : 
     346   [ +  +  +  + ]:        908 :         for (wkr = 0; wkr < d->num_workers; wkr++)
     347                 :        454 :                 total_outstanding += d->backlog[wkr].count;
     348                 :            : 
     349                 :            :         return total_outstanding;
     350                 :            : }
     351                 :            : 
     352                 :            : /* flush the distributor, so that there are no outstanding packets in flight or
     353                 :            :  * queued up. */
     354                 :            : int
     355                 :         39 : rte_distributor_flush_single(struct rte_distributor_single *d)
     356                 :            : {
     357                 :            :         const unsigned flushed = total_outstanding(d);
     358                 :            : 
     359         [ +  + ]:        415 :         while (total_outstanding(d) > 0)
     360                 :        376 :                 rte_distributor_process_single(d, NULL, 0);
     361                 :            : 
     362                 :         39 :         return flushed;
     363                 :            : }
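
   Flushing calls process() with no new packets until nothing is left
   in flight or backlogged, so it should run while workers are still
   polling for work. A hypothetical quiesce sequence at the end of a
   run (the ordering relative to worker shutdown is the application's
   responsibility):

       /* drain everything being worked on or queued */
       rte_distributor_flush_single(d);

       /* then discard whatever accumulated in the returns ring */
       rte_distributor_clear_returns_single(d);
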
     364                 :            : 
     365                 :            : /* clears the internal returns array in the distributor */
     366                 :            : void
     367                 :          3 : rte_distributor_clear_returns_single(struct rte_distributor_single *d)
     368                 :            : {
     369                 :          3 :         d->returns.start = d->returns.count = 0;
     370                 :            : #ifndef __OPTIMIZE__
     371                 :            :         memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
     372                 :            : #endif
     373                 :          3 : }
     374                 :            : 
     375                 :            : /* creates a distributor instance */
     376                 :            : struct rte_distributor_single *
     377                 :          1 : rte_distributor_create_single(const char *name,
     378                 :            :                 unsigned socket_id,
     379                 :            :                 unsigned num_workers)
     380                 :            : {
     381                 :            :         struct rte_distributor_single *d;
     382                 :            :         struct rte_distributor_list *distributor_list;
     383                 :            :         char mz_name[RTE_MEMZONE_NAMESIZE];
     384                 :            :         const struct rte_memzone *mz;
     385                 :            : 
     386                 :            :         /* compilation-time checks */
     387                 :            :         RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
     388                 :            :         RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
     389                 :            :         RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
     390                 :            :                                 sizeof(d->in_flight_bitmask) * CHAR_BIT);
     391                 :            : 
     392         [ -  + ]:          1 :         if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
     393                 :          0 :                 rte_errno = EINVAL;
     394                 :          0 :                 return NULL;
     395                 :            :         }
     396                 :            : 
     397                 :            :         snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
     398                 :          1 :         mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
     399         [ -  + ]:          1 :         if (mz == NULL) {
     400                 :          0 :                 rte_errno = ENOMEM;
     401                 :          0 :                 return NULL;
     402                 :            :         }
     403                 :            : 
     404                 :          1 :         d = mz->addr;
     405                 :          1 :         strlcpy(d->name, name, sizeof(d->name));
     406                 :          1 :         d->num_workers = num_workers;
     407                 :            : 
     408                 :          1 :         distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
     409                 :            :                                           rte_distributor_list);
     410                 :            : 
     411                 :          1 :         rte_mcfg_tailq_write_lock();
     412                 :          1 :         TAILQ_INSERT_TAIL(distributor_list, d, next);
     413                 :          1 :         rte_mcfg_tailq_write_unlock();
     414                 :            : 
     415                 :          1 :         return d;
     416                 :            : }
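
   Creation reserves a memzone sized for the structure and links it
   onto a shared tailq, so the distributor can be found by name across
   the process. A hypothetical setup tying the pieces together;
   worker_fn is the sketch shown earlier and first_worker_lcore is a
   placeholder for the application's lcore layout:

       struct rte_distributor_single *d =
               rte_distributor_create_single("pkt_dist", rte_socket_id(), 4);
       if (d == NULL)
               rte_exit(EXIT_FAILURE, "distributor create failed: %d\n",
                       rte_errno);

       /* launch the workers, then run the distributor loop here */
       for (unsigned int w = 0; w < 4; w++)
               rte_eal_remote_launch(worker_fn, d, first_worker_lcore + w);
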

Generated by: LCOV version 1.14