Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : * Copyright(c) 2018 Ericsson AB
3 : : */
4 : :
5 : : #include "dsw_evdev.h"
6 : :
7 : : #ifdef DSW_SORT_DEQUEUED
8 : : #include "dsw_sort.h"
9 : : #endif
10 : :
11 : : #include <stdbool.h>
12 : : #include <stdlib.h>
13 : : #include <string.h>
14 : :
15 : : #include <rte_cycles.h>
16 : : #include <rte_memcpy.h>
17 : : #include <rte_random.h>
18 : :
19 : : static bool
20 : 0 : dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21 : : int32_t credits)
22 : : {
23 : 0 : int32_t inflight_credits = port->inflight_credits;
24 : 0 : int32_t missing_credits = credits - inflight_credits;
25 : : int32_t total_on_loan;
26 : : int32_t available;
27 : : int32_t acquired_credits;
28 : : int32_t new_total_on_loan;
29 : :
30 [ # # ]: 0 : if (likely(missing_credits <= 0)) {
31 : 0 : port->inflight_credits -= credits;
32 : 0 : return true;
33 : : }
34 : :
35 : 0 : total_on_loan =
36 : 0 : rte_atomic_load_explicit(&dsw->credits_on_loan,
37 : : rte_memory_order_relaxed);
38 : 0 : available = dsw->max_inflight - total_on_loan;
39 : 0 : acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
40 : :
41 [ # # ]: 0 : if (available < acquired_credits)
42 : : return false;
43 : :
44 : : /* This is a race, no locks are involved, and thus some other
45 : : * thread can allocate tokens in between the check and the
46 : : * allocation.
47 : : */
48 : 0 : new_total_on_loan =
49 : 0 : rte_atomic_fetch_add_explicit(&dsw->credits_on_loan,
50 : : acquired_credits,
51 : : rte_memory_order_relaxed) +
52 : : acquired_credits;
53 : :
54 [ # # ]: 0 : if (unlikely(new_total_on_loan > dsw->max_inflight)) {
55 : : /* Some other port took the last credits */
56 : 0 : rte_atomic_fetch_sub_explicit(&dsw->credits_on_loan,
57 : : acquired_credits,
58 : : rte_memory_order_relaxed);
59 : 0 : return false;
60 : : }
61 : :
62 : : DSW_LOG_DP_PORT_LINE(DEBUG, port->id, "Acquired %d tokens from pool.",
63 : : acquired_credits);
64 : :
65 : 0 : port->inflight_credits += acquired_credits;
66 : 0 : port->inflight_credits -= credits;
67 : :
68 : 0 : return true;
69 : : }
70 : :
71 : : static void
72 : : dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
73 : : int32_t credits)
74 : : {
75 : 0 : port->inflight_credits += credits;
76 : :
77 [ # # # # ]: 0 : if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
78 : : int32_t leave_credits = DSW_PORT_MIN_CREDITS;
79 : 0 : int32_t return_credits =
80 : : port->inflight_credits - leave_credits;
81 : :
82 : 0 : port->inflight_credits = leave_credits;
83 : :
84 : 0 : rte_atomic_fetch_sub_explicit(&dsw->credits_on_loan,
85 : : return_credits,
86 : : rte_memory_order_relaxed);
87 : :
88 : : DSW_LOG_DP_PORT_LINE(DEBUG, port->id,
89 : : "Returned %d tokens to pool.",
90 : : return_credits);
91 : : }
92 : : }
93 : :
94 : : static void
95 : : dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
96 : : uint16_t num_forward, uint16_t num_release)
97 : : {
98 : 0 : port->new_enqueued += num_new;
99 : 0 : port->forward_enqueued += num_forward;
100 : 0 : port->release_enqueued += num_release;
101 : 0 : }
102 : :
103 : : static void
104 : : dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
105 : : {
106 : 0 : source_port->queue_enqueued[queue_id]++;
107 : : }
108 : :
109 : : static void
110 : : dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
111 : : {
112 : 0 : port->dequeued += num;
113 : : }
114 : :
115 : : static void
116 : : dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
117 : : {
118 : 0 : source_port->queue_dequeued[queue_id]++;
119 : : }
120 : :
121 : : static void
122 : : dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
123 : : {
124 [ # # ]: 0 : if (dequeued > 0 && port->busy_start == 0)
125 : : /* work period begins */
126 : 0 : port->busy_start = rte_get_timer_cycles();
127 [ # # # # ]: 0 : else if (dequeued == 0 && port->busy_start > 0) {
128 : : /* work period ends */
129 : 0 : uint64_t work_period =
130 : 0 : rte_get_timer_cycles() - port->busy_start;
131 : 0 : port->busy_cycles += work_period;
132 : 0 : port->busy_start = 0;
133 : : }
134 : : }
135 : :
136 : : static int16_t
137 : : dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
138 : : {
139 : 0 : uint64_t passed = now - port->measurement_start;
140 : 0 : uint64_t busy_cycles = port->busy_cycles;
141 : :
142 : 0 : if (port->busy_start > 0) {
143 : 0 : busy_cycles += (now - port->busy_start);
144 : 0 : port->busy_start = now;
145 : : }
146 : :
147 : 0 : int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
148 : :
149 : 0 : port->measurement_start = now;
150 : 0 : port->busy_cycles = 0;
151 : :
152 : 0 : port->total_busy_cycles += busy_cycles;
153 : :
154 : : return load;
155 : : }
156 : :
157 : : static void
158 : 0 : dsw_port_load_update(struct dsw_port *port, uint64_t now)
159 : : {
160 : : int16_t old_load;
161 : : int16_t period_load;
162 : : int16_t new_load;
163 : :
164 [ # # ]: 0 : old_load = rte_atomic_load_explicit(&port->load,
165 : : rte_memory_order_relaxed);
166 : :
167 : : period_load = dsw_port_load_close_period(port, now);
168 : :
169 : 0 : new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
170 : : (DSW_OLD_LOAD_WEIGHT+1);
171 : :
172 : 0 : rte_atomic_store_explicit(&port->load, new_load,
173 : : rte_memory_order_relaxed);
174 : :
175 : : /* The load of the recently immigrated flows should hopefully
176 : : * be reflected the load estimate by now.
177 : : */
178 : 0 : rte_atomic_store_explicit(&port->immigration_load, 0,
179 : : rte_memory_order_relaxed);
180 : 0 : }
181 : :
182 : : static void
183 : : dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
184 : : {
185 [ # # ]: 0 : if (now < port->next_load_update)
186 : : return;
187 : :
188 : 0 : port->next_load_update = now + port->load_update_interval;
189 : :
190 : 0 : dsw_port_load_update(port, now);
191 : : }
192 : :
193 : : static void
194 : 0 : dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
195 : : {
196 : : /* there's always room on the ring */
197 [ # # # # : 0 : while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
# ]
198 : : rte_pause();
199 : 0 : }
200 : :
201 : : static int
202 : 0 : dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
203 : : {
204 [ # # # # : 0 : return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
# ]
205 : : }
206 : :
207 : : static void
208 : 0 : dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
209 : : uint8_t type, struct dsw_queue_flow *qfs,
210 : : uint8_t qfs_len)
211 : : {
212 : : uint16_t port_id;
213 : 0 : struct dsw_ctl_msg msg = {
214 : : .type = type,
215 : 0 : .originating_port_id = source_port->id,
216 : : .qfs_len = qfs_len
217 : : };
218 : :
219 : 0 : memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
220 : :
221 [ # # ]: 0 : for (port_id = 0; port_id < dsw->num_ports; port_id++)
222 [ # # ]: 0 : if (port_id != source_port->id)
223 : 0 : dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
224 : 0 : }
225 : :
226 : : static __rte_always_inline bool
227 : : dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
228 : : uint8_t queue_id, uint16_t flow_hash)
229 : : {
230 : : uint16_t i;
231 : :
232 [ # # # # : 0 : for (i = 0; i < qfs_len; i++)
# # # # #
# ]
233 [ # # # # : 0 : if (qfs[i].queue_id == queue_id &&
# # # # #
# ]
234 [ # # # # : 0 : qfs[i].flow_hash == flow_hash)
# # # # #
# ]
235 : : return true;
236 : :
237 : : return false;
238 : : }
239 : :
240 : : static __rte_always_inline bool
241 : : dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
242 : : uint16_t flow_hash)
243 : : {
244 : 0 : return dsw_is_queue_flow_in_ary(port->paused_flows,
245 : 0 : port->paused_flows_len,
246 : : queue_id, flow_hash);
247 : : }
248 : :
249 : : static __rte_always_inline bool
250 : : dsw_port_is_flow_migrating(struct dsw_port *port, uint8_t queue_id,
251 : : uint16_t flow_hash)
252 : : {
253 : 0 : return dsw_is_queue_flow_in_ary(port->emigration_target_qfs,
254 : 0 : port->emigration_targets_len,
255 : : queue_id, flow_hash);
256 : : }
257 : :
258 : : static void
259 : : dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
260 : : uint8_t qfs_len)
261 : : {
262 : : uint8_t i;
263 : :
264 [ # # # # ]: 0 : for (i = 0; i < qfs_len; i++) {
265 : 0 : struct dsw_queue_flow *qf = &qfs[i];
266 : :
267 : : DSW_LOG_DP_PORT_LINE(DEBUG, port->id,
268 : : "Pausing queue_id %d flow_hash %d.",
269 : : qf->queue_id, qf->flow_hash);
270 : :
271 : 0 : port->paused_flows[port->paused_flows_len] = *qf;
272 : 0 : port->paused_flows_len++;
273 : : };
274 : : }
275 : :
276 : : static void
277 : 0 : dsw_port_remove_paused_flow(struct dsw_port *port,
278 : : const struct dsw_queue_flow *target_qf)
279 : : {
280 : : uint16_t i;
281 : :
282 [ # # ]: 0 : for (i = 0; i < port->paused_flows_len; i++) {
283 : 0 : struct dsw_queue_flow *qf = &port->paused_flows[i];
284 : :
285 [ # # ]: 0 : if (qf->queue_id == target_qf->queue_id &&
286 [ # # ]: 0 : qf->flow_hash == target_qf->flow_hash) {
287 : 0 : uint16_t last_idx = port->paused_flows_len-1;
288 [ # # ]: 0 : if (i != last_idx)
289 : 0 : port->paused_flows[i] =
290 : 0 : port->paused_flows[last_idx];
291 : 0 : port->paused_flows_len--;
292 : :
293 : : DSW_LOG_DP_PORT_LINE(DEBUG, port->id,
294 : : "Unpausing queue_id %d flow_hash %d.",
295 : : target_qf->queue_id,
296 : : target_qf->flow_hash);
297 : :
298 : : return;
299 : : }
300 : : }
301 : :
302 : 0 : DSW_LOG_DP_PORT_LINE(ERR, port->id,
303 : : "Failed to unpause queue_id %d flow_hash %d.",
304 : : target_qf->queue_id, target_qf->flow_hash);
305 : 0 : RTE_VERIFY(0);
306 : : }
307 : :
308 : : static void
309 : : dsw_port_remove_paused_flows(struct dsw_port *port,
310 : : struct dsw_queue_flow *qfs, uint8_t qfs_len)
311 : : {
312 : : uint8_t i;
313 : :
314 [ # # # # ]: 0 : for (i = 0; i < qfs_len; i++)
315 : 0 : dsw_port_remove_paused_flow(port, &qfs[i]);
316 : : }
317 : :
318 : : static void
319 : : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
320 : :
321 : : static void
322 : 0 : dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
323 : : uint8_t originating_port_id,
324 : : struct dsw_queue_flow *paused_qfs,
325 : : uint8_t qfs_len)
326 : : {
327 : 0 : struct dsw_ctl_msg cfm = {
328 : : .type = DSW_CTL_CFM,
329 : 0 : .originating_port_id = port->id
330 : : };
331 : :
332 : : /* There might be already-scheduled events belonging to the
333 : : * paused flow in the output buffers.
334 : : */
335 : : dsw_port_flush_out_buffers(dsw, port);
336 : :
337 : : dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
338 : :
339 : : /* Make sure any stores to the original port's in_ring is seen
340 : : * before the ctl message.
341 : : */
342 : 0 : rte_smp_wmb();
343 : :
344 : 0 : dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
345 : 0 : }
346 : :
347 : : struct dsw_queue_flow_burst {
348 : : struct dsw_queue_flow queue_flow;
349 : : uint16_t count;
350 : : };
351 : :
352 : : #define DSW_QF_TO_INT(_qf) \
353 : : ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
354 : :
355 : : static inline int
356 : 0 : dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
357 : : {
358 : : const struct dsw_queue_flow *qf_a = v_qf_a;
359 : : const struct dsw_queue_flow *qf_b = v_qf_b;
360 : :
361 : 0 : return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
362 : : }
363 : :
364 : : static uint16_t
365 : 0 : dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
366 : : struct dsw_queue_flow_burst *bursts)
367 : : {
368 : : uint16_t i;
369 : : struct dsw_queue_flow_burst *current_burst = NULL;
370 : : uint16_t num_bursts = 0;
371 : :
372 : : /* We don't need the stable property, and the list is likely
373 : : * large enough for qsort() to outperform dsw_stable_sort(),
374 : : * so we use qsort() here.
375 : : */
376 : 0 : qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
377 : :
378 : : /* arrange the (now-consecutive) events into bursts */
379 [ # # ]: 0 : for (i = 0; i < qfs_len; i++) {
380 [ # # ]: 0 : if (i == 0 ||
381 [ # # ]: 0 : dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) {
382 : 0 : current_burst = &bursts[num_bursts];
383 : 0 : current_burst->queue_flow = qfs[i];
384 : 0 : current_burst->count = 0;
385 : 0 : num_bursts++;
386 : : }
387 : 0 : current_burst->count++;
388 : : }
389 : :
390 : 0 : return num_bursts;
391 : : }
392 : :
393 : : static bool
394 : 0 : dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
395 : : int16_t load_limit)
396 : : {
397 : : bool below_limit = false;
398 : : uint16_t i;
399 : :
400 [ # # ]: 0 : for (i = 0; i < dsw->num_ports; i++) {
401 : 0 : int16_t measured_load =
402 : 0 : rte_atomic_load_explicit(&dsw->ports[i].load,
403 : : rte_memory_order_relaxed);
404 : 0 : int32_t immigration_load =
405 : 0 : rte_atomic_load_explicit(&dsw->ports[i].immigration_load,
406 : : rte_memory_order_relaxed);
407 : 0 : int32_t load = measured_load + immigration_load;
408 : :
409 : 0 : load = RTE_MIN(load, DSW_MAX_LOAD);
410 : :
411 [ # # ]: 0 : if (load < load_limit)
412 : : below_limit = true;
413 : 0 : port_loads[i] = load;
414 : : }
415 : 0 : return below_limit;
416 : : }
417 : :
418 : : static int16_t
419 : : dsw_flow_load(uint16_t num_events, int16_t port_load)
420 : : {
421 : 0 : return ((int32_t)port_load * (int32_t)num_events) /
422 : : DSW_MAX_EVENTS_RECORDED;
423 : : }
424 : :
425 : : static int16_t
426 : : dsw_evaluate_migration(int16_t source_load, int16_t target_load,
427 : : int16_t flow_load)
428 : : {
429 : : int32_t res_target_load;
430 : : int32_t imbalance;
431 : :
432 : 0 : if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
433 : : return -1;
434 : :
435 : 0 : imbalance = source_load - target_load;
436 : :
437 [ # # ]: 0 : if (imbalance < DSW_REBALANCE_THRESHOLD)
438 : : return -1;
439 : :
440 : 0 : res_target_load = target_load + flow_load;
441 : :
442 : : /* If the estimated load of the target port will be higher
443 : : * than the source port's load, it doesn't make sense to move
444 : : * the flow.
445 : : */
446 [ # # ]: 0 : if (res_target_load > source_load)
447 : : return -1;
448 : :
449 : : /* The more idle the target will be, the better. This will
450 : : * make migration prefer moving smaller flows, and flows to
451 : : * lightly loaded ports.
452 : : */
453 : 0 : return DSW_MAX_LOAD - res_target_load;
454 : : }
455 : :
456 : : static bool
457 : : dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
458 : : {
459 : : struct dsw_queue *queue = &dsw->queues[queue_id];
460 : :
461 : 0 : return rte_bitset_test(queue->serving_ports, port_id);
462 : : }
463 : :
464 : : static bool
465 : 0 : dsw_select_emigration_target(struct dsw_evdev *dsw,
466 : : struct dsw_port *source_port,
467 : : struct dsw_queue_flow_burst *bursts,
468 : : uint16_t num_bursts,
469 : : int16_t *port_loads, uint16_t num_ports,
470 : : uint8_t *target_port_ids,
471 : : struct dsw_queue_flow *target_qfs,
472 : : uint8_t *targets_len)
473 : : {
474 : 0 : int16_t source_port_load = port_loads[source_port->id];
475 : : struct dsw_queue_flow *candidate_qf = NULL;
476 : : uint8_t candidate_port_id = 0;
477 : : int16_t candidate_weight = -1;
478 : : int16_t candidate_flow_load = -1;
479 : : uint16_t i;
480 : :
481 [ # # ]: 0 : if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
482 : : return false;
483 : :
484 [ # # ]: 0 : for (i = 0; i < num_bursts; i++) {
485 : 0 : struct dsw_queue_flow_burst *burst = &bursts[i];
486 : 0 : struct dsw_queue_flow *qf = &burst->queue_flow;
487 : : int16_t flow_load;
488 : : uint16_t port_id;
489 : :
490 [ # # ]: 0 : if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
491 : 0 : qf->queue_id, qf->flow_hash))
492 : 0 : continue;
493 : :
494 : 0 : flow_load = dsw_flow_load(burst->count, source_port_load);
495 : :
496 [ # # ]: 0 : for (port_id = 0; port_id < num_ports; port_id++) {
497 : : int16_t weight;
498 : :
499 [ # # ]: 0 : if (port_id == source_port->id)
500 : 0 : continue;
501 : :
502 [ # # ]: 0 : if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
503 : 0 : continue;
504 : :
505 : 0 : weight = dsw_evaluate_migration(source_port_load,
506 [ # # ]: 0 : port_loads[port_id],
507 : : flow_load);
508 : :
509 [ # # ]: 0 : if (weight > candidate_weight) {
510 : : candidate_qf = qf;
511 : : candidate_port_id = port_id;
512 : : candidate_weight = weight;
513 : : candidate_flow_load = flow_load;
514 : : }
515 : : }
516 : : }
517 : :
518 [ # # ]: 0 : if (candidate_weight < 0)
519 : : return false;
520 : :
521 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Selected queue_id %d "
522 : : "flow_hash %d (with flow load %d) for migration "
523 : : "to port %d.", candidate_qf->queue_id,
524 : : candidate_qf->flow_hash,
525 : : DSW_LOAD_TO_PERCENT(candidate_flow_load),
526 : : candidate_port_id);
527 : :
528 : 0 : port_loads[candidate_port_id] += candidate_flow_load;
529 : 0 : port_loads[source_port->id] -= candidate_flow_load;
530 : :
531 : 0 : target_port_ids[*targets_len] = candidate_port_id;
532 : 0 : target_qfs[*targets_len] = *candidate_qf;
533 : 0 : (*targets_len)++;
534 : :
535 : 0 : rte_atomic_fetch_add_explicit(
536 : : &dsw->ports[candidate_port_id].immigration_load,
537 : : candidate_flow_load,
538 : : rte_memory_order_relaxed);
539 : :
540 : 0 : return true;
541 : : }
542 : :
543 : : static void
544 : 0 : dsw_select_emigration_targets(struct dsw_evdev *dsw,
545 : : struct dsw_port *source_port,
546 : : struct dsw_queue_flow_burst *bursts,
547 : : uint16_t num_bursts, int16_t *port_loads)
548 : : {
549 : 0 : struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
550 : 0 : uint8_t *target_port_ids = source_port->emigration_target_port_ids;
551 : 0 : uint8_t *targets_len = &source_port->emigration_targets_len;
552 : : uint16_t i;
553 : :
554 [ # # ]: 0 : for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
555 : : bool found;
556 : :
557 : 0 : found = dsw_select_emigration_target(dsw, source_port,
558 : : bursts, num_bursts,
559 : 0 : port_loads, dsw->num_ports,
560 : : target_port_ids,
561 : : target_qfs,
562 : : targets_len);
563 [ # # ]: 0 : if (!found)
564 : : break;
565 : : }
566 : :
567 : : if (*targets_len == 0)
568 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
569 : : "For the %d flows considered, no target port "
570 : : "was found.", num_bursts);
571 : 0 : }
572 : :
573 : : static uint8_t
574 : 0 : dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
575 : : {
576 : 0 : struct dsw_queue *queue = &dsw->queues[queue_id];
577 : : uint8_t port_id;
578 : :
579 [ # # ]: 0 : if (queue->num_serving_ports > 1)
580 : 0 : port_id = queue->flow_to_port_map[flow_hash];
581 : : else
582 : : /* A single-link queue, or atomic/ordered/parallel but
583 : : * with just a single serving port.
584 : : */
585 : 0 : port_id = (uint8_t)rte_bitset_find_first_set(
586 : 0 : queue->serving_ports, DSW_MAX_PORTS
587 : : );
588 : :
589 : : DSW_LOG_DP_LINE(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
590 : : "to port %d.", queue_id, flow_hash, port_id);
591 : :
592 : 0 : return port_id;
593 : : }
594 : :
595 : : static void
596 : 0 : dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
597 : : uint8_t dest_port_id)
598 : : {
599 : 0 : struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
600 : : uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
601 : 0 : struct rte_event *buffer = source_port->out_buffer[dest_port_id];
602 : :
603 [ # # ]: 0 : if (*buffer_len == 0)
604 : : return;
605 : :
606 : : /* The rings are dimensioned to fit all in-flight events (even
607 : : * on a single ring).
608 : : */
609 [ # # # # : 0 : rte_event_ring_enqueue_bulk(dest_port->in_ring, buffer, *buffer_len,
# ]
610 : : NULL);
611 : :
612 : 0 : (*buffer_len) = 0;
613 : : }
614 : :
615 : : static uint16_t
616 : : dsw_port_get_parallel_flow_id(struct dsw_port *port)
617 : : {
618 : 0 : uint16_t flow_id = port->next_parallel_flow_id;
619 : :
620 : 0 : port->next_parallel_flow_id =
621 : 0 : (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
622 : :
623 : : return flow_id;
624 : : }
625 : :
626 : : static void
627 : : dsw_port_buffer_paused(struct dsw_port *port,
628 : : const struct rte_event *paused_event)
629 : : {
630 : 0 : port->paused_events[port->paused_events_len] = *paused_event;
631 : 0 : port->paused_events_len++;
632 : 0 : }
633 : :
634 : :
635 : : static void
636 : 0 : dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
637 : : uint8_t dest_port_id, const struct rte_event *event)
638 : : {
639 : 0 : struct rte_event *buffer = source_port->out_buffer[dest_port_id];
640 : : uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
641 : :
642 [ # # ]: 0 : if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
643 : 0 : dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
644 : :
645 : 0 : buffer[*buffer_len] = *event;
646 : :
647 : 0 : (*buffer_len)++;
648 : 0 : }
649 : :
650 : : #define DSW_FLOW_ID_BITS (24)
651 : : static uint16_t
652 : : dsw_flow_id_hash(uint32_t flow_id)
653 : : {
654 : : uint16_t hash = 0;
655 : : uint16_t offset = 0;
656 : :
657 : : do {
658 : 0 : hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
659 : 0 : offset += DSW_MAX_FLOWS_BITS;
660 [ # # # # : 0 : } while (offset < DSW_FLOW_ID_BITS);
# # # # #
# # # #
# ]
661 : :
662 : : return hash;
663 : : }
664 : :
665 : : static void
666 : 0 : dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
667 : : struct rte_event event)
668 : : {
669 : : uint8_t dest_port_id;
670 : :
671 : 0 : event.flow_id = dsw_port_get_parallel_flow_id(source_port);
672 : :
673 : 0 : dest_port_id = dsw_schedule(dsw, event.queue_id,
674 : 0 : dsw_flow_id_hash(event.flow_id));
675 : :
676 : 0 : dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
677 : 0 : }
678 : :
679 : : static void
680 : 0 : dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
681 : : const struct rte_event *event)
682 : : {
683 : : uint16_t flow_hash;
684 : : uint8_t dest_port_id;
685 : :
686 [ # # ]: 0 : if (unlikely(dsw->queues[event->queue_id].schedule_type ==
687 : : RTE_SCHED_TYPE_PARALLEL)) {
688 : 0 : dsw_port_buffer_parallel(dsw, source_port, *event);
689 : 0 : return;
690 : : }
691 : :
692 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
693 : :
694 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
695 : : flow_hash))) {
696 : : dsw_port_buffer_paused(source_port, event);
697 : 0 : return;
698 : : }
699 : :
700 : 0 : dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
701 : :
702 : 0 : dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
703 : : }
704 : :
705 : : static void
706 : 0 : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
707 : : struct dsw_port *source_port)
708 : 0 : {
709 : 0 : uint16_t paused_events_len = source_port->paused_events_len;
710 : 0 : struct rte_event paused_events[paused_events_len];
711 : : uint16_t i;
712 : :
713 [ # # ]: 0 : if (paused_events_len == 0)
714 : 0 : return;
715 : :
716 [ # # ]: 0 : rte_memcpy(paused_events, source_port->paused_events,
717 : : paused_events_len * sizeof(struct rte_event));
718 : :
719 : 0 : source_port->paused_events_len = 0;
720 : :
721 [ # # ]: 0 : for (i = 0; i < paused_events_len; i++) {
722 : 0 : struct rte_event *event = &paused_events[i];
723 : : uint16_t flow_hash;
724 : :
725 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
726 : :
727 [ # # ]: 0 : if (dsw_port_is_flow_paused(source_port, event->queue_id,
728 : : flow_hash))
729 : : dsw_port_buffer_paused(source_port, event);
730 : : else {
731 : : uint8_t dest_port_id;
732 : :
733 : 0 : dest_port_id = dsw_schedule(dsw, event->queue_id,
734 : : flow_hash);
735 : :
736 : 0 : dsw_port_buffer_non_paused(dsw, source_port,
737 : : dest_port_id, event);
738 : : }
739 : : }
740 : : }
741 : :
742 : : static void
743 : : dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
744 : : {
745 : : uint64_t flow_migration_latency;
746 : :
747 : 0 : flow_migration_latency =
748 : 0 : (rte_get_timer_cycles() - port->emigration_start);
749 : 0 : port->emigration_latency += (flow_migration_latency * finished);
750 : 0 : port->emigrations += finished;
751 : 0 : }
752 : :
753 : : static void
754 : 0 : dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
755 : : uint8_t schedule_type)
756 : : {
757 : : uint8_t i;
758 : : struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
759 : : uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
760 : : uint8_t left_qfs_len = 0;
761 : : uint8_t finished;
762 : :
763 [ # # ]: 0 : for (i = 0; i < port->emigration_targets_len; i++) {
764 : 0 : struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
765 : 0 : uint8_t queue_id = qf->queue_id;
766 : 0 : uint8_t queue_schedule_type =
767 : 0 : dsw->queues[queue_id].schedule_type;
768 : : uint16_t flow_hash = qf->flow_hash;
769 : :
770 [ # # ]: 0 : if (queue_schedule_type != schedule_type) {
771 : 0 : left_port_ids[left_qfs_len] =
772 : 0 : port->emigration_target_port_ids[i];
773 : 0 : left_qfs[left_qfs_len] = *qf;
774 : 0 : left_qfs_len++;
775 : 0 : continue;
776 : : }
777 : :
778 : : DSW_LOG_DP_PORT_LINE(DEBUG, port->id, "Migration completed for "
779 : : "queue_id %d flow_hash %d.", queue_id,
780 : : flow_hash);
781 : : }
782 : :
783 : 0 : finished = port->emigration_targets_len - left_qfs_len;
784 : :
785 [ # # ]: 0 : if (finished > 0)
786 : : dsw_port_emigration_stats(port, finished);
787 : :
788 [ # # ]: 0 : for (i = 0; i < left_qfs_len; i++) {
789 : 0 : port->emigration_target_port_ids[i] = left_port_ids[i];
790 : 0 : port->emigration_target_qfs[i] = left_qfs[i];
791 : : }
792 : 0 : port->emigration_targets_len = left_qfs_len;
793 : :
794 [ # # ]: 0 : if (port->emigration_targets_len == 0) {
795 : 0 : port->migration_state = DSW_MIGRATION_STATE_IDLE;
796 : 0 : port->emigration_targets_len = 0;
797 : 0 : port->seen_events_len = 0;
798 : : }
799 : 0 : }
800 : :
801 : : static void
802 : 0 : dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
803 : : struct dsw_port *source_port)
804 : : {
805 : : uint8_t i;
806 : :
807 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
808 : : struct dsw_queue_flow *qf =
809 : 0 : &source_port->emigration_target_qfs[i];
810 : 0 : uint8_t queue_id = qf->queue_id;
811 : :
812 [ # # ]: 0 : if (dsw->queues[queue_id].schedule_type ==
813 : : RTE_SCHED_TYPE_PARALLEL) {
814 : 0 : uint8_t dest_port_id =
815 : : source_port->emigration_target_port_ids[i];
816 : 0 : uint16_t flow_hash = qf->flow_hash;
817 : :
818 : : /* Single byte-sized stores are always atomic. */
819 : 0 : dsw->queues[queue_id].flow_to_port_map[flow_hash] =
820 : : dest_port_id;
821 : : }
822 : : }
823 : :
824 : 0 : rte_smp_wmb();
825 : :
826 : 0 : dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
827 : 0 : }
828 : :
829 : : static void
830 : 0 : dsw_port_consider_emigration(struct dsw_evdev *dsw,
831 : : struct dsw_port *source_port,
832 : : uint64_t now)
833 : 0 : {
834 : : bool any_port_below_limit;
835 : 0 : struct dsw_queue_flow *seen_events = source_port->seen_events;
836 : 0 : uint16_t seen_events_len = source_port->seen_events_len;
837 : : struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
838 : : uint16_t num_bursts;
839 : : int16_t source_port_load;
840 : 0 : int16_t port_loads[dsw->num_ports];
841 : :
842 [ # # ]: 0 : if (now < source_port->next_emigration)
843 : 0 : return;
844 : :
845 [ # # ]: 0 : if (dsw->num_ports == 1)
846 : : return;
847 : :
848 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Considering emigration.");
849 : :
850 : : /* For simplicity, postpone migration if there are still
851 : : * events to consume in the in_buffer (from the last
852 : : * emigration).
853 : : */
854 [ # # ]: 0 : if (source_port->in_buffer_len > 0) {
855 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "There are still "
856 : : "events in the input buffer.");
857 : : return;
858 : : }
859 : :
860 [ # # ]: 0 : if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
861 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
862 : : "Emigration already in progress.");
863 : : return;
864 : : }
865 : :
866 [ # # ]: 0 : if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
867 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Not enough events "
868 : : "are recorded to allow for a migration.");
869 : : return;
870 : : }
871 : :
872 : : /* Postpone migration considering in case paused events exists, since
873 : : * such events may prevent the migration procedure from completing,
874 : : * leading to wasted CPU cycles (e.g., sorting queue flows).
875 : : */
876 [ # # ]: 0 : if (source_port->paused_events_len > 0) {
877 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Paused events on "
878 : : "port. Postponing any migrations.");
879 : : return;
880 : : }
881 : :
882 : : /* Randomize interval to avoid having all threads considering
883 : : * emigration at the same in point in time, which might lead
884 : : * to all choosing the same target port.
885 : : */
886 : 0 : source_port->next_emigration = now +
887 : 0 : source_port->migration_interval / 2 +
888 : 0 : rte_rand_max(source_port->migration_interval);
889 : :
890 : 0 : source_port_load =
891 : 0 : rte_atomic_load_explicit(&source_port->load,
892 : : rte_memory_order_relaxed);
893 [ # # ]: 0 : if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
894 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
895 : : "Load %d is below threshold level %d.",
896 : : DSW_LOAD_TO_PERCENT(source_port_load),
897 : : DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
898 : : return;
899 : : }
900 : :
901 : : /* Avoid starting any expensive operations (sorting etc), in
902 : : * case of a scenario with all ports above the load limit.
903 : : */
904 : : any_port_below_limit =
905 : 0 : dsw_retrieve_port_loads(dsw, port_loads,
906 : : DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
907 [ # # ]: 0 : if (!any_port_below_limit) {
908 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id,
909 : : "Candidate target ports are all too highly "
910 : : "loaded.");
911 : : return;
912 : : }
913 : :
914 : 0 : num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
915 : : bursts);
916 : :
917 : : /* For non-big-little systems, there's no point in moving the
918 : : * only (known) flow.
919 : : */
920 [ # # ]: 0 : if (num_bursts < 2) {
921 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Only a single flow "
922 : : "queue_id %d flow_hash %d has been seen.",
923 : : bursts[0].queue_flow.queue_id,
924 : : bursts[0].queue_flow.flow_hash);
925 : : return;
926 : : }
927 : :
928 : 0 : dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
929 : : port_loads);
930 : :
931 [ # # ]: 0 : if (source_port->emigration_targets_len == 0)
932 : : return;
933 : :
934 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_FINISH_PENDING;
935 : 0 : source_port->emigration_start = rte_get_timer_cycles();
936 : :
937 : : /* No need to go through the whole pause procedure for
938 : : * parallel queues, since atomic/ordered semantics need not to
939 : : * be maintained.
940 : : */
941 : 0 : dsw_port_move_parallel_flows(dsw, source_port);
942 : : }
943 : :
944 : : static void
945 : 0 : dsw_port_abort_migration(struct dsw_port *source_port)
946 : : {
947 : : RTE_ASSERT(source_port->in_buffer_start == 0);
948 : : RTE_ASSERT(source_port->in_buffer_len == 0);
949 : :
950 : : /* Putting the stashed events in the in_buffer makes sure they
951 : : * are processed before any events on the in_ring, to avoid
952 : : * reordering.
953 : : */
954 : 0 : rte_memcpy(source_port->in_buffer, source_port->emigrating_events,
955 [ # # ]: 0 : source_port->emigrating_events_len * sizeof(struct rte_event));
956 : 0 : source_port->in_buffer_len = source_port->emigrating_events_len;
957 : 0 : source_port->emigrating_events_len = 0;
958 : :
959 : 0 : source_port->emigration_targets_len = 0;
960 : :
961 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_IDLE;
962 : 0 : }
963 : :
964 : : static void
965 : 0 : dsw_port_continue_emigration(struct dsw_evdev *dsw,
966 : : struct dsw_port *source_port)
967 : : {
968 : : /* A flow migration cannot be completed if there are paused
969 : : * events, since some/all of those events may be have been
970 : : * produced as a result of processing the flow(s) selected for
971 : : * migration. Moving such a flow would potentially introduced
972 : : * reordering, since processing the migrated flow on the
973 : : * receiving flow may commence before the to-be-enqueued-to
974 : : * flows are unpaused, leading to paused events on the second
975 : : * port as well, destined for the same paused flow(s). When
976 : : * those flows are unpaused, the resulting events are
977 : : * delivered the owning port in an undefined order.
978 : : *
979 : : * Waiting for the events to be unpaused could lead to a
980 : : * deadlock, where two ports are both waiting for the other to
981 : : * unpause.
982 : : */
983 [ # # ]: 0 : if (source_port->paused_events_len > 0) {
984 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "There are events in "
985 : : "the pause buffer. Aborting migration.");
986 : 0 : dsw_port_abort_migration(source_port);
987 : 0 : return;
988 : : }
989 : :
990 : 0 : dsw_port_add_paused_flows(source_port,
991 : 0 : source_port->emigration_target_qfs,
992 : 0 : source_port->emigration_targets_len);
993 : :
994 : 0 : dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUSE_REQ,
995 : : source_port->emigration_target_qfs,
996 : : source_port->emigration_targets_len);
997 : 0 : source_port->cfm_cnt = 0;
998 : :
999 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
1000 : : }
1001 : :
1002 : : static void
1003 : : dsw_port_try_finish_pending(struct dsw_evdev *dsw, struct dsw_port *source_port)
1004 : : {
1005 [ # # # # ]: 0 : if (unlikely(source_port->migration_state ==
1006 : : DSW_MIGRATION_STATE_FINISH_PENDING &&
1007 : : source_port->pending_releases == 0))
1008 : 0 : dsw_port_continue_emigration(dsw, source_port);
1009 : : }
1010 : :
1011 : : static void
1012 : : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
1013 : : struct dsw_port *source_port);
1014 : :
1015 : : static void
1016 : 0 : dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
1017 : : uint8_t originating_port_id,
1018 : : struct dsw_queue_flow *paused_qfs,
1019 : : uint8_t qfs_len)
1020 : : {
1021 : : uint16_t i;
1022 : 0 : struct dsw_ctl_msg cfm = {
1023 : : .type = DSW_CTL_CFM,
1024 : 0 : .originating_port_id = port->id
1025 : : };
1026 : :
1027 : : dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
1028 : :
1029 : 0 : dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
1030 : :
1031 [ # # ]: 0 : for (i = 0; i < qfs_len; i++) {
1032 : 0 : struct dsw_queue_flow *qf = &paused_qfs[i];
1033 : :
1034 [ # # ]: 0 : if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
1035 : 0 : port->immigrations++;
1036 : : }
1037 : :
1038 : 0 : dsw_port_flush_no_longer_paused_events(dsw, port);
1039 : 0 : }
1040 : :
1041 : : static void
1042 : : dsw_port_buffer_in_buffer(struct dsw_port *port,
1043 : : const struct rte_event *event)
1044 : :
1045 : : {
1046 : : RTE_ASSERT(port->in_buffer_start == 0);
1047 : :
1048 : 0 : port->in_buffer[port->in_buffer_len] = *event;
1049 : 0 : port->in_buffer_len++;
1050 : 0 : }
1051 : :
1052 : : static void
1053 : : dsw_port_stash_migrating_event(struct dsw_port *port,
1054 : : const struct rte_event *event)
1055 : : {
1056 : 0 : port->emigrating_events[port->emigrating_events_len] = *event;
1057 : 0 : port->emigrating_events_len++;
1058 : 0 : }
1059 : :
1060 : : static void
1061 : 0 : dsw_port_stash_any_migrating_events(struct dsw_port *port,
1062 : : struct rte_event *events,
1063 : : uint16_t *num)
1064 : : {
1065 : : uint16_t i;
1066 : : uint16_t offset = 0;
1067 : :
1068 [ # # ]: 0 : for (i = 0; i < *num; i++) {
1069 : : uint16_t flow_hash;
1070 : 0 : struct rte_event *in_event = &events[i];
1071 : :
1072 : 0 : flow_hash = dsw_flow_id_hash(in_event->flow_id);
1073 : :
1074 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_migrating(port,
1075 : : in_event->queue_id,
1076 : : flow_hash))) {
1077 : : dsw_port_stash_migrating_event(port, in_event);
1078 : 0 : offset++;
1079 [ # # ]: 0 : } else if (offset > 0) {
1080 [ # # ]: 0 : struct rte_event *out_event = &events[i - offset];
1081 : : rte_memcpy(out_event, in_event,
1082 : : sizeof(struct rte_event));
1083 : : }
1084 : : }
1085 : :
1086 : 0 : *num -= offset;
1087 : 0 : }
1088 : :
1089 : : #define DRAIN_DEQUEUE_BURST_SIZE (32)
1090 : :
1091 : : static void
1092 : 0 : dsw_port_drain_in_ring(struct dsw_port *source_port)
1093 : : {
1094 : 0 : for (;;) {
1095 : : struct rte_event events[DRAIN_DEQUEUE_BURST_SIZE];
1096 : : uint16_t n;
1097 : : uint16_t i;
1098 : : uint16_t available;
1099 : :
1100 [ # # # # : 0 : n = rte_event_ring_dequeue_burst(source_port->in_ring,
# ]
1101 : : events,
1102 : : DRAIN_DEQUEUE_BURST_SIZE,
1103 : : &available);
1104 : :
1105 [ # # # # ]: 0 : if (n == 0 && available == 0)
1106 : : break;
1107 : :
1108 [ # # ]: 0 : for (i = 0; i < n; i++) {
1109 : 0 : struct rte_event *event = &events[i];
1110 : : uint16_t flow_hash;
1111 : :
1112 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
1113 : :
1114 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_migrating(source_port,
1115 : : event->queue_id,
1116 : : flow_hash)))
1117 : : dsw_port_stash_migrating_event(source_port,
1118 : : event);
1119 : : else
1120 : : dsw_port_buffer_in_buffer(source_port, event);
1121 : : }
1122 : : }
1123 : 0 : }
1124 : :
1125 : : static void
1126 : 0 : dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
1127 : : struct dsw_port *source_port,
1128 : : struct rte_event *event)
1129 : : {
1130 : : uint16_t i;
1131 : :
1132 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
1133 : : struct dsw_queue_flow *qf =
1134 : 0 : &source_port->emigration_target_qfs[i];
1135 : 0 : uint8_t dest_port_id =
1136 : : source_port->emigration_target_port_ids[i];
1137 : 0 : struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1138 : :
1139 [ # # ]: 0 : if (event->queue_id == qf->queue_id &&
1140 [ # # ]: 0 : dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
1141 : : /* No need to care about bursting forwarded
1142 : : * events (to the destination port's in_ring),
1143 : : * since migration doesn't happen very often,
1144 : : * and also the majority of the dequeued
1145 : : * events will likely *not* be forwarded.
1146 : : */
1147 [ # # # # : 0 : while (rte_event_ring_enqueue_burst(dest_port->in_ring,
# ]
1148 : : event, 1,
1149 [ # # ]: 0 : NULL) != 1)
1150 : : rte_pause();
1151 : : return;
1152 : : }
1153 : : }
1154 : :
1155 : : /* Event did not belong to the emigrated flows */
1156 : : dsw_port_buffer_in_buffer(source_port, event);
1157 : : }
1158 : :
1159 : : static void
1160 : : dsw_port_forward_emigrated_flows(struct dsw_evdev *dsw,
1161 : : struct dsw_port *source_port)
1162 : : {
1163 : : uint16_t i;
1164 : :
1165 [ # # ]: 0 : for (i = 0; i < source_port->emigrating_events_len; i++) {
1166 : 0 : struct rte_event *event = &source_port->emigrating_events[i];
1167 : :
1168 : 0 : dsw_port_forward_emigrated_event(dsw, source_port, event);
1169 : : }
1170 : 0 : source_port->emigrating_events_len = 0;
1171 : : }
1172 : :
1173 : : static void
1174 : 0 : dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1175 : : struct dsw_port *source_port)
1176 : : {
1177 : : uint8_t i;
1178 : :
1179 : : /* There may be events lingering in the output buffer from
1180 : : * prior to the pause took effect.
1181 : : */
1182 : : dsw_port_flush_out_buffers(dsw, source_port);
1183 : :
1184 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
1185 : : struct dsw_queue_flow *qf =
1186 : 0 : &source_port->emigration_target_qfs[i];
1187 : 0 : uint8_t dest_port_id =
1188 : : source_port->emigration_target_port_ids[i];
1189 : :
1190 : 0 : dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1191 : : dest_port_id;
1192 : : }
1193 : :
1194 : 0 : rte_smp_wmb();
1195 : :
1196 : 0 : dsw_port_drain_in_ring(source_port);
1197 : : dsw_port_forward_emigrated_flows(dsw, source_port);
1198 : :
1199 : : dsw_port_remove_paused_flows(source_port,
1200 : 0 : source_port->emigration_target_qfs,
1201 : 0 : source_port->emigration_targets_len);
1202 : :
1203 : 0 : dsw_port_flush_no_longer_paused_events(dsw, source_port);
1204 : :
1205 : : /* Processing migrating flows during migration may have
1206 : : * produced events to paused flows, including the flows which
1207 : : * were being migrated. Flushing the output buffers before
1208 : : * unpausing the flows on other ports assures that such events
1209 : : * are seen *before* any events produced by processing the
1210 : : * migrating flows on the new port.
1211 : : */
1212 : : dsw_port_flush_out_buffers(dsw, source_port);
1213 : :
1214 : : /* Flow table update and migration destination port's enqueues
1215 : : * must be seen before the control message.
1216 : : */
1217 : 0 : rte_smp_wmb();
1218 : :
1219 : 0 : dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUSE_REQ,
1220 : : source_port->emigration_target_qfs,
1221 : 0 : source_port->emigration_targets_len);
1222 : 0 : source_port->cfm_cnt = 0;
1223 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1224 : 0 : }
1225 : :
1226 : : static void
1227 : 0 : dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1228 : : {
1229 : 0 : port->cfm_cnt++;
1230 : :
1231 [ # # ]: 0 : if (port->cfm_cnt == (dsw->num_ports - 1)) {
1232 [ # # # ]: 0 : switch (port->migration_state) {
1233 : 0 : case DSW_MIGRATION_STATE_PAUSING:
1234 : 0 : dsw_port_move_emigrating_flows(dsw, port);
1235 : 0 : break;
1236 : 0 : case DSW_MIGRATION_STATE_UNPAUSING:
1237 : 0 : dsw_port_end_emigration(dsw, port,
1238 : : RTE_SCHED_TYPE_ATOMIC);
1239 : 0 : break;
1240 : : default:
1241 : 0 : RTE_VERIFY(0);
1242 : : break;
1243 : : }
1244 : : }
1245 : 0 : }
1246 : :
1247 : : static void
1248 : 0 : dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1249 : : {
1250 : : struct dsw_ctl_msg msg;
1251 : :
1252 [ # # ]: 0 : if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1253 [ # # # # ]: 0 : switch (msg.type) {
1254 : 0 : case DSW_CTL_PAUSE_REQ:
1255 : 0 : dsw_port_handle_pause_flows(dsw, port,
1256 : 0 : msg.originating_port_id,
1257 : 0 : msg.qfs, msg.qfs_len);
1258 : 0 : break;
1259 : 0 : case DSW_CTL_UNPAUSE_REQ:
1260 : 0 : dsw_port_handle_unpause_flows(dsw, port,
1261 : 0 : msg.originating_port_id,
1262 : 0 : msg.qfs, msg.qfs_len);
1263 : 0 : break;
1264 : 0 : case DSW_CTL_CFM:
1265 : 0 : dsw_port_handle_confirm(dsw, port);
1266 : 0 : break;
1267 : : }
1268 : : }
1269 : 0 : }
1270 : :
1271 : : static void
1272 : : dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1273 : : {
1274 : 0 : port->ops_since_bg_task += (num_events+1);
1275 : 0 : }
1276 : :
1277 : : static void
1278 : 0 : dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1279 : : {
1280 : : /* Polling the control ring is relatively inexpensive, and
1281 : : * polling it often helps bringing down migration latency, so
1282 : : * do this for every iteration.
1283 : : */
1284 : 0 : dsw_port_ctl_process(dsw, port);
1285 : :
1286 : : /* Always check if a migration is waiting for pending releases
1287 : : * to arrive, to minimize the amount of time dequeuing events
1288 : : * from the port is disabled.
1289 : : */
1290 : : dsw_port_try_finish_pending(dsw, port);
1291 : :
1292 : : /* To avoid considering migration and flushing output buffers
1293 : : * on every dequeue/enqueue call, the scheduler only performs
1294 : : * such 'background' tasks every nth
1295 : : * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1296 : : */
1297 [ # # ]: 0 : if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1298 : : uint64_t now;
1299 : :
1300 : : now = rte_get_timer_cycles();
1301 : :
1302 : 0 : port->last_bg = now;
1303 : :
1304 : : /* Logic to avoid having events linger in the output
1305 : : * buffer too long.
1306 : : */
1307 : : dsw_port_flush_out_buffers(dsw, port);
1308 : :
1309 : : dsw_port_consider_load_update(port, now);
1310 : :
1311 : 0 : dsw_port_consider_emigration(dsw, port, now);
1312 : :
1313 : 0 : port->ops_since_bg_task = 0;
1314 : : }
1315 : 0 : }
1316 : :
1317 : : static void
1318 : : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1319 : : {
1320 : : uint16_t dest_port_id;
1321 : :
1322 [ # # # # : 0 : for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
# # # # #
# # # # #
# # # # ]
1323 : 0 : dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1324 : : }
1325 : :
1326 : : static __rte_always_inline uint16_t
1327 : : dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1328 : : const struct rte_event events[],
1329 : : uint16_t events_len, bool op_types_known,
1330 : : uint16_t num_new, uint16_t num_forward,
1331 : : uint16_t num_release)
1332 : : {
1333 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1334 : : bool enough_credits;
1335 : : uint16_t i;
1336 : :
1337 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Attempting to enqueue %d "
1338 : : "events.", events_len);
1339 : :
1340 : 0 : dsw_port_bg_process(dsw, source_port);
1341 : :
1342 : : /* XXX: For performance (=ring efficiency) reasons, the
1343 : : * scheduler relies on internal non-ring buffers instead of
1344 : : * immediately sending the event to the destination ring. For
1345 : : * a producer that doesn't intend to produce or consume any
1346 : : * more events, the scheduler provides a way to flush the
1347 : : * buffer, by means of doing an enqueue of zero events. In
1348 : : * addition, a port cannot be left "unattended" (e.g. unused)
1349 : : * for long periods of time, since that would stall
1350 : : * migration. Eventdev API extensions to provide a cleaner way
1351 : : * to archive both of these functions should be
1352 : : * considered.
1353 : : */
1354 [ # # # # : 0 : if (unlikely(events_len == 0)) {
# # ]
1355 : : dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1356 : : dsw_port_flush_out_buffers(dsw, source_port);
1357 : : return 0;
1358 : : }
1359 : :
1360 : : dsw_port_note_op(source_port, events_len);
1361 : :
1362 : : if (!op_types_known)
1363 [ # # ]: 0 : for (i = 0; i < events_len; i++) {
1364 [ # # # # ]: 0 : switch (events[i].op) {
1365 : 0 : case RTE_EVENT_OP_NEW:
1366 : 0 : num_new++;
1367 : 0 : break;
1368 : 0 : case RTE_EVENT_OP_FORWARD:
1369 : 0 : num_forward++;
1370 : 0 : break;
1371 : 0 : case RTE_EVENT_OP_RELEASE:
1372 : 0 : num_release++;
1373 : 0 : break;
1374 : : }
1375 : : }
1376 : :
1377 : : /* Technically, we could allow the non-new events up to the
1378 : : * first new event in the array into the system, but for
1379 : : * simplicity reasons, we deny the whole burst if the port is
1380 : : * above the water mark.
1381 : : */
1382 [ # # # # : 0 : if (unlikely(num_new > 0 &&
# # # # ]
1383 : : rte_atomic_load_explicit(&dsw->credits_on_loan,
1384 : : rte_memory_order_relaxed) >
1385 : : source_port->new_event_threshold))
1386 : : return 0;
1387 : :
1388 : 0 : enough_credits = dsw_port_acquire_credits(dsw, source_port, num_new);
1389 [ # # # # : 0 : if (unlikely(!enough_credits))
# # ]
1390 : : return 0;
1391 : :
1392 [ # # ]: 0 : dsw_port_return_credits(dsw, source_port, num_release);
1393 : :
1394 : : /* This may seem harsh, but it's important for an application
1395 : : * to get early feedback for cases where it fails to stick to
1396 : : * the API contract.
1397 : : */
1398 [ # # # # ]: 0 : RTE_VERIFY(num_forward + num_release <= source_port->pending_releases);
1399 : 0 : source_port->pending_releases -= (num_forward + num_release);
1400 : :
1401 : : dsw_port_enqueue_stats(source_port, num_new, num_forward, num_release);
1402 : :
1403 [ # # # # : 0 : for (i = 0; i < events_len; i++) {
# # ]
1404 : 0 : const struct rte_event *event = &events[i];
1405 : :
1406 [ # # # # ]: 0 : if (likely(num_release == 0 ||
1407 : : event->op != RTE_EVENT_OP_RELEASE))
1408 : 0 : dsw_port_buffer_event(dsw, source_port, event);
1409 : 0 : dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1410 : : }
1411 : :
1412 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "%d non-release events "
1413 : : "accepted.", num_new + num_forward);
1414 : :
1415 : 0 : return (num_new + num_forward + num_release);
1416 : : }
1417 : :
1418 : : uint16_t
1419 : 0 : dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1420 : : uint16_t events_len)
1421 : : {
1422 : : struct dsw_port *source_port = port;
1423 : :
1424 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1425 : : events_len = source_port->enqueue_depth;
1426 : :
1427 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1428 : : events_len, false, 0, 0, 0);
1429 : : }
1430 : :
1431 : : uint16_t
1432 : 0 : dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1433 : : uint16_t events_len)
1434 : : {
1435 : : struct dsw_port *source_port = port;
1436 : :
1437 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1438 : : events_len = source_port->enqueue_depth;
1439 : :
1440 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1441 : : events_len, true, events_len,
1442 : : 0, 0);
1443 : : }
1444 : :
1445 : : uint16_t
1446 : 0 : dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1447 : : uint16_t events_len)
1448 : : {
1449 : : struct dsw_port *source_port = port;
1450 : :
1451 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1452 : : events_len = source_port->enqueue_depth;
1453 : :
1454 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1455 : : events_len, true, 0,
1456 : : events_len, 0);
1457 : : }
1458 : :
1459 : : static void
1460 : 0 : dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1461 : : uint16_t num)
1462 : : {
1463 : : uint16_t i;
1464 : :
1465 : 0 : dsw_port_dequeue_stats(port, num);
1466 : :
1467 [ # # ]: 0 : for (i = 0; i < num; i++) {
1468 : 0 : uint16_t l_idx = port->seen_events_idx;
1469 : 0 : struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1470 : 0 : struct rte_event *event = &events[i];
1471 : 0 : qf->queue_id = event->queue_id;
1472 : 0 : qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1473 : :
1474 : 0 : port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1475 : :
1476 : 0 : dsw_port_queue_dequeued_stats(port, event->queue_id);
1477 : : }
1478 : :
1479 [ # # ]: 0 : if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1480 : 0 : port->seen_events_len =
1481 : 0 : RTE_MIN(port->seen_events_len + num,
1482 : : DSW_MAX_EVENTS_RECORDED);
1483 : 0 : }
1484 : :
1485 : : #ifdef DSW_SORT_DEQUEUED
1486 : :
1487 : : #define DSW_EVENT_TO_INT(_event) \
1488 : : ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1489 : :
1490 : : static inline int
1491 : : dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1492 : : {
1493 : : const struct rte_event *event_a = v_event_a;
1494 : : const struct rte_event *event_b = v_event_b;
1495 : :
1496 : : return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1497 : : }
1498 : : #endif
1499 : :
1500 : : static uint16_t
1501 : 0 : dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1502 : : uint16_t num)
1503 : : {
1504 : 0 : enum dsw_migration_state state = port->migration_state;
1505 : : uint16_t dequeued;
1506 : :
1507 [ # # ]: 0 : if (unlikely(state == DSW_MIGRATION_STATE_FINISH_PENDING))
1508 : : /* Do not produce new items of work - only finish
1509 : : * outstanding (unreleased) events, to allow the
1510 : : * migration procedure to continue.
1511 : : */
1512 : 0 : dequeued = 0;
1513 [ # # ]: 0 : else if (unlikely(port->in_buffer_len > 0)) {
1514 : 0 : dequeued = RTE_MIN(num, port->in_buffer_len);
1515 : :
1516 [ # # ]: 0 : rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1517 : : dequeued * sizeof(struct rte_event));
1518 : :
1519 : 0 : port->in_buffer_start += dequeued;
1520 : 0 : port->in_buffer_len -= dequeued;
1521 : :
1522 [ # # ]: 0 : if (port->in_buffer_len == 0)
1523 : 0 : port->in_buffer_start = 0;
1524 : : } else {
1525 [ # # # # : 0 : dequeued = rte_event_ring_dequeue_burst(port->in_ring,
# ]
1526 : : events, num, NULL);
1527 : :
1528 : : /* Stash incoming events belonging to migrating flows,
1529 : : * to avoid having to deal with forwarded events to
1530 : : * flows which are also in the process of being
1531 : : * migrated. A failure to do so leads to reordering,
1532 : : * since paused events on the source port may be
1533 : : * flushed after paused events on the migration
1534 : : * destination port.
1535 : : */
1536 [ # # ]: 0 : if (unlikely(state == DSW_MIGRATION_STATE_PAUSING))
1537 : 0 : dsw_port_stash_any_migrating_events(port, events,
1538 : : &dequeued);
1539 : : }
1540 : :
1541 : 0 : return dequeued;
1542 : : }
1543 : :
1544 : : uint16_t
1545 : 0 : dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1546 : : uint64_t wait __rte_unused)
1547 : : {
1548 : : struct dsw_port *source_port = port;
1549 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1550 : : uint16_t dequeued;
1551 : :
1552 [ # # ]: 0 : if (source_port->implicit_release) {
1553 : 0 : dsw_port_return_credits(dsw, port,
1554 [ # # ]: 0 : source_port->pending_releases);
1555 : :
1556 : 0 : source_port->pending_releases = 0;
1557 : : }
1558 : :
1559 : 0 : dsw_port_bg_process(dsw, source_port);
1560 : :
1561 [ # # ]: 0 : if (unlikely(num > source_port->dequeue_depth))
1562 : : num = source_port->dequeue_depth;
1563 : :
1564 : 0 : dequeued = dsw_port_dequeue_burst(source_port, events, num);
1565 : :
1566 : 0 : source_port->pending_releases += dequeued;
1567 : :
1568 [ # # ]: 0 : dsw_port_load_record(source_port, dequeued);
1569 : :
1570 : 0 : dsw_port_note_op(source_port, dequeued);
1571 : :
1572 [ # # ]: 0 : if (dequeued > 0) {
1573 : : DSW_LOG_DP_PORT_LINE(DEBUG, source_port->id, "Dequeued %d events.",
1574 : : dequeued);
1575 : :
1576 : : /* One potential optimization one might think of is to
1577 : : * add a migration state (prior to 'pausing'), and
1578 : : * only record seen events when the port is in this
1579 : : * state (and transit to 'pausing' when enough events
1580 : : * have been gathered). However, that schema doesn't
1581 : : * seem to improve performance.
1582 : : */
1583 : 0 : dsw_port_record_seen_events(port, events, dequeued);
1584 : : } else /* Zero-size dequeue means a likely idle port, and thus
1585 : : * we can afford trading some efficiency for a slightly
1586 : : * reduced event wall-time latency.
1587 : : */
1588 : : dsw_port_flush_out_buffers(dsw, port);
1589 : :
1590 : : #ifdef DSW_SORT_DEQUEUED
1591 : : dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1592 : : #endif
1593 : :
1594 : 0 : return dequeued;
1595 : : }
1596 : :
1597 : 0 : void dsw_event_maintain(void *port, int op)
1598 : : {
1599 : : struct dsw_port *source_port = port;
1600 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1601 : :
1602 : : dsw_port_note_op(source_port, 0);
1603 : 0 : dsw_port_bg_process(dsw, source_port);
1604 : :
1605 [ # # ]: 0 : if (op & RTE_EVENT_DEV_MAINT_OP_FLUSH)
1606 : : dsw_port_flush_out_buffers(dsw, source_port);
1607 : 0 : }
|