LCOV - code coverage report
Current view: top level - drivers/event/dsw - dsw_event.c (source / functions) Hit Total Coverage
Test: Code coverage Lines: 0 499 0.0 %
Date: 2025-01-02 22:41:34 Functions: 0 38 0.0 %
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed Branches: 0 347 0.0 %

           Branch data     Line data    Source code
       1                 :            : /* SPDX-License-Identifier: BSD-3-Clause
       2                 :            :  * Copyright(c) 2018 Ericsson AB
       3                 :            :  */
       4                 :            : 
       5                 :            : #include "dsw_evdev.h"
       6                 :            : 
       7                 :            : #ifdef DSW_SORT_DEQUEUED
       8                 :            : #include "dsw_sort.h"
       9                 :            : #endif
      10                 :            : 
      11                 :            : #include <stdbool.h>
      12                 :            : #include <stdlib.h>
      13                 :            : #include <string.h>
      14                 :            : 
      15                 :            : #include <rte_cycles.h>
      16                 :            : #include <rte_memcpy.h>
      17                 :            : #include <rte_random.h>
      18                 :            : 
      19                 :            : static bool
      20                 :          0 : dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
      21                 :            :                          int32_t credits)
      22                 :            : {
      23                 :          0 :         int32_t inflight_credits = port->inflight_credits;
      24                 :          0 :         int32_t missing_credits = credits - inflight_credits;
      25                 :            :         int32_t total_on_loan;
      26                 :            :         int32_t available;
      27                 :            :         int32_t acquired_credits;
      28                 :            :         int32_t new_total_on_loan;
      29                 :            : 
      30         [ #  # ]:          0 :         if (likely(missing_credits <= 0)) {
      31                 :          0 :                 port->inflight_credits -= credits;
      32                 :          0 :                 return true;
      33                 :            :         }
      34                 :            : 
      35                 :          0 :         total_on_loan =
      36                 :          0 :                 rte_atomic_load_explicit(&dsw->credits_on_loan,
      37                 :            :                                          rte_memory_order_relaxed);
      38                 :          0 :         available = dsw->max_inflight - total_on_loan;
      39                 :          0 :         acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
      40                 :            : 
      41         [ #  # ]:          0 :         if (available < acquired_credits)
      42                 :            :                 return false;
      43                 :            : 
      44                 :            :         /* This is a race, no locks are involved, and thus some other
      45                 :            :          * thread can allocate tokens in between the check and the
      46                 :            :          * allocation.
      47                 :            :          */
      48                 :          0 :         new_total_on_loan =
      49                 :          0 :             rte_atomic_fetch_add_explicit(&dsw->credits_on_loan,
      50                 :            :                                           acquired_credits,
      51                 :            :                                           rte_memory_order_relaxed) +
      52                 :            :                                           acquired_credits;
      53                 :            : 
      54         [ #  # ]:          0 :         if (unlikely(new_total_on_loan > dsw->max_inflight)) {
      55                 :            :                 /* Some other port took the last credits */
      56                 :          0 :                 rte_atomic_fetch_sub_explicit(&dsw->credits_on_loan,
      57                 :            :                                               acquired_credits,
      58                 :            :                                               rte_memory_order_relaxed);
      59                 :          0 :                 return false;
      60                 :            :         }
      61                 :            : 
      62                 :            :         DSW_LOG_DP_PORT_LINE(DEBUG, port->id, "Acquired %d tokens from pool.",
      63                 :            :                         acquired_credits);
      64                 :            : 
      65                 :          0 :         port->inflight_credits += acquired_credits;
      66                 :          0 :         port->inflight_credits -= credits;
      67                 :            : 
      68                 :          0 :         return true;
      69                 :            : }
      70                 :            : 
      71                 :            : static void
      72                 :            : dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
      73                 :            :                         int32_t credits)
      74                 :            : {
      75                 :          0 :         port->inflight_credits += credits;
      76                 :            : 
      77   [ #  #  #  # ]:          0 :         if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
      78                 :            :                 int32_t leave_credits = DSW_PORT_MIN_CREDITS;
      79                 :          0 :                 int32_t return_credits =
      80                 :            :                         port->inflight_credits - leave_credits;
      81                 :            : 
      82                 :          0 :                 port->inflight_credits = leave_credits;
      83                 :            : 
      84                 :          0 :                 rte_atomic_fetch_sub_explicit(&dsw->credits_on_loan,
      85                 :            :                                               return_credits,
      86                 :            :                                               rte_memory_order_relaxed);
      87                 :            : 
      88                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, port->id,
      89                 :            :                                 "Returned %d tokens to pool.",
      90                 :            :                                 return_credits);
      91                 :            :         }
      92                 :            : }
      93                 :            : 
      94                 :            : static void
      95                 :            : dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
      96                 :            :                        uint16_t num_forward, uint16_t num_release)
      97                 :            : {
      98                 :          0 :         port->new_enqueued += num_new;
      99                 :          0 :         port->forward_enqueued += num_forward;
     100                 :          0 :         port->release_enqueued += num_release;
     101                 :          0 : }
     102                 :            : 
     103                 :            : static void
     104                 :            : dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
     105                 :            : {
     106                 :          0 :         source_port->queue_enqueued[queue_id]++;
     107                 :            : }
     108                 :            : 
     109                 :            : static void
     110                 :            : dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
     111                 :            : {
     112                 :          0 :         port->dequeued += num;
     113                 :            : }
     114                 :            : 
     115                 :            : static void
     116                 :            : dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
     117                 :            : {
     118                 :          0 :         source_port->queue_dequeued[queue_id]++;
     119                 :            : }
     120                 :            : 
     121                 :            : static void
     122                 :            : dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
     123                 :            : {
     124         [ #  # ]:          0 :         if (dequeued > 0 && port->busy_start == 0)
     125                 :            :                 /* work period begins */
     126                 :          0 :                 port->busy_start = rte_get_timer_cycles();
     127   [ #  #  #  # ]:          0 :         else if (dequeued == 0 && port->busy_start > 0) {
     128                 :            :                 /* work period ends */
     129                 :          0 :                 uint64_t work_period =
     130                 :          0 :                         rte_get_timer_cycles() - port->busy_start;
     131                 :          0 :                 port->busy_cycles += work_period;
     132                 :          0 :                 port->busy_start = 0;
     133                 :            :         }
     134                 :            : }
     135                 :            : 
     136                 :            : static int16_t
     137                 :            : dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
     138                 :            : {
     139                 :          0 :         uint64_t passed = now - port->measurement_start;
     140                 :          0 :         uint64_t busy_cycles = port->busy_cycles;
     141                 :            : 
     142                 :          0 :         if (port->busy_start > 0) {
     143                 :          0 :                 busy_cycles += (now - port->busy_start);
     144                 :          0 :                 port->busy_start = now;
     145                 :            :         }
     146                 :            : 
     147                 :          0 :         int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
     148                 :            : 
     149                 :          0 :         port->measurement_start = now;
     150                 :          0 :         port->busy_cycles = 0;
     151                 :            : 
     152                 :          0 :         port->total_busy_cycles += busy_cycles;
     153                 :            : 
     154                 :            :         return load;
     155                 :            : }
     156                 :            : 
     157                 :            : static void
     158                 :          0 : dsw_port_load_update(struct dsw_port *port, uint64_t now)
     159                 :            : {
     160                 :            :         int16_t old_load;
     161                 :            :         int16_t period_load;
     162                 :            :         int16_t new_load;
     163                 :            : 
     164         [ #  # ]:          0 :         old_load = rte_atomic_load_explicit(&port->load,
     165                 :            :                                             rte_memory_order_relaxed);
     166                 :            : 
     167                 :            :         period_load = dsw_port_load_close_period(port, now);
     168                 :            : 
     169                 :          0 :         new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
     170                 :            :                 (DSW_OLD_LOAD_WEIGHT+1);
     171                 :            : 
     172                 :          0 :         rte_atomic_store_explicit(&port->load, new_load,
     173                 :            :                                   rte_memory_order_relaxed);
     174                 :            : 
     175                 :            :         /* The load of the recently immigrated flows should hopefully
     176                 :            :          * be reflected the load estimate by now.
     177                 :            :          */
     178                 :          0 :         rte_atomic_store_explicit(&port->immigration_load, 0,
     179                 :            :                                   rte_memory_order_relaxed);
     180                 :          0 : }
     181                 :            : 
     182                 :            : static void
     183                 :            : dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
     184                 :            : {
     185         [ #  # ]:          0 :         if (now < port->next_load_update)
     186                 :            :                 return;
     187                 :            : 
     188                 :          0 :         port->next_load_update = now + port->load_update_interval;
     189                 :            : 
     190                 :          0 :         dsw_port_load_update(port, now);
     191                 :            : }
     192                 :            : 
     193                 :            : static void
     194                 :          0 : dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
     195                 :            : {
     196                 :            :         /* there's always room on the ring */
     197   [ #  #  #  #  :          0 :         while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
                      # ]
     198                 :            :                 rte_pause();
     199                 :          0 : }
     200                 :            : 
     201                 :            : static int
     202                 :          0 : dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
     203                 :            : {
     204   [ #  #  #  #  :          0 :         return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
                      # ]
     205                 :            : }
     206                 :            : 
     207                 :            : static void
     208                 :          0 : dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
     209                 :            :                        uint8_t type, struct dsw_queue_flow *qfs,
     210                 :            :                        uint8_t qfs_len)
     211                 :            : {
     212                 :            :         uint16_t port_id;
     213                 :          0 :         struct dsw_ctl_msg msg = {
     214                 :            :                 .type = type,
     215                 :          0 :                 .originating_port_id = source_port->id,
     216                 :            :                 .qfs_len = qfs_len
     217                 :            :         };
     218                 :            : 
     219                 :          0 :         memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
     220                 :            : 
     221         [ #  # ]:          0 :         for (port_id = 0; port_id < dsw->num_ports; port_id++)
     222         [ #  # ]:          0 :                 if (port_id != source_port->id)
     223                 :          0 :                         dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
     224                 :          0 : }
     225                 :            : 
     226                 :            : static __rte_always_inline bool
     227                 :            : dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
     228                 :            :                          uint8_t queue_id, uint16_t flow_hash)
     229                 :            : {
     230                 :            :         uint16_t i;
     231                 :            : 
     232   [ #  #  #  #  :          0 :         for (i = 0; i < qfs_len; i++)
          #  #  #  #  #  
                      # ]
     233   [ #  #  #  #  :          0 :                 if (qfs[i].queue_id == queue_id &&
          #  #  #  #  #  
                      # ]
     234   [ #  #  #  #  :          0 :                     qfs[i].flow_hash == flow_hash)
          #  #  #  #  #  
                      # ]
     235                 :            :                         return true;
     236                 :            : 
     237                 :            :         return false;
     238                 :            : }
     239                 :            : 
     240                 :            : static __rte_always_inline bool
     241                 :            : dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
     242                 :            :                         uint16_t flow_hash)
     243                 :            : {
     244                 :          0 :         return dsw_is_queue_flow_in_ary(port->paused_flows,
     245                 :          0 :                                         port->paused_flows_len,
     246                 :            :                                         queue_id, flow_hash);
     247                 :            : }
     248                 :            : 
     249                 :            : static __rte_always_inline bool
     250                 :            : dsw_port_is_flow_migrating(struct dsw_port *port, uint8_t queue_id,
     251                 :            :                            uint16_t flow_hash)
     252                 :            : {
     253                 :          0 :         return dsw_is_queue_flow_in_ary(port->emigration_target_qfs,
     254                 :          0 :                                         port->emigration_targets_len,
     255                 :            :                                         queue_id, flow_hash);
     256                 :            : }
     257                 :            : 
     258                 :            : static void
     259                 :            : dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
     260                 :            :                           uint8_t qfs_len)
     261                 :            : {
     262                 :            :         uint8_t i;
     263                 :            : 
     264   [ #  #  #  # ]:          0 :         for (i = 0; i < qfs_len; i++) {
     265                 :          0 :                 struct dsw_queue_flow *qf = &qfs[i];
     266                 :            : 
     267                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, port->id,
     268                 :            :                                 "Pausing queue_id %d flow_hash %d.",
     269                 :            :                                 qf->queue_id, qf->flow_hash);
     270                 :            : 
     271                 :          0 :                 port->paused_flows[port->paused_flows_len] = *qf;
     272                 :          0 :                 port->paused_flows_len++;
     273                 :            :         };
     274                 :            : }
     275                 :            : 
     276                 :            : static void
     277                 :          0 : dsw_port_remove_paused_flow(struct dsw_port *port,
     278                 :            :                             const struct dsw_queue_flow *target_qf)
     279                 :            : {
     280                 :            :         uint16_t i;
     281                 :            : 
     282         [ #  # ]:          0 :         for (i = 0; i < port->paused_flows_len; i++) {
     283                 :          0 :                 struct dsw_queue_flow *qf = &port->paused_flows[i];
     284                 :            : 
     285         [ #  # ]:          0 :                 if (qf->queue_id == target_qf->queue_id &&
     286         [ #  # ]:          0 :                     qf->flow_hash == target_qf->flow_hash) {
     287                 :          0 :                         uint16_t last_idx = port->paused_flows_len-1;
     288         [ #  # ]:          0 :                         if (i != last_idx)
     289                 :          0 :                                 port->paused_flows[i] =
     290                 :          0 :                                         port->paused_flows[last_idx];
     291                 :          0 :                         port->paused_flows_len--;
     292                 :            : 
     293                 :            :                         DSW_LOG_DP_PORT_LINE(DEBUG, port->id,
     294                 :            :                                         "Unpausing queue_id %d flow_hash %d.",
     295                 :            :                                         target_qf->queue_id,
     296                 :            :                                         target_qf->flow_hash);
     297                 :            : 
     298                 :            :                         return;
     299                 :            :                 }
     300                 :            :         }
     301                 :            : 
     302                 :          0 :         DSW_LOG_DP_PORT_LINE(ERR, port->id,
     303                 :            :                         "Failed to unpause queue_id %d flow_hash %d.",
     304                 :            :                         target_qf->queue_id, target_qf->flow_hash);
     305                 :          0 :         RTE_VERIFY(0);
     306                 :            : }
     307                 :            : 
     308                 :            : static void
     309                 :            : dsw_port_remove_paused_flows(struct dsw_port *port,
     310                 :            :                              struct dsw_queue_flow *qfs, uint8_t qfs_len)
     311                 :            : {
     312                 :            :         uint8_t i;
     313                 :            : 
     314   [ #  #  #  # ]:          0 :         for (i = 0; i < qfs_len; i++)
     315                 :          0 :                 dsw_port_remove_paused_flow(port, &qfs[i]);
     316                 :            : }
     317                 :            : 
     318                 :            : static void
     319                 :            : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
     320                 :            : 
     321                 :            : static void
     322                 :          0 : dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
     323                 :            :                             uint8_t originating_port_id,
     324                 :            :                             struct dsw_queue_flow *paused_qfs,
     325                 :            :                             uint8_t qfs_len)
     326                 :            : {
     327                 :          0 :         struct dsw_ctl_msg cfm = {
     328                 :            :                 .type = DSW_CTL_CFM,
     329                 :          0 :                 .originating_port_id = port->id
     330                 :            :         };
     331                 :            : 
     332                 :            :         /* There might be already-scheduled events belonging to the
     333                 :            :          * paused flow in the output buffers.
     334                 :            :          */
     335                 :            :         dsw_port_flush_out_buffers(dsw, port);
     336                 :            : 
     337                 :            :         dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
     338                 :            : 
     339                 :            :         /* Make sure any stores to the original port's in_ring is seen
     340                 :            :          * before the ctl message.
     341                 :            :          */
     342                 :          0 :         rte_smp_wmb();
     343                 :            : 
     344                 :          0 :         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
     345                 :          0 : }
     346                 :            : 
     347                 :            : struct dsw_queue_flow_burst {
     348                 :            :         struct dsw_queue_flow queue_flow;
     349                 :            :         uint16_t count;
     350                 :            : };
     351                 :            : 
     352                 :            : #define DSW_QF_TO_INT(_qf)                                      \
     353                 :            :         ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
     354                 :            : 
     355                 :            : static inline int
     356                 :          0 : dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
     357                 :            : {
     358                 :            :         const struct dsw_queue_flow *qf_a = v_qf_a;
     359                 :            :         const struct dsw_queue_flow *qf_b = v_qf_b;
     360                 :            : 
     361                 :          0 :         return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
     362                 :            : }
     363                 :            : 
     364                 :            : static uint16_t
     365                 :          0 : dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
     366                 :            :                        struct dsw_queue_flow_burst *bursts)
     367                 :            : {
     368                 :            :         uint16_t i;
     369                 :            :         struct dsw_queue_flow_burst *current_burst = NULL;
     370                 :            :         uint16_t num_bursts = 0;
     371                 :            : 
     372                 :            :         /* We don't need the stable property, and the list is likely
     373                 :            :          * large enough for qsort() to outperform dsw_stable_sort(),
     374                 :            :          * so we use qsort() here.
     375                 :            :          */
     376                 :          0 :         qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
     377                 :            : 
     378                 :            :         /* arrange the (now-consecutive) events into bursts */
     379         [ #  # ]:          0 :         for (i = 0; i < qfs_len; i++) {
     380         [ #  # ]:          0 :                 if (i == 0 ||
     381         [ #  # ]:          0 :                     dsw_cmp_qf(&qfs[i], &current_burst->queue_flow) != 0) {
     382                 :          0 :                         current_burst = &bursts[num_bursts];
     383                 :          0 :                         current_burst->queue_flow = qfs[i];
     384                 :          0 :                         current_burst->count = 0;
     385                 :          0 :                         num_bursts++;
     386                 :            :                 }
     387                 :          0 :                 current_burst->count++;
     388                 :            :         }
     389                 :            : 
     390                 :          0 :         return num_bursts;
     391                 :            : }
     392                 :            : 
     393                 :            : static bool
     394                 :          0 : dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
     395                 :            :                         int16_t load_limit)
     396                 :            : {
     397                 :            :         bool below_limit = false;
     398                 :            :         uint16_t i;
     399                 :            : 
     400         [ #  # ]:          0 :         for (i = 0; i < dsw->num_ports; i++) {
     401                 :          0 :                 int16_t measured_load =
     402                 :          0 :                         rte_atomic_load_explicit(&dsw->ports[i].load,
     403                 :            :                                                  rte_memory_order_relaxed);
     404                 :          0 :                 int32_t immigration_load =
     405                 :          0 :                         rte_atomic_load_explicit(&dsw->ports[i].immigration_load,
     406                 :            :                                                  rte_memory_order_relaxed);
     407                 :          0 :                 int32_t load = measured_load + immigration_load;
     408                 :            : 
     409                 :          0 :                 load = RTE_MIN(load, DSW_MAX_LOAD);
     410                 :            : 
     411         [ #  # ]:          0 :                 if (load < load_limit)
     412                 :            :                         below_limit = true;
     413                 :          0 :                 port_loads[i] = load;
     414                 :            :         }
     415                 :          0 :         return below_limit;
     416                 :            : }
     417                 :            : 
     418                 :            : static int16_t
     419                 :            : dsw_flow_load(uint16_t num_events, int16_t port_load)
     420                 :            : {
     421                 :          0 :         return ((int32_t)port_load * (int32_t)num_events) /
     422                 :            :                 DSW_MAX_EVENTS_RECORDED;
     423                 :            : }
     424                 :            : 
     425                 :            : static int16_t
     426                 :            : dsw_evaluate_migration(int16_t source_load, int16_t target_load,
     427                 :            :                        int16_t flow_load)
     428                 :            : {
     429                 :            :         int32_t res_target_load;
     430                 :            :         int32_t imbalance;
     431                 :            : 
     432                 :          0 :         if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
     433                 :            :                 return -1;
     434                 :            : 
     435                 :          0 :         imbalance = source_load - target_load;
     436                 :            : 
     437         [ #  # ]:          0 :         if (imbalance < DSW_REBALANCE_THRESHOLD)
     438                 :            :                 return -1;
     439                 :            : 
     440                 :          0 :         res_target_load = target_load + flow_load;
     441                 :            : 
     442                 :            :         /* If the estimated load of the target port will be higher
     443                 :            :          * than the source port's load, it doesn't make sense to move
     444                 :            :          * the flow.
     445                 :            :          */
     446         [ #  # ]:          0 :         if (res_target_load > source_load)
     447                 :            :                 return -1;
     448                 :            : 
     449                 :            :         /* The more idle the target will be, the better. This will
     450                 :            :          * make migration prefer moving smaller flows, and flows to
     451                 :            :          * lightly loaded ports.
     452                 :            :          */
     453                 :          0 :         return DSW_MAX_LOAD - res_target_load;
     454                 :            : }
     455                 :            : 
     456                 :            : static bool
     457                 :            : dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
     458                 :            : {
     459                 :            :         struct dsw_queue *queue = &dsw->queues[queue_id];
     460                 :            : 
     461                 :          0 :         return rte_bitset_test(queue->serving_ports, port_id);
     462                 :            : }
     463                 :            : 
     464                 :            : static bool
     465                 :          0 : dsw_select_emigration_target(struct dsw_evdev *dsw,
     466                 :            :                              struct dsw_port *source_port,
     467                 :            :                              struct dsw_queue_flow_burst *bursts,
     468                 :            :                              uint16_t num_bursts,
     469                 :            :                              int16_t *port_loads, uint16_t num_ports,
     470                 :            :                              uint8_t *target_port_ids,
     471                 :            :                              struct dsw_queue_flow *target_qfs,
     472                 :            :                              uint8_t *targets_len)
     473                 :            : {
     474                 :          0 :         int16_t source_port_load = port_loads[source_port->id];
     475                 :            :         struct dsw_queue_flow *candidate_qf = NULL;
     476                 :            :         uint8_t candidate_port_id = 0;
     477                 :            :         int16_t candidate_weight = -1;
     478                 :            :         int16_t candidate_flow_load = -1;
     479                 :            :         uint16_t i;
     480                 :            : 
     481         [ #  # ]:          0 :         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
     482                 :            :                 return false;
     483                 :            : 
     484         [ #  # ]:          0 :         for (i = 0; i < num_bursts; i++) {
     485                 :          0 :                 struct dsw_queue_flow_burst *burst = &bursts[i];
     486                 :          0 :                 struct dsw_queue_flow *qf = &burst->queue_flow;
     487                 :            :                 int16_t flow_load;
     488                 :            :                 uint16_t port_id;
     489                 :            : 
     490         [ #  # ]:          0 :                 if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
     491                 :          0 :                                              qf->queue_id, qf->flow_hash))
     492                 :          0 :                         continue;
     493                 :            : 
     494                 :          0 :                 flow_load = dsw_flow_load(burst->count, source_port_load);
     495                 :            : 
     496         [ #  # ]:          0 :                 for (port_id = 0; port_id < num_ports; port_id++) {
     497                 :            :                         int16_t weight;
     498                 :            : 
     499         [ #  # ]:          0 :                         if (port_id == source_port->id)
     500                 :          0 :                                 continue;
     501                 :            : 
     502         [ #  # ]:          0 :                         if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
     503                 :          0 :                                 continue;
     504                 :            : 
     505                 :          0 :                         weight = dsw_evaluate_migration(source_port_load,
     506         [ #  # ]:          0 :                                                         port_loads[port_id],
     507                 :            :                                                         flow_load);
     508                 :            : 
     509         [ #  # ]:          0 :                         if (weight > candidate_weight) {
     510                 :            :                                 candidate_qf = qf;
     511                 :            :                                 candidate_port_id = port_id;
     512                 :            :                                 candidate_weight = weight;
     513                 :            :                                 candidate_flow_load = flow_load;
     514                 :            :                         }
     515                 :            :                 }
     516                 :            :         }
     517                 :            : 
     518         [ #  # ]:          0 :         if (candidate_weight < 0)
     519                 :            :                 return false;
     520                 :            : 
     521                 :            :         DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Selected queue_id %d "
     522                 :            :                         "flow_hash %d (with flow load %d) for migration "
     523                 :            :                         "to port %d.", candidate_qf->queue_id,
     524                 :            :                         candidate_qf->flow_hash,
     525                 :            :                         DSW_LOAD_TO_PERCENT(candidate_flow_load),
     526                 :            :                         candidate_port_id);
     527                 :            : 
     528                 :          0 :         port_loads[candidate_port_id] += candidate_flow_load;
     529                 :          0 :         port_loads[source_port->id] -= candidate_flow_load;
     530                 :            : 
     531                 :          0 :         target_port_ids[*targets_len] = candidate_port_id;
     532                 :          0 :         target_qfs[*targets_len] = *candidate_qf;
     533                 :          0 :         (*targets_len)++;
     534                 :            : 
     535                 :          0 :         rte_atomic_fetch_add_explicit(
     536                 :            :                                 &dsw->ports[candidate_port_id].immigration_load,
     537                 :            :                                       candidate_flow_load,
     538                 :            :                                       rte_memory_order_relaxed);
     539                 :            : 
     540                 :          0 :         return true;
     541                 :            : }
     542                 :            : 
     543                 :            : static void
     544                 :          0 : dsw_select_emigration_targets(struct dsw_evdev *dsw,
     545                 :            :                               struct dsw_port *source_port,
     546                 :            :                               struct dsw_queue_flow_burst *bursts,
     547                 :            :                               uint16_t num_bursts, int16_t *port_loads)
     548                 :            : {
     549                 :          0 :         struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
     550                 :          0 :         uint8_t *target_port_ids = source_port->emigration_target_port_ids;
     551                 :          0 :         uint8_t *targets_len = &source_port->emigration_targets_len;
     552                 :            :         uint16_t i;
     553                 :            : 
     554         [ #  # ]:          0 :         for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
     555                 :            :                 bool found;
     556                 :            : 
     557                 :          0 :                 found = dsw_select_emigration_target(dsw, source_port,
     558                 :            :                                                      bursts, num_bursts,
     559                 :          0 :                                                      port_loads, dsw->num_ports,
     560                 :            :                                                      target_port_ids,
     561                 :            :                                                      target_qfs,
     562                 :            :                                                      targets_len);
     563         [ #  # ]:          0 :                 if (!found)
     564                 :            :                         break;
     565                 :            :         }
     566                 :            : 
     567                 :            :         if (*targets_len == 0)
     568                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
     569                 :            :                                 "For the %d flows considered, no target port "
     570                 :            :                                 "was found.", num_bursts);
     571                 :          0 : }
     572                 :            : 
     573                 :            : static uint8_t
     574                 :          0 : dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
     575                 :            : {
     576                 :          0 :         struct dsw_queue *queue = &dsw->queues[queue_id];
     577                 :            :         uint8_t port_id;
     578                 :            : 
     579         [ #  # ]:          0 :         if (queue->num_serving_ports > 1)
     580                 :          0 :                 port_id = queue->flow_to_port_map[flow_hash];
     581                 :            :         else
     582                 :            :                 /* A single-link queue, or atomic/ordered/parallel but
     583                 :            :                  * with just a single serving port.
     584                 :            :                  */
     585                 :          0 :                 port_id = (uint8_t)rte_bitset_find_first_set(
     586                 :          0 :                         queue->serving_ports, DSW_MAX_PORTS
     587                 :            :                 );
     588                 :            : 
     589                 :            :         DSW_LOG_DP_LINE(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
     590                 :            :                    "to port %d.", queue_id, flow_hash, port_id);
     591                 :            : 
     592                 :          0 :         return port_id;
     593                 :            : }
     594                 :            : 
     595                 :            : static void
     596                 :          0 : dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
     597                 :            :                            uint8_t dest_port_id)
     598                 :            : {
     599                 :          0 :         struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
     600                 :            :         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
     601                 :          0 :         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
     602                 :            : 
     603         [ #  # ]:          0 :         if (*buffer_len == 0)
     604                 :            :                 return;
     605                 :            : 
     606                 :            :         /* The rings are dimensioned to fit all in-flight events (even
     607                 :            :          * on a single ring).
     608                 :            :          */
     609   [ #  #  #  #  :          0 :         rte_event_ring_enqueue_bulk(dest_port->in_ring, buffer, *buffer_len,
                      # ]
     610                 :            :                                     NULL);
     611                 :            : 
     612                 :          0 :         (*buffer_len) = 0;
     613                 :            : }
     614                 :            : 
     615                 :            : static uint16_t
     616                 :            : dsw_port_get_parallel_flow_id(struct dsw_port *port)
     617                 :            : {
     618                 :          0 :         uint16_t flow_id = port->next_parallel_flow_id;
     619                 :            : 
     620                 :          0 :         port->next_parallel_flow_id =
     621                 :          0 :                 (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
     622                 :            : 
     623                 :            :         return flow_id;
     624                 :            : }
     625                 :            : 
     626                 :            : static void
     627                 :            : dsw_port_buffer_paused(struct dsw_port *port,
     628                 :            :                        const struct rte_event *paused_event)
     629                 :            : {
     630                 :          0 :         port->paused_events[port->paused_events_len] = *paused_event;
     631                 :          0 :         port->paused_events_len++;
     632                 :          0 : }
     633                 :            : 
     634                 :            : 
     635                 :            : static void
     636                 :          0 : dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
     637                 :            :                            uint8_t dest_port_id, const struct rte_event *event)
     638                 :            : {
     639                 :          0 :         struct rte_event *buffer = source_port->out_buffer[dest_port_id];
     640                 :            :         uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
     641                 :            : 
     642         [ #  # ]:          0 :         if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
     643                 :          0 :                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
     644                 :            : 
     645                 :          0 :         buffer[*buffer_len] = *event;
     646                 :            : 
     647                 :          0 :         (*buffer_len)++;
     648                 :          0 : }
     649                 :            : 
     650                 :            : #define DSW_FLOW_ID_BITS (24)
     651                 :            : static uint16_t
     652                 :            : dsw_flow_id_hash(uint32_t flow_id)
     653                 :            : {
     654                 :            :         uint16_t hash = 0;
     655                 :            :         uint16_t offset = 0;
     656                 :            : 
     657                 :            :         do {
     658                 :          0 :                 hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
     659                 :          0 :                 offset += DSW_MAX_FLOWS_BITS;
     660   [ #  #  #  #  :          0 :         } while (offset < DSW_FLOW_ID_BITS);
          #  #  #  #  #  
             #  #  #  #  
                      # ]
     661                 :            : 
     662                 :            :         return hash;
     663                 :            : }
     664                 :            : 
     665                 :            : static void
     666                 :          0 : dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
     667                 :            :                          struct rte_event event)
     668                 :            : {
     669                 :            :         uint8_t dest_port_id;
     670                 :            : 
     671                 :          0 :         event.flow_id = dsw_port_get_parallel_flow_id(source_port);
     672                 :            : 
     673                 :          0 :         dest_port_id = dsw_schedule(dsw, event.queue_id,
     674                 :          0 :                                     dsw_flow_id_hash(event.flow_id));
     675                 :            : 
     676                 :          0 :         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
     677                 :          0 : }
     678                 :            : 
     679                 :            : static void
     680                 :          0 : dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
     681                 :            :                       const struct rte_event *event)
     682                 :            : {
     683                 :            :         uint16_t flow_hash;
     684                 :            :         uint8_t dest_port_id;
     685                 :            : 
     686         [ #  # ]:          0 :         if (unlikely(dsw->queues[event->queue_id].schedule_type ==
     687                 :            :                      RTE_SCHED_TYPE_PARALLEL)) {
     688                 :          0 :                 dsw_port_buffer_parallel(dsw, source_port, *event);
     689                 :          0 :                 return;
     690                 :            :         }
     691                 :            : 
     692                 :          0 :         flow_hash = dsw_flow_id_hash(event->flow_id);
     693                 :            : 
     694         [ #  # ]:          0 :         if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
     695                 :            :                                              flow_hash))) {
     696                 :            :                 dsw_port_buffer_paused(source_port, event);
     697                 :          0 :                 return;
     698                 :            :         }
     699                 :            : 
     700                 :          0 :         dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
     701                 :            : 
     702                 :          0 :         dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
     703                 :            : }
     704                 :            : 
     705                 :            : static void
     706                 :          0 : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
     707                 :            :                                        struct dsw_port *source_port)
     708                 :          0 : {
     709                 :          0 :         uint16_t paused_events_len = source_port->paused_events_len;
     710                 :          0 :         struct rte_event paused_events[paused_events_len];
     711                 :            :         uint16_t i;
     712                 :            : 
     713         [ #  # ]:          0 :         if (paused_events_len == 0)
     714                 :          0 :                 return;
     715                 :            : 
     716         [ #  # ]:          0 :         rte_memcpy(paused_events, source_port->paused_events,
     717                 :            :                    paused_events_len * sizeof(struct rte_event));
     718                 :            : 
     719                 :          0 :         source_port->paused_events_len = 0;
     720                 :            : 
     721         [ #  # ]:          0 :         for (i = 0; i < paused_events_len; i++) {
     722                 :          0 :                 struct rte_event *event = &paused_events[i];
     723                 :            :                 uint16_t flow_hash;
     724                 :            : 
     725                 :          0 :                 flow_hash = dsw_flow_id_hash(event->flow_id);
     726                 :            : 
     727         [ #  # ]:          0 :                 if (dsw_port_is_flow_paused(source_port, event->queue_id,
     728                 :            :                                             flow_hash))
     729                 :            :                         dsw_port_buffer_paused(source_port, event);
     730                 :            :                 else {
     731                 :            :                         uint8_t dest_port_id;
     732                 :            : 
     733                 :          0 :                         dest_port_id = dsw_schedule(dsw, event->queue_id,
     734                 :            :                                                     flow_hash);
     735                 :            : 
     736                 :          0 :                         dsw_port_buffer_non_paused(dsw, source_port,
     737                 :            :                                                    dest_port_id, event);
     738                 :            :                 }
     739                 :            :         }
     740                 :            : }
     741                 :            : 
     742                 :            : static void
     743                 :            : dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
     744                 :            : {
     745                 :            :         uint64_t flow_migration_latency;
     746                 :            : 
     747                 :          0 :         flow_migration_latency =
     748                 :          0 :                 (rte_get_timer_cycles() - port->emigration_start);
     749                 :          0 :         port->emigration_latency += (flow_migration_latency * finished);
     750                 :          0 :         port->emigrations += finished;
     751                 :          0 : }
     752                 :            : 
     753                 :            : static void
     754                 :          0 : dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
     755                 :            :                         uint8_t schedule_type)
     756                 :            : {
     757                 :            :         uint8_t i;
     758                 :            :         struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
     759                 :            :         uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
     760                 :            :         uint8_t left_qfs_len = 0;
     761                 :            :         uint8_t finished;
     762                 :            : 
     763         [ #  # ]:          0 :         for (i = 0; i < port->emigration_targets_len; i++) {
     764                 :          0 :                 struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
     765                 :          0 :                 uint8_t queue_id = qf->queue_id;
     766                 :          0 :                 uint8_t queue_schedule_type =
     767                 :          0 :                         dsw->queues[queue_id].schedule_type;
     768                 :            :                 uint16_t flow_hash = qf->flow_hash;
     769                 :            : 
     770         [ #  # ]:          0 :                 if (queue_schedule_type != schedule_type) {
     771                 :          0 :                         left_port_ids[left_qfs_len] =
     772                 :          0 :                                 port->emigration_target_port_ids[i];
     773                 :          0 :                         left_qfs[left_qfs_len] = *qf;
     774                 :          0 :                         left_qfs_len++;
     775                 :          0 :                         continue;
     776                 :            :                 }
     777                 :            : 
     778                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, port->id, "Migration completed for "
     779                 :            :                                 "queue_id %d flow_hash %d.", queue_id,
     780                 :            :                                 flow_hash);
     781                 :            :         }
     782                 :            : 
     783                 :          0 :         finished = port->emigration_targets_len - left_qfs_len;
     784                 :            : 
     785         [ #  # ]:          0 :         if (finished > 0)
     786                 :            :                 dsw_port_emigration_stats(port, finished);
     787                 :            : 
     788         [ #  # ]:          0 :         for (i = 0; i < left_qfs_len; i++) {
     789                 :          0 :                 port->emigration_target_port_ids[i] = left_port_ids[i];
     790                 :          0 :                 port->emigration_target_qfs[i] = left_qfs[i];
     791                 :            :         }
     792                 :          0 :         port->emigration_targets_len = left_qfs_len;
     793                 :            : 
     794         [ #  # ]:          0 :         if (port->emigration_targets_len == 0) {
     795                 :          0 :                 port->migration_state = DSW_MIGRATION_STATE_IDLE;
     796                 :          0 :                 port->emigration_targets_len = 0;
     797                 :          0 :                 port->seen_events_len = 0;
     798                 :            :         }
     799                 :          0 : }
     800                 :            : 
     801                 :            : static void
     802                 :          0 : dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
     803                 :            :                              struct dsw_port *source_port)
     804                 :            : {
     805                 :            :         uint8_t i;
     806                 :            : 
     807         [ #  # ]:          0 :         for (i = 0; i < source_port->emigration_targets_len; i++) {
     808                 :            :                 struct dsw_queue_flow *qf =
     809                 :          0 :                         &source_port->emigration_target_qfs[i];
     810                 :          0 :                 uint8_t queue_id = qf->queue_id;
     811                 :            : 
     812         [ #  # ]:          0 :                 if (dsw->queues[queue_id].schedule_type ==
     813                 :            :                     RTE_SCHED_TYPE_PARALLEL) {
     814                 :          0 :                         uint8_t dest_port_id =
     815                 :            :                                 source_port->emigration_target_port_ids[i];
     816                 :          0 :                         uint16_t flow_hash = qf->flow_hash;
     817                 :            : 
     818                 :            :                         /* Single byte-sized stores are always atomic. */
     819                 :          0 :                         dsw->queues[queue_id].flow_to_port_map[flow_hash] =
     820                 :            :                                 dest_port_id;
     821                 :            :                 }
     822                 :            :         }
     823                 :            : 
     824                 :          0 :         rte_smp_wmb();
     825                 :            : 
     826                 :          0 :         dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
     827                 :          0 : }
     828                 :            : 
     829                 :            : static void
     830                 :          0 : dsw_port_consider_emigration(struct dsw_evdev *dsw,
     831                 :            :                              struct dsw_port *source_port,
     832                 :            :                              uint64_t now)
     833                 :          0 : {
     834                 :            :         bool any_port_below_limit;
     835                 :          0 :         struct dsw_queue_flow *seen_events = source_port->seen_events;
     836                 :          0 :         uint16_t seen_events_len = source_port->seen_events_len;
     837                 :            :         struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
     838                 :            :         uint16_t num_bursts;
     839                 :            :         int16_t source_port_load;
     840                 :          0 :         int16_t port_loads[dsw->num_ports];
     841                 :            : 
     842         [ #  # ]:          0 :         if (now < source_port->next_emigration)
     843                 :          0 :                 return;
     844                 :            : 
     845         [ #  # ]:          0 :         if (dsw->num_ports == 1)
     846                 :            :                 return;
     847                 :            : 
     848                 :            :         DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Considering emigration.");
     849                 :            : 
     850                 :            :         /* For simplicity, postpone migration if there are still
     851                 :            :          * events to consume in the in_buffer (from the last
     852                 :            :          * emigration).
     853                 :            :          */
     854         [ #  # ]:          0 :         if (source_port->in_buffer_len > 0) {
     855                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "There are still "
     856                 :            :                                 "events in the input buffer.");
     857                 :            :                 return;
     858                 :            :         }
     859                 :            : 
     860         [ #  # ]:          0 :         if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
     861                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
     862                 :            :                                 "Emigration already in progress.");
     863                 :            :                 return;
     864                 :            :         }
     865                 :            : 
     866         [ #  # ]:          0 :         if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
     867                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Not enough events "
     868                 :            :                                 "are recorded to allow for a migration.");
     869                 :            :                 return;
     870                 :            :         }
     871                 :            : 
     872                 :            :         /* Postpone migration considering in case paused events exists, since
     873                 :            :          * such events may prevent the migration procedure from completing,
     874                 :            :          * leading to wasted CPU cycles (e.g., sorting queue flows).
     875                 :            :          */
     876         [ #  # ]:          0 :         if (source_port->paused_events_len > 0) {
     877                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Paused events on "
     878                 :            :                                 "port. Postponing any migrations.");
     879                 :            :                 return;
     880                 :            :         }
     881                 :            : 
     882                 :            :         /* Randomize interval to avoid having all threads considering
     883                 :            :          * emigration at the same in point in time, which might lead
     884                 :            :          * to all choosing the same target port.
     885                 :            :          */
     886                 :          0 :         source_port->next_emigration = now +
     887                 :          0 :                 source_port->migration_interval / 2 +
     888                 :          0 :                 rte_rand_max(source_port->migration_interval);
     889                 :            : 
     890                 :          0 :         source_port_load =
     891                 :          0 :                 rte_atomic_load_explicit(&source_port->load,
     892                 :            :                                          rte_memory_order_relaxed);
     893         [ #  # ]:          0 :         if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
     894                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
     895                 :            :                       "Load %d is below threshold level %d.",
     896                 :            :                       DSW_LOAD_TO_PERCENT(source_port_load),
     897                 :            :                       DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
     898                 :            :                 return;
     899                 :            :         }
     900                 :            : 
     901                 :            :         /* Avoid starting any expensive operations (sorting etc), in
     902                 :            :          * case of a scenario with all ports above the load limit.
     903                 :            :          */
     904                 :            :         any_port_below_limit =
     905                 :          0 :                 dsw_retrieve_port_loads(dsw, port_loads,
     906                 :            :                                         DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
     907         [ #  # ]:          0 :         if (!any_port_below_limit) {
     908                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
     909                 :            :                                 "Candidate target ports are all too highly "
     910                 :            :                                 "loaded.");
     911                 :            :                 return;
     912                 :            :         }
     913                 :            : 
     914                 :          0 :         num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
     915                 :            :                                             bursts);
     916                 :            : 
     917                 :            :         /* For non-big-little systems, there's no point in moving the
     918                 :            :          * only (known) flow.
     919                 :            :          */
     920         [ #  # ]:          0 :         if (num_bursts < 2) {
     921                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Only a single flow "
     922                 :            :                                 "queue_id %d flow_hash %d has been seen.",
     923                 :            :                                 bursts[0].queue_flow.queue_id,
     924                 :            :                                 bursts[0].queue_flow.flow_hash);
     925                 :            :                 return;
     926                 :            :         }
     927                 :            : 
     928                 :          0 :         dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
     929                 :            :                                       port_loads);
     930                 :            : 
     931         [ #  # ]:          0 :         if (source_port->emigration_targets_len == 0)
     932                 :            :                 return;
     933                 :            : 
     934                 :          0 :         source_port->migration_state = DSW_MIGRATION_STATE_FINISH_PENDING;
     935                 :          0 :         source_port->emigration_start = rte_get_timer_cycles();
     936                 :            : 
     937                 :            :         /* No need to go through the whole pause procedure for
     938                 :            :          * parallel queues, since atomic/ordered semantics need not to
     939                 :            :          * be maintained.
     940                 :            :          */
     941                 :          0 :         dsw_port_move_parallel_flows(dsw, source_port);
     942                 :            : }
     943                 :            : 
     944                 :            : static void
     945                 :          0 : dsw_port_abort_migration(struct dsw_port *source_port)
     946                 :            : {
     947                 :            :         RTE_ASSERT(source_port->in_buffer_start == 0);
     948                 :            :         RTE_ASSERT(source_port->in_buffer_len == 0);
     949                 :            : 
     950                 :            :         /* Putting the stashed events in the in_buffer makes sure they
     951                 :            :          * are processed before any events on the in_ring, to avoid
     952                 :            :          * reordering.
     953                 :            :          */
     954                 :          0 :         rte_memcpy(source_port->in_buffer, source_port->emigrating_events,
     955         [ #  # ]:          0 :                  source_port->emigrating_events_len * sizeof(struct rte_event));
     956                 :          0 :         source_port->in_buffer_len = source_port->emigrating_events_len;
     957                 :          0 :         source_port->emigrating_events_len = 0;
     958                 :            : 
     959                 :          0 :         source_port->emigration_targets_len = 0;
     960                 :            : 
     961                 :          0 :         source_port->migration_state = DSW_MIGRATION_STATE_IDLE;
     962                 :          0 : }
     963                 :            : 
     964                 :            : static void
     965                 :          0 : dsw_port_continue_emigration(struct dsw_evdev *dsw,
     966                 :            :                              struct dsw_port *source_port)
     967                 :            : {
     968                 :            :         /* A flow migration cannot be completed if there are paused
     969                 :            :          * events, since some/all of those events may be have been
     970                 :            :          * produced as a result of processing the flow(s) selected for
     971                 :            :          * migration. Moving such a flow would potentially introduced
     972                 :            :          * reordering, since processing the migrated flow on the
     973                 :            :          * receiving flow may commence before the to-be-enqueued-to
     974                 :            :          * flows are unpaused, leading to paused events on the second
     975                 :            :          * port as well, destined for the same paused flow(s). When
     976                 :            :          * those flows are unpaused, the resulting events are
     977                 :            :          * delivered the owning port in an undefined order.
     978                 :            :          *
     979                 :            :          * Waiting for the events to be unpaused could lead to a
     980                 :            :          * deadlock, where two ports are both waiting for the other to
     981                 :            :          * unpause.
     982                 :            :          */
     983         [ #  # ]:          0 :         if (source_port->paused_events_len > 0) {
     984                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "There are events in "
     985                 :            :                                 "the pause buffer. Aborting migration.");
     986                 :          0 :                 dsw_port_abort_migration(source_port);
     987                 :          0 :                 return;
     988                 :            :         }
     989                 :            : 
     990                 :          0 :         dsw_port_add_paused_flows(source_port,
     991                 :          0 :                                   source_port->emigration_target_qfs,
     992                 :          0 :                                   source_port->emigration_targets_len);
     993                 :            : 
     994                 :          0 :         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUSE_REQ,
     995                 :            :                                source_port->emigration_target_qfs,
     996                 :            :                                source_port->emigration_targets_len);
     997                 :          0 :         source_port->cfm_cnt = 0;
     998                 :            : 
     999                 :          0 :         source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
    1000                 :            : }
    1001                 :            : 
    1002                 :            : static void
    1003                 :            : dsw_port_try_finish_pending(struct dsw_evdev *dsw, struct dsw_port *source_port)
    1004                 :            : {
    1005   [ #  #  #  # ]:          0 :         if (unlikely(source_port->migration_state ==
    1006                 :            :                      DSW_MIGRATION_STATE_FINISH_PENDING &&
    1007                 :            :                      source_port->pending_releases == 0))
    1008                 :          0 :                 dsw_port_continue_emigration(dsw, source_port);
    1009                 :            : }
    1010                 :            : 
    1011                 :            : static void
    1012                 :            : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
    1013                 :            :                                        struct dsw_port *source_port);
    1014                 :            : 
    1015                 :            : static void
    1016                 :          0 : dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
    1017                 :            :                               uint8_t originating_port_id,
    1018                 :            :                               struct dsw_queue_flow *paused_qfs,
    1019                 :            :                               uint8_t qfs_len)
    1020                 :            : {
    1021                 :            :         uint16_t i;
    1022                 :          0 :         struct dsw_ctl_msg cfm = {
    1023                 :            :                 .type = DSW_CTL_CFM,
    1024                 :          0 :                 .originating_port_id = port->id
    1025                 :            :         };
    1026                 :            : 
    1027                 :            :         dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
    1028                 :            : 
    1029                 :          0 :         dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
    1030                 :            : 
    1031         [ #  # ]:          0 :         for (i = 0; i < qfs_len; i++) {
    1032                 :          0 :                 struct dsw_queue_flow *qf = &paused_qfs[i];
    1033                 :            : 
    1034         [ #  # ]:          0 :                 if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
    1035                 :          0 :                         port->immigrations++;
    1036                 :            :         }
    1037                 :            : 
    1038                 :          0 :         dsw_port_flush_no_longer_paused_events(dsw, port);
    1039                 :          0 : }
    1040                 :            : 
    1041                 :            : static void
    1042                 :            : dsw_port_buffer_in_buffer(struct dsw_port *port,
    1043                 :            :                           const struct rte_event *event)
    1044                 :            : 
    1045                 :            : {
    1046                 :            :         RTE_ASSERT(port->in_buffer_start == 0);
    1047                 :            : 
    1048                 :          0 :         port->in_buffer[port->in_buffer_len] = *event;
    1049                 :          0 :         port->in_buffer_len++;
    1050                 :          0 : }
    1051                 :            : 
    1052                 :            : static void
    1053                 :            : dsw_port_stash_migrating_event(struct dsw_port *port,
    1054                 :            :                                const struct rte_event *event)
    1055                 :            : {
    1056                 :          0 :         port->emigrating_events[port->emigrating_events_len] = *event;
    1057                 :          0 :         port->emigrating_events_len++;
    1058                 :          0 : }
    1059                 :            : 
    1060                 :            : static void
    1061                 :          0 : dsw_port_stash_any_migrating_events(struct dsw_port *port,
    1062                 :            :                                     struct rte_event *events,
    1063                 :            :                                     uint16_t *num)
    1064                 :            : {
    1065                 :            :         uint16_t i;
    1066                 :            :         uint16_t offset = 0;
    1067                 :            : 
    1068         [ #  # ]:          0 :         for (i = 0; i < *num; i++) {
    1069                 :            :                 uint16_t flow_hash;
    1070                 :          0 :                 struct rte_event *in_event = &events[i];
    1071                 :            : 
    1072                 :          0 :                 flow_hash = dsw_flow_id_hash(in_event->flow_id);
    1073                 :            : 
    1074         [ #  # ]:          0 :                 if (unlikely(dsw_port_is_flow_migrating(port,
    1075                 :            :                                                         in_event->queue_id,
    1076                 :            :                                                         flow_hash))) {
    1077                 :            :                         dsw_port_stash_migrating_event(port, in_event);
    1078                 :          0 :                         offset++;
    1079         [ #  # ]:          0 :                 } else if (offset > 0) {
    1080         [ #  # ]:          0 :                         struct rte_event *out_event = &events[i - offset];
    1081                 :            :                         rte_memcpy(out_event, in_event,
    1082                 :            :                                    sizeof(struct rte_event));
    1083                 :            :                 }
    1084                 :            :         }
    1085                 :            : 
    1086                 :          0 :         *num -= offset;
    1087                 :          0 : }
    1088                 :            : 
    1089                 :            : #define DRAIN_DEQUEUE_BURST_SIZE (32)
    1090                 :            : 
    1091                 :            : static void
    1092                 :          0 : dsw_port_drain_in_ring(struct dsw_port *source_port)
    1093                 :            : {
    1094                 :          0 :         for (;;) {
    1095                 :            :                 struct rte_event events[DRAIN_DEQUEUE_BURST_SIZE];
    1096                 :            :                 uint16_t n;
    1097                 :            :                 uint16_t i;
    1098                 :            :                 uint16_t available;
    1099                 :            : 
    1100   [ #  #  #  #  :          0 :                 n = rte_event_ring_dequeue_burst(source_port->in_ring,
                      # ]
    1101                 :            :                                                  events,
    1102                 :            :                                                  DRAIN_DEQUEUE_BURST_SIZE,
    1103                 :            :                                                  &available);
    1104                 :            : 
    1105   [ #  #  #  # ]:          0 :                 if (n == 0 && available == 0)
    1106                 :            :                         break;
    1107                 :            : 
    1108         [ #  # ]:          0 :                 for (i = 0; i < n; i++) {
    1109                 :          0 :                         struct rte_event *event = &events[i];
    1110                 :            :                         uint16_t flow_hash;
    1111                 :            : 
    1112                 :          0 :                         flow_hash = dsw_flow_id_hash(event->flow_id);
    1113                 :            : 
    1114         [ #  # ]:          0 :                         if (unlikely(dsw_port_is_flow_migrating(source_port,
    1115                 :            :                                                                 event->queue_id,
    1116                 :            :                                                                 flow_hash)))
    1117                 :            :                                 dsw_port_stash_migrating_event(source_port,
    1118                 :            :                                                                event);
    1119                 :            :                         else
    1120                 :            :                                 dsw_port_buffer_in_buffer(source_port, event);
    1121                 :            :                 }
    1122                 :            :         }
    1123                 :          0 : }
    1124                 :            : 
    1125                 :            : static void
    1126                 :          0 : dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
    1127                 :            :                                  struct dsw_port *source_port,
    1128                 :            :                                  struct rte_event *event)
    1129                 :            : {
    1130                 :            :         uint16_t i;
    1131                 :            : 
    1132         [ #  # ]:          0 :         for (i = 0; i < source_port->emigration_targets_len; i++) {
    1133                 :            :                 struct dsw_queue_flow *qf =
    1134                 :          0 :                         &source_port->emigration_target_qfs[i];
    1135                 :          0 :                 uint8_t dest_port_id =
    1136                 :            :                         source_port->emigration_target_port_ids[i];
    1137                 :          0 :                 struct dsw_port *dest_port = &dsw->ports[dest_port_id];
    1138                 :            : 
    1139         [ #  # ]:          0 :                 if (event->queue_id == qf->queue_id &&
    1140         [ #  # ]:          0 :                     dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
    1141                 :            :                         /* No need to care about bursting forwarded
    1142                 :            :                          * events (to the destination port's in_ring),
    1143                 :            :                          * since migration doesn't happen very often,
    1144                 :            :                          * and also the majority of the dequeued
    1145                 :            :                          * events will likely *not* be forwarded.
    1146                 :            :                          */
    1147   [ #  #  #  #  :          0 :                         while (rte_event_ring_enqueue_burst(dest_port->in_ring,
                      # ]
    1148                 :            :                                                             event, 1,
    1149         [ #  # ]:          0 :                                                             NULL) != 1)
    1150                 :            :                                 rte_pause();
    1151                 :            :                         return;
    1152                 :            :                 }
    1153                 :            :         }
    1154                 :            : 
    1155                 :            :         /* Event did not belong to the emigrated flows */
    1156                 :            :         dsw_port_buffer_in_buffer(source_port, event);
    1157                 :            : }
    1158                 :            : 
    1159                 :            : static void
    1160                 :            : dsw_port_forward_emigrated_flows(struct dsw_evdev *dsw,
    1161                 :            :                                  struct dsw_port *source_port)
    1162                 :            : {
    1163                 :            :         uint16_t i;
    1164                 :            : 
    1165         [ #  # ]:          0 :         for (i = 0; i < source_port->emigrating_events_len; i++) {
    1166                 :          0 :                 struct rte_event *event = &source_port->emigrating_events[i];
    1167                 :            : 
    1168                 :          0 :                 dsw_port_forward_emigrated_event(dsw, source_port, event);
    1169                 :            :         }
    1170                 :          0 :         source_port->emigrating_events_len = 0;
    1171                 :            : }
    1172                 :            : 
    1173                 :            : static void
    1174                 :          0 : dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
    1175                 :            :                                struct dsw_port *source_port)
    1176                 :            : {
    1177                 :            :         uint8_t i;
    1178                 :            : 
    1179                 :            :         /* There may be events lingering in the output buffer from
    1180                 :            :          * prior to the pause took effect.
    1181                 :            :          */
    1182                 :            :         dsw_port_flush_out_buffers(dsw, source_port);
    1183                 :            : 
    1184         [ #  # ]:          0 :         for (i = 0; i < source_port->emigration_targets_len; i++) {
    1185                 :            :                 struct dsw_queue_flow *qf =
    1186                 :          0 :                         &source_port->emigration_target_qfs[i];
    1187                 :          0 :                 uint8_t dest_port_id =
    1188                 :            :                         source_port->emigration_target_port_ids[i];
    1189                 :            : 
    1190                 :          0 :                 dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
    1191                 :            :                     dest_port_id;
    1192                 :            :         }
    1193                 :            : 
    1194                 :          0 :         rte_smp_wmb();
    1195                 :            : 
    1196                 :          0 :         dsw_port_drain_in_ring(source_port);
    1197                 :            :         dsw_port_forward_emigrated_flows(dsw, source_port);
    1198                 :            : 
    1199                 :            :         dsw_port_remove_paused_flows(source_port,
    1200                 :          0 :                                      source_port->emigration_target_qfs,
    1201                 :          0 :                                      source_port->emigration_targets_len);
    1202                 :            : 
    1203                 :          0 :         dsw_port_flush_no_longer_paused_events(dsw, source_port);
    1204                 :            : 
    1205                 :            :         /* Processing migrating flows during migration may have
    1206                 :            :          * produced events to paused flows, including the flows which
    1207                 :            :          * were being migrated. Flushing the output buffers before
    1208                 :            :          * unpausing the flows on other ports assures that such events
    1209                 :            :          * are seen *before* any events produced by processing the
    1210                 :            :          * migrating flows on the new port.
    1211                 :            :          */
    1212                 :            :         dsw_port_flush_out_buffers(dsw, source_port);
    1213                 :            : 
    1214                 :            :         /* Flow table update and migration destination port's enqueues
    1215                 :            :          * must be seen before the control message.
    1216                 :            :          */
    1217                 :          0 :         rte_smp_wmb();
    1218                 :            : 
    1219                 :          0 :         dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUSE_REQ,
    1220                 :            :                                source_port->emigration_target_qfs,
    1221                 :          0 :                                source_port->emigration_targets_len);
    1222                 :          0 :         source_port->cfm_cnt = 0;
    1223                 :          0 :         source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
    1224                 :          0 : }
    1225                 :            : 
    1226                 :            : static void
    1227                 :          0 : dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
    1228                 :            : {
    1229                 :          0 :         port->cfm_cnt++;
    1230                 :            : 
    1231         [ #  # ]:          0 :         if (port->cfm_cnt == (dsw->num_ports - 1)) {
    1232      [ #  #  # ]:          0 :                 switch (port->migration_state) {
    1233                 :          0 :                 case DSW_MIGRATION_STATE_PAUSING:
    1234                 :          0 :                         dsw_port_move_emigrating_flows(dsw, port);
    1235                 :          0 :                         break;
    1236                 :          0 :                 case DSW_MIGRATION_STATE_UNPAUSING:
    1237                 :          0 :                         dsw_port_end_emigration(dsw, port,
    1238                 :            :                                                 RTE_SCHED_TYPE_ATOMIC);
    1239                 :          0 :                         break;
    1240                 :            :                 default:
    1241                 :          0 :                         RTE_VERIFY(0);
    1242                 :            :                         break;
    1243                 :            :                 }
    1244                 :            :         }
    1245                 :          0 : }
    1246                 :            : 
    1247                 :            : static void
    1248                 :          0 : dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
    1249                 :            : {
    1250                 :            :         struct dsw_ctl_msg msg;
    1251                 :            : 
    1252         [ #  # ]:          0 :         if (dsw_port_ctl_dequeue(port, &msg) == 0) {
    1253   [ #  #  #  # ]:          0 :                 switch (msg.type) {
    1254                 :          0 :                 case DSW_CTL_PAUSE_REQ:
    1255                 :          0 :                         dsw_port_handle_pause_flows(dsw, port,
    1256                 :          0 :                                                     msg.originating_port_id,
    1257                 :          0 :                                                     msg.qfs, msg.qfs_len);
    1258                 :          0 :                         break;
    1259                 :          0 :                 case DSW_CTL_UNPAUSE_REQ:
    1260                 :          0 :                         dsw_port_handle_unpause_flows(dsw, port,
    1261                 :          0 :                                                       msg.originating_port_id,
    1262                 :          0 :                                                       msg.qfs, msg.qfs_len);
    1263                 :          0 :                         break;
    1264                 :          0 :                 case DSW_CTL_CFM:
    1265                 :          0 :                         dsw_port_handle_confirm(dsw, port);
    1266                 :          0 :                         break;
    1267                 :            :                 }
    1268                 :            :         }
    1269                 :          0 : }
    1270                 :            : 
    1271                 :            : static void
    1272                 :            : dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
    1273                 :            : {
    1274                 :          0 :         port->ops_since_bg_task += (num_events+1);
    1275                 :          0 : }
    1276                 :            : 
    1277                 :            : static void
    1278                 :          0 : dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
    1279                 :            : {
    1280                 :            :         /* Polling the control ring is relatively inexpensive, and
    1281                 :            :          * polling it often helps bringing down migration latency, so
    1282                 :            :          * do this for every iteration.
    1283                 :            :          */
    1284                 :          0 :         dsw_port_ctl_process(dsw, port);
    1285                 :            : 
    1286                 :            :         /* Always check if a migration is waiting for pending releases
    1287                 :            :          * to arrive, to minimize the amount of time dequeuing events
    1288                 :            :          * from the port is disabled.
    1289                 :            :          */
    1290                 :            :         dsw_port_try_finish_pending(dsw, port);
    1291                 :            : 
    1292                 :            :         /* To avoid considering migration and flushing output buffers
    1293                 :            :          * on every dequeue/enqueue call, the scheduler only performs
    1294                 :            :          * such 'background' tasks every nth
    1295                 :            :          * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
    1296                 :            :          */
    1297         [ #  # ]:          0 :         if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
    1298                 :            :                 uint64_t now;
    1299                 :            : 
    1300                 :            :                 now = rte_get_timer_cycles();
    1301                 :            : 
    1302                 :          0 :                 port->last_bg = now;
    1303                 :            : 
    1304                 :            :                 /* Logic to avoid having events linger in the output
    1305                 :            :                  * buffer too long.
    1306                 :            :                  */
    1307                 :            :                 dsw_port_flush_out_buffers(dsw, port);
    1308                 :            : 
    1309                 :            :                 dsw_port_consider_load_update(port, now);
    1310                 :            : 
    1311                 :          0 :                 dsw_port_consider_emigration(dsw, port, now);
    1312                 :            : 
    1313                 :          0 :                 port->ops_since_bg_task = 0;
    1314                 :            :         }
    1315                 :          0 : }
    1316                 :            : 
    1317                 :            : static void
    1318                 :            : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
    1319                 :            : {
    1320                 :            :         uint16_t dest_port_id;
    1321                 :            : 
    1322   [ #  #  #  #  :          0 :         for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
          #  #  #  #  #  
          #  #  #  #  #  
             #  #  #  # ]
    1323                 :          0 :                 dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
    1324                 :            : }
    1325                 :            : 
    1326                 :            : static __rte_always_inline uint16_t
    1327                 :            : dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
    1328                 :            :                                 const struct rte_event events[],
    1329                 :            :                                 uint16_t events_len, bool op_types_known,
    1330                 :            :                                 uint16_t num_new, uint16_t num_forward,
    1331                 :            :                                 uint16_t num_release)
    1332                 :            : {
    1333                 :          0 :         struct dsw_evdev *dsw = source_port->dsw;
    1334                 :            :         bool enough_credits;
    1335                 :            :         uint16_t i;
    1336                 :            : 
    1337                 :            :         DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Attempting to enqueue %d "
    1338                 :            :                         "events.", events_len);
    1339                 :            : 
    1340                 :          0 :         dsw_port_bg_process(dsw, source_port);
    1341                 :            : 
    1342                 :            :         /* XXX: For performance (=ring efficiency) reasons, the
    1343                 :            :          * scheduler relies on internal non-ring buffers instead of
    1344                 :            :          * immediately sending the event to the destination ring. For
    1345                 :            :          * a producer that doesn't intend to produce or consume any
    1346                 :            :          * more events, the scheduler provides a way to flush the
    1347                 :            :          * buffer, by means of doing an enqueue of zero events. In
    1348                 :            :          * addition, a port cannot be left "unattended" (e.g. unused)
    1349                 :            :          * for long periods of time, since that would stall
    1350                 :            :          * migration. Eventdev API extensions to provide a cleaner way
    1351                 :            :          * to archive both of these functions should be
    1352                 :            :          * considered.
    1353                 :            :          */
    1354   [ #  #  #  #  :          0 :         if (unlikely(events_len == 0)) {
                   #  # ]
    1355                 :            :                 dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
    1356                 :            :                 dsw_port_flush_out_buffers(dsw, source_port);
    1357                 :            :                 return 0;
    1358                 :            :         }
    1359                 :            : 
    1360                 :            :         dsw_port_note_op(source_port, events_len);
    1361                 :            : 
    1362                 :            :         if (!op_types_known)
    1363         [ #  # ]:          0 :                 for (i = 0; i < events_len; i++) {
    1364   [ #  #  #  # ]:          0 :                         switch (events[i].op) {
    1365                 :          0 :                         case RTE_EVENT_OP_NEW:
    1366                 :          0 :                                 num_new++;
    1367                 :          0 :                                 break;
    1368                 :          0 :                         case RTE_EVENT_OP_FORWARD:
    1369                 :          0 :                                 num_forward++;
    1370                 :          0 :                                 break;
    1371                 :          0 :                         case RTE_EVENT_OP_RELEASE:
    1372                 :          0 :                                 num_release++;
    1373                 :          0 :                                 break;
    1374                 :            :                         }
    1375                 :            :                 }
    1376                 :            : 
    1377                 :            :         /* Technically, we could allow the non-new events up to the
    1378                 :            :          * first new event in the array into the system, but for
    1379                 :            :          * simplicity reasons, we deny the whole burst if the port is
    1380                 :            :          * above the water mark.
    1381                 :            :          */
    1382   [ #  #  #  #  :          0 :         if (unlikely(num_new > 0 &&
             #  #  #  # ]
    1383                 :            :                      rte_atomic_load_explicit(&dsw->credits_on_loan,
    1384                 :            :                                               rte_memory_order_relaxed) >
    1385                 :            :                      source_port->new_event_threshold))
    1386                 :            :                 return 0;
    1387                 :            : 
    1388                 :          0 :         enough_credits = dsw_port_acquire_credits(dsw, source_port, num_new);
    1389   [ #  #  #  #  :          0 :         if (unlikely(!enough_credits))
                   #  # ]
    1390                 :            :                 return 0;
    1391                 :            : 
    1392         [ #  # ]:          0 :         dsw_port_return_credits(dsw, source_port, num_release);
    1393                 :            : 
    1394                 :            :         /* This may seem harsh, but it's important for an application
    1395                 :            :          * to get early feedback for cases where it fails to stick to
    1396                 :            :          * the API contract.
    1397                 :            :          */
    1398   [ #  #  #  # ]:          0 :         RTE_VERIFY(num_forward + num_release <= source_port->pending_releases);
    1399                 :          0 :         source_port->pending_releases -= (num_forward + num_release);
    1400                 :            : 
    1401                 :            :         dsw_port_enqueue_stats(source_port, num_new, num_forward, num_release);
    1402                 :            : 
    1403   [ #  #  #  #  :          0 :         for (i = 0; i < events_len; i++) {
                   #  # ]
    1404                 :          0 :                 const struct rte_event *event = &events[i];
    1405                 :            : 
    1406   [ #  #  #  # ]:          0 :                 if (likely(num_release == 0 ||
    1407                 :            :                            event->op != RTE_EVENT_OP_RELEASE))
    1408                 :          0 :                         dsw_port_buffer_event(dsw, source_port, event);
    1409                 :          0 :                 dsw_port_queue_enqueue_stats(source_port, event->queue_id);
    1410                 :            :         }
    1411                 :            : 
    1412                 :            :         DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "%d non-release events "
    1413                 :            :                         "accepted.", num_new + num_forward);
    1414                 :            : 
    1415                 :          0 :         return (num_new + num_forward + num_release);
    1416                 :            : }
    1417                 :            : 
    1418                 :            : uint16_t
    1419                 :          0 : dsw_event_enqueue_burst(void *port, const struct rte_event events[],
    1420                 :            :                         uint16_t events_len)
    1421                 :            : {
    1422                 :            :         struct dsw_port *source_port = port;
    1423                 :            : 
    1424         [ #  # ]:          0 :         if (unlikely(events_len > source_port->enqueue_depth))
    1425                 :            :                 events_len = source_port->enqueue_depth;
    1426                 :            : 
    1427                 :          0 :         return dsw_event_enqueue_burst_generic(source_port, events,
    1428                 :            :                                                events_len, false, 0, 0, 0);
    1429                 :            : }
    1430                 :            : 
    1431                 :            : uint16_t
    1432                 :          0 : dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
    1433                 :            :                             uint16_t events_len)
    1434                 :            : {
    1435                 :            :         struct dsw_port *source_port = port;
    1436                 :            : 
    1437         [ #  # ]:          0 :         if (unlikely(events_len > source_port->enqueue_depth))
    1438                 :            :                 events_len = source_port->enqueue_depth;
    1439                 :            : 
    1440                 :          0 :         return dsw_event_enqueue_burst_generic(source_port, events,
    1441                 :            :                                                events_len, true, events_len,
    1442                 :            :                                                0, 0);
    1443                 :            : }
    1444                 :            : 
    1445                 :            : uint16_t
    1446                 :          0 : dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
    1447                 :            :                                 uint16_t events_len)
    1448                 :            : {
    1449                 :            :         struct dsw_port *source_port = port;
    1450                 :            : 
    1451         [ #  # ]:          0 :         if (unlikely(events_len > source_port->enqueue_depth))
    1452                 :            :                 events_len = source_port->enqueue_depth;
    1453                 :            : 
    1454                 :          0 :         return dsw_event_enqueue_burst_generic(source_port, events,
    1455                 :            :                                                events_len, true, 0,
    1456                 :            :                                                events_len, 0);
    1457                 :            : }
    1458                 :            : 
    1459                 :            : static void
    1460                 :          0 : dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
    1461                 :            :                             uint16_t num)
    1462                 :            : {
    1463                 :            :         uint16_t i;
    1464                 :            : 
    1465                 :          0 :         dsw_port_dequeue_stats(port, num);
    1466                 :            : 
    1467         [ #  # ]:          0 :         for (i = 0; i < num; i++) {
    1468                 :          0 :                 uint16_t l_idx = port->seen_events_idx;
    1469                 :          0 :                 struct dsw_queue_flow *qf = &port->seen_events[l_idx];
    1470                 :          0 :                 struct rte_event *event = &events[i];
    1471                 :          0 :                 qf->queue_id = event->queue_id;
    1472                 :          0 :                 qf->flow_hash = dsw_flow_id_hash(event->flow_id);
    1473                 :            : 
    1474                 :          0 :                 port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
    1475                 :            : 
    1476                 :          0 :                 dsw_port_queue_dequeued_stats(port, event->queue_id);
    1477                 :            :         }
    1478                 :            : 
    1479         [ #  # ]:          0 :         if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
    1480                 :          0 :                 port->seen_events_len =
    1481                 :          0 :                         RTE_MIN(port->seen_events_len + num,
    1482                 :            :                                 DSW_MAX_EVENTS_RECORDED);
    1483                 :          0 : }
    1484                 :            : 
    1485                 :            : #ifdef DSW_SORT_DEQUEUED
    1486                 :            : 
    1487                 :            : #define DSW_EVENT_TO_INT(_event)                                \
    1488                 :            :         ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
    1489                 :            : 
    1490                 :            : static inline int
    1491                 :            : dsw_cmp_event(const void *v_event_a, const void *v_event_b)
    1492                 :            : {
    1493                 :            :         const struct rte_event *event_a = v_event_a;
    1494                 :            :         const struct rte_event *event_b = v_event_b;
    1495                 :            : 
    1496                 :            :         return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
    1497                 :            : }
    1498                 :            : #endif
    1499                 :            : 
    1500                 :            : static uint16_t
    1501                 :          0 : dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
    1502                 :            :                        uint16_t num)
    1503                 :            : {
    1504                 :          0 :         enum dsw_migration_state state = port->migration_state;
    1505                 :            :         uint16_t dequeued;
    1506                 :            : 
    1507         [ #  # ]:          0 :         if (unlikely(state == DSW_MIGRATION_STATE_FINISH_PENDING))
    1508                 :            :                 /* Do not produce new items of work - only finish
    1509                 :            :                  * outstanding (unreleased) events, to allow the
    1510                 :            :                  * migration procedure to continue.
    1511                 :            :                  */
    1512                 :          0 :                 dequeued = 0;
    1513         [ #  # ]:          0 :         else if (unlikely(port->in_buffer_len > 0)) {
    1514                 :          0 :                 dequeued = RTE_MIN(num, port->in_buffer_len);
    1515                 :            : 
    1516         [ #  # ]:          0 :                 rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
    1517                 :            :                            dequeued * sizeof(struct rte_event));
    1518                 :            : 
    1519                 :          0 :                 port->in_buffer_start += dequeued;
    1520                 :          0 :                 port->in_buffer_len -= dequeued;
    1521                 :            : 
    1522         [ #  # ]:          0 :                 if (port->in_buffer_len == 0)
    1523                 :          0 :                         port->in_buffer_start = 0;
    1524                 :            :         } else {
    1525   [ #  #  #  #  :          0 :                 dequeued = rte_event_ring_dequeue_burst(port->in_ring,
                      # ]
    1526                 :            :                                                         events, num, NULL);
    1527                 :            : 
    1528                 :            :                 /* Stash incoming events belonging to migrating flows,
    1529                 :            :                  * to avoid having to deal with forwarded events to
    1530                 :            :                  * flows which are also in the process of being
    1531                 :            :                  * migrated. A failure to do so leads to reordering,
    1532                 :            :                  * since paused events on the source port may be
    1533                 :            :                  * flushed after paused events on the migration
    1534                 :            :                  * destination port.
    1535                 :            :                  */
    1536         [ #  # ]:          0 :                 if (unlikely(state == DSW_MIGRATION_STATE_PAUSING))
    1537                 :          0 :                         dsw_port_stash_any_migrating_events(port, events,
    1538                 :            :                                                             &dequeued);
    1539                 :            :         }
    1540                 :            : 
    1541                 :          0 :         return dequeued;
    1542                 :            : }
    1543                 :            : 
    1544                 :            : uint16_t
    1545                 :          0 : dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
    1546                 :            :                         uint64_t wait __rte_unused)
    1547                 :            : {
    1548                 :            :         struct dsw_port *source_port = port;
    1549                 :          0 :         struct dsw_evdev *dsw = source_port->dsw;
    1550                 :            :         uint16_t dequeued;
    1551                 :            : 
    1552         [ #  # ]:          0 :         if (source_port->implicit_release) {
    1553                 :          0 :                 dsw_port_return_credits(dsw, port,
    1554         [ #  # ]:          0 :                                         source_port->pending_releases);
    1555                 :            : 
    1556                 :          0 :                 source_port->pending_releases = 0;
    1557                 :            :         }
    1558                 :            : 
    1559                 :          0 :         dsw_port_bg_process(dsw, source_port);
    1560                 :            : 
    1561         [ #  # ]:          0 :         if (unlikely(num > source_port->dequeue_depth))
    1562                 :            :                 num = source_port->dequeue_depth;
    1563                 :            : 
    1564                 :          0 :         dequeued = dsw_port_dequeue_burst(source_port, events, num);
    1565                 :            : 
    1566                 :          0 :         source_port->pending_releases += dequeued;
    1567                 :            : 
    1568         [ #  # ]:          0 :         dsw_port_load_record(source_port, dequeued);
    1569                 :            : 
    1570                 :          0 :         dsw_port_note_op(source_port, dequeued);
    1571                 :            : 
    1572         [ #  # ]:          0 :         if (dequeued > 0) {
    1573                 :            :                 DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Dequeued %d events.",
    1574                 :            :                                 dequeued);
    1575                 :            : 
    1576                 :            :                 /* One potential optimization one might think of is to
    1577                 :            :                  * add a migration state (prior to 'pausing'), and
    1578                 :            :                  * only record seen events when the port is in this
    1579                 :            :                  * state (and transit to 'pausing' when enough events
    1580                 :            :                  * have been gathered). However, that schema doesn't
    1581                 :            :                  * seem to improve performance.
    1582                 :            :                  */
    1583                 :          0 :                 dsw_port_record_seen_events(port, events, dequeued);
    1584                 :            :         } else /* Zero-size dequeue means a likely idle port, and thus
    1585                 :            :                 * we can afford trading some efficiency for a slightly
    1586                 :            :                 * reduced event wall-time latency.
    1587                 :            :                 */
    1588                 :            :                 dsw_port_flush_out_buffers(dsw, port);
    1589                 :            : 
    1590                 :            : #ifdef DSW_SORT_DEQUEUED
    1591                 :            :         dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
    1592                 :            : #endif
    1593                 :            : 
    1594                 :          0 :         return dequeued;
    1595                 :            : }
    1596                 :            : 
    1597                 :          0 : void dsw_event_maintain(void *port, int op)
    1598                 :            : {
    1599                 :            :         struct dsw_port *source_port = port;
    1600                 :          0 :         struct dsw_evdev *dsw = source_port->dsw;
    1601                 :            : 
    1602                 :            :         dsw_port_note_op(source_port, 0);
    1603                 :          0 :         dsw_port_bg_process(dsw, source_port);
    1604                 :            : 
    1605         [ #  # ]:          0 :         if (op & RTE_EVENT_DEV_MAINT_OP_FLUSH)
    1606                 :            :                 dsw_port_flush_out_buffers(dsw, source_port);
    1607                 :          0 : }

Generated by: LCOV version 1.14