Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : * Copyright(c) 2018 Ericsson AB
3 : : */
4 : :
5 : : #include "dsw_evdev.h"
6 : :
7 : : #ifdef DSW_SORT_DEQUEUED
8 : : #include "dsw_sort.h"
9 : : #endif
10 : :
11 : : #include <stdbool.h>
12 : : #include <stdlib.h>
13 : : #include <string.h>
14 : :
15 : : #include <rte_cycles.h>
16 : : #include <rte_memcpy.h>
17 : : #include <rte_random.h>
18 : :
19 : : static bool
20 : 0 : dsw_port_acquire_credits(struct dsw_evdev *dsw, struct dsw_port *port,
21 : : int32_t credits)
22 : : {
23 : 0 : int32_t inflight_credits = port->inflight_credits;
24 : 0 : int32_t missing_credits = credits - inflight_credits;
25 : : int32_t total_on_loan;
26 : : int32_t available;
27 : : int32_t acquired_credits;
28 : : int32_t new_total_on_loan;
29 : :
30 [ # # ]: 0 : if (likely(missing_credits <= 0)) {
31 : 0 : port->inflight_credits -= credits;
32 : 0 : return true;
33 : : }
34 : :
35 : 0 : total_on_loan =
36 : 0 : __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED);
37 : 0 : available = dsw->max_inflight - total_on_loan;
38 : 0 : acquired_credits = RTE_MAX(missing_credits, DSW_PORT_MIN_CREDITS);
39 : :
40 [ # # ]: 0 : if (available < acquired_credits)
41 : : return false;
42 : :
43 : : /* This is a race, no locks are involved, and thus some other
44 : : * thread can allocate tokens in between the check and the
45 : : * allocation.
46 : : */
47 : 0 : new_total_on_loan =
48 : 0 : __atomic_fetch_add(&dsw->credits_on_loan, acquired_credits,
49 : : __ATOMIC_RELAXED) + acquired_credits;
50 : :
51 [ # # ]: 0 : if (unlikely(new_total_on_loan > dsw->max_inflight)) {
52 : : /* Some other port took the last credits */
53 : 0 : __atomic_fetch_sub(&dsw->credits_on_loan, acquired_credits,
54 : : __ATOMIC_RELAXED);
55 : 0 : return false;
56 : : }
57 : :
58 : : DSW_LOG_DP_PORT(DEBUG, port->id, "Acquired %d tokens from pool.\n",
59 : : acquired_credits);
60 : :
61 : 0 : port->inflight_credits += acquired_credits;
62 : 0 : port->inflight_credits -= credits;
63 : :
64 : 0 : return true;
65 : : }
66 : :
67 : : static void
68 : : dsw_port_return_credits(struct dsw_evdev *dsw, struct dsw_port *port,
69 : : int32_t credits)
70 : : {
71 : 0 : port->inflight_credits += credits;
72 : :
73 [ # # ]: 0 : if (unlikely(port->inflight_credits > DSW_PORT_MAX_CREDITS)) {
74 : : int32_t leave_credits = DSW_PORT_MIN_CREDITS;
75 : 0 : int32_t return_credits =
76 : : port->inflight_credits - leave_credits;
77 : :
78 : 0 : port->inflight_credits = leave_credits;
79 : :
80 : 0 : __atomic_fetch_sub(&dsw->credits_on_loan, return_credits,
81 : : __ATOMIC_RELAXED);
82 : :
83 : : DSW_LOG_DP_PORT(DEBUG, port->id,
84 : : "Returned %d tokens to pool.\n",
85 : : return_credits);
86 : : }
87 : : }
88 : :
89 : : static void
90 : : dsw_port_enqueue_stats(struct dsw_port *port, uint16_t num_new,
91 : : uint16_t num_forward, uint16_t num_release)
92 : : {
93 : 0 : port->new_enqueued += num_new;
94 : 0 : port->forward_enqueued += num_forward;
95 : 0 : port->release_enqueued += num_release;
96 : 0 : }
97 : :
98 : : static void
99 : : dsw_port_queue_enqueue_stats(struct dsw_port *source_port, uint8_t queue_id)
100 : : {
101 : 0 : source_port->queue_enqueued[queue_id]++;
102 : : }
103 : :
104 : : static void
105 : : dsw_port_dequeue_stats(struct dsw_port *port, uint16_t num)
106 : : {
107 : 0 : port->dequeued += num;
108 : : }
109 : :
110 : : static void
111 : : dsw_port_queue_dequeued_stats(struct dsw_port *source_port, uint8_t queue_id)
112 : : {
113 : 0 : source_port->queue_dequeued[queue_id]++;
114 : : }
115 : :
116 : : static void
117 : : dsw_port_load_record(struct dsw_port *port, unsigned int dequeued)
118 : : {
119 [ # # ]: 0 : if (dequeued > 0 && port->busy_start == 0)
120 : : /* work period begins */
121 : 0 : port->busy_start = rte_get_timer_cycles();
122 [ # # # # ]: 0 : else if (dequeued == 0 && port->busy_start > 0) {
123 : : /* work period ends */
124 : 0 : uint64_t work_period =
125 : 0 : rte_get_timer_cycles() - port->busy_start;
126 : 0 : port->busy_cycles += work_period;
127 : 0 : port->busy_start = 0;
128 : : }
129 : : }
130 : :
131 : : static int16_t
132 : : dsw_port_load_close_period(struct dsw_port *port, uint64_t now)
133 : : {
134 : 0 : uint64_t passed = now - port->measurement_start;
135 : 0 : uint64_t busy_cycles = port->busy_cycles;
136 : :
137 : 0 : if (port->busy_start > 0) {
138 : 0 : busy_cycles += (now - port->busy_start);
139 : 0 : port->busy_start = now;
140 : : }
141 : :
142 : 0 : int16_t load = (DSW_MAX_LOAD * busy_cycles) / passed;
143 : :
144 : 0 : port->measurement_start = now;
145 : 0 : port->busy_cycles = 0;
146 : :
147 : 0 : port->total_busy_cycles += busy_cycles;
148 : :
149 : : return load;
150 : : }
151 : :
152 : : static void
153 : 0 : dsw_port_load_update(struct dsw_port *port, uint64_t now)
154 : : {
155 : : int16_t old_load;
156 : : int16_t period_load;
157 : : int16_t new_load;
158 : :
159 [ # # ]: 0 : old_load = __atomic_load_n(&port->load, __ATOMIC_RELAXED);
160 : :
161 : : period_load = dsw_port_load_close_period(port, now);
162 : :
163 : 0 : new_load = (period_load + old_load*DSW_OLD_LOAD_WEIGHT) /
164 : : (DSW_OLD_LOAD_WEIGHT+1);
165 : :
166 : 0 : __atomic_store_n(&port->load, new_load, __ATOMIC_RELAXED);
167 : :
168 : : /* The load of the recently immigrated flows should hopefully
169 : : * be reflected the load estimate by now.
170 : : */
171 : 0 : __atomic_store_n(&port->immigration_load, 0, __ATOMIC_RELAXED);
172 : 0 : }
173 : :
174 : : static void
175 : : dsw_port_consider_load_update(struct dsw_port *port, uint64_t now)
176 : : {
177 [ # # ]: 0 : if (now < port->next_load_update)
178 : : return;
179 : :
180 : 0 : port->next_load_update = now + port->load_update_interval;
181 : :
182 : 0 : dsw_port_load_update(port, now);
183 : : }
184 : :
185 : : static void
186 : 0 : dsw_port_ctl_enqueue(struct dsw_port *port, struct dsw_ctl_msg *msg)
187 : : {
188 : : /* there's always room on the ring */
189 [ # # # # : 0 : while (rte_ring_enqueue_elem(port->ctl_in_ring, msg, sizeof(*msg)) != 0)
# ]
190 : : rte_pause();
191 : 0 : }
192 : :
193 : : static int
194 : 0 : dsw_port_ctl_dequeue(struct dsw_port *port, struct dsw_ctl_msg *msg)
195 : : {
196 [ # # # # : 0 : return rte_ring_dequeue_elem(port->ctl_in_ring, msg, sizeof(*msg));
# ]
197 : : }
198 : :
199 : : static void
200 : 0 : dsw_port_ctl_broadcast(struct dsw_evdev *dsw, struct dsw_port *source_port,
201 : : uint8_t type, struct dsw_queue_flow *qfs,
202 : : uint8_t qfs_len)
203 : : {
204 : : uint16_t port_id;
205 : 0 : struct dsw_ctl_msg msg = {
206 : : .type = type,
207 : 0 : .originating_port_id = source_port->id,
208 : : .qfs_len = qfs_len
209 : : };
210 : :
211 : 0 : memcpy(msg.qfs, qfs, sizeof(struct dsw_queue_flow) * qfs_len);
212 : :
213 [ # # ]: 0 : for (port_id = 0; port_id < dsw->num_ports; port_id++)
214 [ # # ]: 0 : if (port_id != source_port->id)
215 : 0 : dsw_port_ctl_enqueue(&dsw->ports[port_id], &msg);
216 : 0 : }
217 : :
218 : : static __rte_always_inline bool
219 : : dsw_is_queue_flow_in_ary(const struct dsw_queue_flow *qfs, uint16_t qfs_len,
220 : : uint8_t queue_id, uint16_t flow_hash)
221 : : {
222 : : uint16_t i;
223 : :
224 [ # # # # : 0 : for (i = 0; i < qfs_len; i++)
# # # # #
# ]
225 [ # # # # : 0 : if (qfs[i].queue_id == queue_id &&
# # # # #
# ]
226 [ # # # # : 0 : qfs[i].flow_hash == flow_hash)
# # # # #
# ]
227 : : return true;
228 : :
229 : : return false;
230 : : }
231 : :
232 : : static __rte_always_inline bool
233 : : dsw_port_is_flow_paused(struct dsw_port *port, uint8_t queue_id,
234 : : uint16_t flow_hash)
235 : : {
236 : 0 : return dsw_is_queue_flow_in_ary(port->paused_flows,
237 : 0 : port->paused_flows_len,
238 : : queue_id, flow_hash);
239 : : }
240 : :
241 : : static __rte_always_inline bool
242 : : dsw_port_is_flow_migrating(struct dsw_port *port, uint8_t queue_id,
243 : : uint16_t flow_hash)
244 : : {
245 : 0 : return dsw_is_queue_flow_in_ary(port->emigration_target_qfs,
246 : 0 : port->emigration_targets_len,
247 : : queue_id, flow_hash);
248 : : }
249 : :
250 : : static void
251 : : dsw_port_add_paused_flows(struct dsw_port *port, struct dsw_queue_flow *qfs,
252 : : uint8_t qfs_len)
253 : : {
254 : : uint8_t i;
255 : :
256 [ # # # # ]: 0 : for (i = 0; i < qfs_len; i++) {
257 : 0 : struct dsw_queue_flow *qf = &qfs[i];
258 : :
259 : : DSW_LOG_DP_PORT(DEBUG, port->id,
260 : : "Pausing queue_id %d flow_hash %d.\n",
261 : : qf->queue_id, qf->flow_hash);
262 : :
263 : 0 : port->paused_flows[port->paused_flows_len] = *qf;
264 : 0 : port->paused_flows_len++;
265 : : };
266 : : }
267 : :
268 : : static void
269 : 0 : dsw_port_remove_paused_flow(struct dsw_port *port,
270 : : struct dsw_queue_flow *target_qf)
271 : : {
272 : : uint16_t i;
273 : :
274 [ # # ]: 0 : for (i = 0; i < port->paused_flows_len; i++) {
275 : 0 : struct dsw_queue_flow *qf = &port->paused_flows[i];
276 : :
277 [ # # ]: 0 : if (qf->queue_id == target_qf->queue_id &&
278 [ # # ]: 0 : qf->flow_hash == target_qf->flow_hash) {
279 : 0 : uint16_t last_idx = port->paused_flows_len-1;
280 [ # # ]: 0 : if (i != last_idx)
281 : 0 : port->paused_flows[i] =
282 : 0 : port->paused_flows[last_idx];
283 : 0 : port->paused_flows_len--;
284 : :
285 : : DSW_LOG_DP_PORT(DEBUG, port->id,
286 : : "Unpausing queue_id %d flow_hash %d.\n",
287 : : target_qf->queue_id,
288 : : target_qf->flow_hash);
289 : :
290 : 0 : return;
291 : : }
292 : : }
293 : :
294 : 0 : DSW_LOG_DP_PORT(ERR, port->id,
295 : : "Failed to unpause queue_id %d flow_hash %d.\n",
296 : : target_qf->queue_id, target_qf->flow_hash);
297 : : }
298 : :
299 : : static void
300 : : dsw_port_remove_paused_flows(struct dsw_port *port,
301 : : struct dsw_queue_flow *qfs, uint8_t qfs_len)
302 : : {
303 : : uint8_t i;
304 : :
305 [ # # # # ]: 0 : for (i = 0; i < qfs_len; i++)
306 : 0 : dsw_port_remove_paused_flow(port, &qfs[i]);
307 : : }
308 : :
309 : : static void
310 : : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port);
311 : :
312 : : static void
313 : 0 : dsw_port_handle_pause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
314 : : uint8_t originating_port_id,
315 : : struct dsw_queue_flow *paused_qfs,
316 : : uint8_t qfs_len)
317 : : {
318 : 0 : struct dsw_ctl_msg cfm = {
319 : : .type = DSW_CTL_CFM,
320 : 0 : .originating_port_id = port->id
321 : : };
322 : :
323 : : /* There might be already-scheduled events belonging to the
324 : : * paused flow in the output buffers.
325 : : */
326 : : dsw_port_flush_out_buffers(dsw, port);
327 : :
328 : : dsw_port_add_paused_flows(port, paused_qfs, qfs_len);
329 : :
330 : : /* Make sure any stores to the original port's in_ring is seen
331 : : * before the ctl message.
332 : : */
333 : 0 : rte_smp_wmb();
334 : :
335 : 0 : dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
336 : 0 : }
337 : :
338 : : struct dsw_queue_flow_burst {
339 : : struct dsw_queue_flow queue_flow;
340 : : uint16_t count;
341 : : };
342 : :
343 : : #define DSW_QF_TO_INT(_qf) \
344 : : ((int)((((_qf)->queue_id)<<16)|((_qf)->flow_hash)))
345 : :
346 : : static inline int
347 : 0 : dsw_cmp_qf(const void *v_qf_a, const void *v_qf_b)
348 : : {
349 : : const struct dsw_queue_flow *qf_a = v_qf_a;
350 : : const struct dsw_queue_flow *qf_b = v_qf_b;
351 : :
352 : 0 : return DSW_QF_TO_INT(qf_a) - DSW_QF_TO_INT(qf_b);
353 : : }
354 : :
355 : : static uint16_t
356 : 0 : dsw_sort_qfs_to_bursts(struct dsw_queue_flow *qfs, uint16_t qfs_len,
357 : : struct dsw_queue_flow_burst *bursts)
358 : : {
359 : : uint16_t i;
360 : : struct dsw_queue_flow_burst *current_burst = NULL;
361 : : uint16_t num_bursts = 0;
362 : :
363 : : /* We don't need the stable property, and the list is likely
364 : : * large enough for qsort() to outperform dsw_stable_sort(),
365 : : * so we use qsort() here.
366 : : */
367 : 0 : qsort(qfs, qfs_len, sizeof(qfs[0]), dsw_cmp_qf);
368 : :
369 : : /* arrange the (now-consecutive) events into bursts */
370 [ # # ]: 0 : for (i = 0; i < qfs_len; i++) {
371 [ # # ]: 0 : if (i == 0 ||
372 [ # # ]: 0 : dsw_cmp_qf(&qfs[i], ¤t_burst->queue_flow) != 0) {
373 : 0 : current_burst = &bursts[num_bursts];
374 : 0 : current_burst->queue_flow = qfs[i];
375 : 0 : current_burst->count = 0;
376 : 0 : num_bursts++;
377 : : }
378 : 0 : current_burst->count++;
379 : : }
380 : :
381 : 0 : return num_bursts;
382 : : }
383 : :
384 : : static bool
385 : 0 : dsw_retrieve_port_loads(struct dsw_evdev *dsw, int16_t *port_loads,
386 : : int16_t load_limit)
387 : : {
388 : : bool below_limit = false;
389 : : uint16_t i;
390 : :
391 [ # # ]: 0 : for (i = 0; i < dsw->num_ports; i++) {
392 : 0 : int16_t measured_load =
393 : 0 : __atomic_load_n(&dsw->ports[i].load, __ATOMIC_RELAXED);
394 : 0 : int32_t immigration_load =
395 : 0 : __atomic_load_n(&dsw->ports[i].immigration_load,
396 : : __ATOMIC_RELAXED);
397 : 0 : int32_t load = measured_load + immigration_load;
398 : :
399 : 0 : load = RTE_MIN(load, DSW_MAX_LOAD);
400 : :
401 [ # # ]: 0 : if (load < load_limit)
402 : : below_limit = true;
403 : 0 : port_loads[i] = load;
404 : : }
405 : 0 : return below_limit;
406 : : }
407 : :
408 : : static int16_t
409 : : dsw_flow_load(uint16_t num_events, int16_t port_load)
410 : : {
411 : 0 : return ((int32_t)port_load * (int32_t)num_events) /
412 : : DSW_MAX_EVENTS_RECORDED;
413 : : }
414 : :
415 : : static int16_t
416 : : dsw_evaluate_migration(int16_t source_load, int16_t target_load,
417 : : int16_t flow_load)
418 : : {
419 : : int32_t res_target_load;
420 : : int32_t imbalance;
421 : :
422 : 0 : if (target_load > DSW_MAX_TARGET_LOAD_FOR_MIGRATION)
423 : : return -1;
424 : :
425 : 0 : imbalance = source_load - target_load;
426 : :
427 [ # # ]: 0 : if (imbalance < DSW_REBALANCE_THRESHOLD)
428 : : return -1;
429 : :
430 : 0 : res_target_load = target_load + flow_load;
431 : :
432 : : /* If the estimated load of the target port will be higher
433 : : * than the source port's load, it doesn't make sense to move
434 : : * the flow.
435 : : */
436 [ # # ]: 0 : if (res_target_load > source_load)
437 : : return -1;
438 : :
439 : : /* The more idle the target will be, the better. This will
440 : : * make migration prefer moving smaller flows, and flows to
441 : : * lightly loaded ports.
442 : : */
443 : 0 : return DSW_MAX_LOAD - res_target_load;
444 : : }
445 : :
446 : : static bool
447 : : dsw_is_serving_port(struct dsw_evdev *dsw, uint8_t port_id, uint8_t queue_id)
448 : : {
449 : : struct dsw_queue *queue = &dsw->queues[queue_id];
450 : : uint16_t i;
451 : :
452 [ # # ]: 0 : for (i = 0; i < queue->num_serving_ports; i++)
453 [ # # ]: 0 : if (queue->serving_ports[i] == port_id)
454 : : return true;
455 : :
456 : : return false;
457 : : }
458 : :
459 : : static bool
460 : 0 : dsw_select_emigration_target(struct dsw_evdev *dsw,
461 : : struct dsw_port *source_port,
462 : : struct dsw_queue_flow_burst *bursts,
463 : : uint16_t num_bursts,
464 : : int16_t *port_loads, uint16_t num_ports,
465 : : uint8_t *target_port_ids,
466 : : struct dsw_queue_flow *target_qfs,
467 : : uint8_t *targets_len)
468 : : {
469 : 0 : int16_t source_port_load = port_loads[source_port->id];
470 : : struct dsw_queue_flow *candidate_qf = NULL;
471 : : uint8_t candidate_port_id = 0;
472 : : int16_t candidate_weight = -1;
473 : : int16_t candidate_flow_load = -1;
474 : : uint16_t i;
475 : :
476 [ # # ]: 0 : if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION)
477 : : return false;
478 : :
479 [ # # ]: 0 : for (i = 0; i < num_bursts; i++) {
480 : 0 : struct dsw_queue_flow_burst *burst = &bursts[i];
481 : 0 : struct dsw_queue_flow *qf = &burst->queue_flow;
482 : : int16_t flow_load;
483 : : uint16_t port_id;
484 : :
485 [ # # ]: 0 : if (dsw_is_queue_flow_in_ary(target_qfs, *targets_len,
486 : 0 : qf->queue_id, qf->flow_hash))
487 : 0 : continue;
488 : :
489 : 0 : flow_load = dsw_flow_load(burst->count, source_port_load);
490 : :
491 [ # # ]: 0 : for (port_id = 0; port_id < num_ports; port_id++) {
492 : : int16_t weight;
493 : :
494 [ # # ]: 0 : if (port_id == source_port->id)
495 : 0 : continue;
496 : :
497 [ # # ]: 0 : if (!dsw_is_serving_port(dsw, port_id, qf->queue_id))
498 : 0 : continue;
499 : :
500 : 0 : weight = dsw_evaluate_migration(source_port_load,
501 [ # # ]: 0 : port_loads[port_id],
502 : : flow_load);
503 : :
504 [ # # ]: 0 : if (weight > candidate_weight) {
505 : : candidate_qf = qf;
506 : : candidate_port_id = port_id;
507 : : candidate_weight = weight;
508 : : candidate_flow_load = flow_load;
509 : : }
510 : : }
511 : : }
512 : :
513 [ # # ]: 0 : if (candidate_weight < 0)
514 : : return false;
515 : :
516 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Selected queue_id %d "
517 : : "flow_hash %d (with flow load %d) for migration "
518 : : "to port %d.\n", candidate_qf->queue_id,
519 : : candidate_qf->flow_hash,
520 : : DSW_LOAD_TO_PERCENT(candidate_flow_load),
521 : : candidate_port_id);
522 : :
523 : 0 : port_loads[candidate_port_id] += candidate_flow_load;
524 : 0 : port_loads[source_port->id] -= candidate_flow_load;
525 : :
526 : 0 : target_port_ids[*targets_len] = candidate_port_id;
527 : 0 : target_qfs[*targets_len] = *candidate_qf;
528 : 0 : (*targets_len)++;
529 : :
530 : 0 : __atomic_fetch_add(&dsw->ports[candidate_port_id].immigration_load,
531 : : candidate_flow_load, __ATOMIC_RELAXED);
532 : :
533 : 0 : return true;
534 : : }
535 : :
536 : : static void
537 : 0 : dsw_select_emigration_targets(struct dsw_evdev *dsw,
538 : : struct dsw_port *source_port,
539 : : struct dsw_queue_flow_burst *bursts,
540 : : uint16_t num_bursts, int16_t *port_loads)
541 : : {
542 : 0 : struct dsw_queue_flow *target_qfs = source_port->emigration_target_qfs;
543 : 0 : uint8_t *target_port_ids = source_port->emigration_target_port_ids;
544 : 0 : uint8_t *targets_len = &source_port->emigration_targets_len;
545 : : uint16_t i;
546 : :
547 [ # # ]: 0 : for (i = 0; i < DSW_MAX_FLOWS_PER_MIGRATION; i++) {
548 : : bool found;
549 : :
550 : 0 : found = dsw_select_emigration_target(dsw, source_port,
551 : : bursts, num_bursts,
552 : 0 : port_loads, dsw->num_ports,
553 : : target_port_ids,
554 : : target_qfs,
555 : : targets_len);
556 [ # # ]: 0 : if (!found)
557 : : break;
558 : : }
559 : :
560 : : if (*targets_len == 0)
561 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
562 : : "For the %d flows considered, no target port "
563 : : "was found.\n", num_bursts);
564 : 0 : }
565 : :
566 : : static uint8_t
567 : : dsw_schedule(struct dsw_evdev *dsw, uint8_t queue_id, uint16_t flow_hash)
568 : : {
569 : : struct dsw_queue *queue = &dsw->queues[queue_id];
570 : : uint8_t port_id;
571 : :
572 [ # # # # ]: 0 : if (queue->num_serving_ports > 1)
573 : 0 : port_id = queue->flow_to_port_map[flow_hash];
574 : : else
575 : : /* A single-link queue, or atomic/ordered/parallel but
576 : : * with just a single serving port.
577 : : */
578 : 0 : port_id = queue->serving_ports[0];
579 : :
580 : : DSW_LOG_DP(DEBUG, "Event with queue_id %d flow_hash %d is scheduled "
581 : : "to port %d.\n", queue_id, flow_hash, port_id);
582 : :
583 : : return port_id;
584 : : }
585 : :
586 : : static void
587 : 0 : dsw_port_transmit_buffered(struct dsw_evdev *dsw, struct dsw_port *source_port,
588 : : uint8_t dest_port_id)
589 : : {
590 : 0 : struct dsw_port *dest_port = &(dsw->ports[dest_port_id]);
591 : : uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
592 : 0 : struct rte_event *buffer = source_port->out_buffer[dest_port_id];
593 : :
594 [ # # ]: 0 : if (*buffer_len == 0)
595 : : return;
596 : :
597 : : /* The rings are dimensioned to fit all in-flight events (even
598 : : * on a single ring), so looping will work.
599 : : */
600 [ # # # # : 0 : rte_event_ring_enqueue_bulk(dest_port->in_ring, buffer, *buffer_len,
# ]
601 : : NULL);
602 : :
603 : 0 : (*buffer_len) = 0;
604 : : }
605 : :
606 : : static uint16_t
607 : : dsw_port_get_parallel_flow_id(struct dsw_port *port)
608 : : {
609 : 0 : uint16_t flow_id = port->next_parallel_flow_id;
610 : :
611 : 0 : port->next_parallel_flow_id =
612 : 0 : (port->next_parallel_flow_id + 1) % DSW_PARALLEL_FLOWS;
613 : :
614 : : return flow_id;
615 : : }
616 : :
617 : : static void
618 : : dsw_port_buffer_paused(struct dsw_port *port,
619 : : const struct rte_event *paused_event)
620 : : {
621 : 0 : port->paused_events[port->paused_events_len] = *paused_event;
622 : 0 : port->paused_events_len++;
623 : 0 : }
624 : :
625 : :
626 : : static void
627 : 0 : dsw_port_buffer_non_paused(struct dsw_evdev *dsw, struct dsw_port *source_port,
628 : : uint8_t dest_port_id, const struct rte_event *event)
629 : : {
630 : 0 : struct rte_event *buffer = source_port->out_buffer[dest_port_id];
631 : : uint16_t *buffer_len = &source_port->out_buffer_len[dest_port_id];
632 : :
633 [ # # ]: 0 : if (*buffer_len == DSW_MAX_PORT_OUT_BUFFER)
634 : 0 : dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
635 : :
636 : 0 : buffer[*buffer_len] = *event;
637 : :
638 : 0 : (*buffer_len)++;
639 : 0 : }
640 : :
641 : : #define DSW_FLOW_ID_BITS (24)
642 : : static uint16_t
643 : : dsw_flow_id_hash(uint32_t flow_id)
644 : : {
645 : : uint16_t hash = 0;
646 : : uint16_t offset = 0;
647 : :
648 : : do {
649 : 0 : hash ^= ((flow_id >> offset) & DSW_MAX_FLOWS_MASK);
650 : 0 : offset += DSW_MAX_FLOWS_BITS;
651 [ # # # # : 0 : } while (offset < DSW_FLOW_ID_BITS);
# # # # #
# # # #
# ]
652 : :
653 : : return hash;
654 : : }
655 : :
656 : : static void
657 : 0 : dsw_port_buffer_parallel(struct dsw_evdev *dsw, struct dsw_port *source_port,
658 : : struct rte_event event)
659 : : {
660 : : uint8_t dest_port_id;
661 : :
662 : 0 : event.flow_id = dsw_port_get_parallel_flow_id(source_port);
663 : :
664 [ # # ]: 0 : dest_port_id = dsw_schedule(dsw, event.queue_id,
665 : 0 : dsw_flow_id_hash(event.flow_id));
666 : :
667 : 0 : dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, &event);
668 : 0 : }
669 : :
670 : : static void
671 : 0 : dsw_port_buffer_event(struct dsw_evdev *dsw, struct dsw_port *source_port,
672 : : const struct rte_event *event)
673 : : {
674 : : uint16_t flow_hash;
675 : : uint8_t dest_port_id;
676 : :
677 [ # # ]: 0 : if (unlikely(dsw->queues[event->queue_id].schedule_type ==
678 : : RTE_SCHED_TYPE_PARALLEL)) {
679 : 0 : dsw_port_buffer_parallel(dsw, source_port, *event);
680 : 0 : return;
681 : : }
682 : :
683 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
684 : :
685 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_paused(source_port, event->queue_id,
686 : : flow_hash))) {
687 : : dsw_port_buffer_paused(source_port, event);
688 : 0 : return;
689 : : }
690 : :
691 : : dest_port_id = dsw_schedule(dsw, event->queue_id, flow_hash);
692 : :
693 : 0 : dsw_port_buffer_non_paused(dsw, source_port, dest_port_id, event);
694 : : }
695 : :
696 : : static void
697 : 0 : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
698 : : struct dsw_port *source_port)
699 : 0 : {
700 : 0 : uint16_t paused_events_len = source_port->paused_events_len;
701 : 0 : struct rte_event paused_events[paused_events_len];
702 : : uint16_t i;
703 : :
704 [ # # ]: 0 : if (paused_events_len == 0)
705 : 0 : return;
706 : :
707 [ # # ]: 0 : rte_memcpy(paused_events, source_port->paused_events,
708 : : paused_events_len * sizeof(struct rte_event));
709 : :
710 : 0 : source_port->paused_events_len = 0;
711 : :
712 [ # # ]: 0 : for (i = 0; i < paused_events_len; i++) {
713 : 0 : struct rte_event *event = &paused_events[i];
714 : : uint16_t flow_hash;
715 : :
716 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
717 : :
718 [ # # ]: 0 : if (dsw_port_is_flow_paused(source_port, event->queue_id,
719 : : flow_hash))
720 : : dsw_port_buffer_paused(source_port, event);
721 : : else {
722 : : uint8_t dest_port_id;
723 : :
724 : : dest_port_id = dsw_schedule(dsw, event->queue_id,
725 : : flow_hash);
726 : :
727 : 0 : dsw_port_buffer_non_paused(dsw, source_port,
728 : : dest_port_id, event);
729 : : }
730 : : }
731 : : }
732 : :
733 : : static void
734 : : dsw_port_emigration_stats(struct dsw_port *port, uint8_t finished)
735 : : {
736 : : uint64_t flow_migration_latency;
737 : :
738 : 0 : flow_migration_latency =
739 : 0 : (rte_get_timer_cycles() - port->emigration_start);
740 : 0 : port->emigration_latency += (flow_migration_latency * finished);
741 : 0 : port->emigrations += finished;
742 : 0 : }
743 : :
744 : : static void
745 : 0 : dsw_port_end_emigration(struct dsw_evdev *dsw, struct dsw_port *port,
746 : : uint8_t schedule_type)
747 : : {
748 : : uint8_t i;
749 : : struct dsw_queue_flow left_qfs[DSW_MAX_FLOWS_PER_MIGRATION];
750 : : uint8_t left_port_ids[DSW_MAX_FLOWS_PER_MIGRATION];
751 : : uint8_t left_qfs_len = 0;
752 : : uint8_t finished;
753 : :
754 [ # # ]: 0 : for (i = 0; i < port->emigration_targets_len; i++) {
755 : 0 : struct dsw_queue_flow *qf = &port->emigration_target_qfs[i];
756 : 0 : uint8_t queue_id = qf->queue_id;
757 : 0 : uint8_t queue_schedule_type =
758 : 0 : dsw->queues[queue_id].schedule_type;
759 : : uint16_t flow_hash = qf->flow_hash;
760 : :
761 [ # # ]: 0 : if (queue_schedule_type != schedule_type) {
762 : 0 : left_port_ids[left_qfs_len] =
763 : 0 : port->emigration_target_port_ids[i];
764 : 0 : left_qfs[left_qfs_len] = *qf;
765 : 0 : left_qfs_len++;
766 : 0 : continue;
767 : : }
768 : :
769 : : DSW_LOG_DP_PORT(DEBUG, port->id, "Migration completed for "
770 : : "queue_id %d flow_hash %d.\n", queue_id,
771 : : flow_hash);
772 : : }
773 : :
774 : 0 : finished = port->emigration_targets_len - left_qfs_len;
775 : :
776 [ # # ]: 0 : if (finished > 0)
777 : : dsw_port_emigration_stats(port, finished);
778 : :
779 [ # # ]: 0 : for (i = 0; i < left_qfs_len; i++) {
780 : 0 : port->emigration_target_port_ids[i] = left_port_ids[i];
781 : 0 : port->emigration_target_qfs[i] = left_qfs[i];
782 : : }
783 : 0 : port->emigration_targets_len = left_qfs_len;
784 : :
785 [ # # ]: 0 : if (port->emigration_targets_len == 0) {
786 : 0 : port->migration_state = DSW_MIGRATION_STATE_IDLE;
787 : 0 : port->seen_events_len = 0;
788 : : }
789 : 0 : }
790 : :
791 : : static void
792 : 0 : dsw_port_move_parallel_flows(struct dsw_evdev *dsw,
793 : : struct dsw_port *source_port)
794 : : {
795 : : uint8_t i;
796 : :
797 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
798 : : struct dsw_queue_flow *qf =
799 : 0 : &source_port->emigration_target_qfs[i];
800 : 0 : uint8_t queue_id = qf->queue_id;
801 : :
802 [ # # ]: 0 : if (dsw->queues[queue_id].schedule_type ==
803 : : RTE_SCHED_TYPE_PARALLEL) {
804 : 0 : uint8_t dest_port_id =
805 : : source_port->emigration_target_port_ids[i];
806 : 0 : uint16_t flow_hash = qf->flow_hash;
807 : :
808 : : /* Single byte-sized stores are always atomic. */
809 : 0 : dsw->queues[queue_id].flow_to_port_map[flow_hash] =
810 : : dest_port_id;
811 : : }
812 : : }
813 : :
814 : 0 : rte_smp_wmb();
815 : :
816 : 0 : dsw_port_end_emigration(dsw, source_port, RTE_SCHED_TYPE_PARALLEL);
817 : 0 : }
818 : :
819 : : static void
820 : 0 : dsw_port_consider_emigration(struct dsw_evdev *dsw,
821 : : struct dsw_port *source_port,
822 : : uint64_t now)
823 : 0 : {
824 : : bool any_port_below_limit;
825 : 0 : struct dsw_queue_flow *seen_events = source_port->seen_events;
826 : 0 : uint16_t seen_events_len = source_port->seen_events_len;
827 : : struct dsw_queue_flow_burst bursts[DSW_MAX_EVENTS_RECORDED];
828 : : uint16_t num_bursts;
829 : : int16_t source_port_load;
830 : 0 : int16_t port_loads[dsw->num_ports];
831 : :
832 [ # # ]: 0 : if (now < source_port->next_emigration)
833 : 0 : return;
834 : :
835 [ # # ]: 0 : if (dsw->num_ports == 1)
836 : : return;
837 : :
838 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Considering emigration.\n");
839 : :
840 [ # # ]: 0 : if (seen_events_len < DSW_MAX_EVENTS_RECORDED) {
841 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Not enough events "
842 : : "are recorded to allow for a migration.\n");
843 : : return;
844 : : }
845 : :
846 : : /* A flow migration cannot be initiated if there are paused
847 : : * events, since some/all of those events may be have been
848 : : * produced as a result of processing the flow(s) selected for
849 : : * migration. Moving such a flow would potentially introduced
850 : : * reordering, since processing the migrated flow on the
851 : : * receiving flow may commence before the to-be-enqueued-to
852 : :
853 : : * flows are unpaused, leading to paused events on the second
854 : : * port as well, destined for the same paused flow(s). When
855 : : * those flows are unpaused, the resulting events are
856 : : * delivered the owning port in an undefined order.
857 : : */
858 [ # # ]: 0 : if (source_port->paused_events_len > 0) {
859 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are "
860 : : "events in the paus buffer.\n");
861 : : return;
862 : : }
863 : :
864 : : /* Randomize interval to avoid having all threads considering
865 : : * emigration at the same in point in time, which might lead
866 : : * to all choosing the same target port.
867 : : */
868 : 0 : source_port->next_emigration = now +
869 : 0 : source_port->migration_interval / 2 +
870 : 0 : rte_rand() % source_port->migration_interval;
871 : :
872 [ # # ]: 0 : if (source_port->migration_state != DSW_MIGRATION_STATE_IDLE) {
873 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
874 : : "Emigration already in progress.\n");
875 : : return;
876 : : }
877 : :
878 : : /* For simplicity, avoid migration in the unlikely case there
879 : : * is still events to consume in the in_buffer (from the last
880 : : * emigration).
881 : : */
882 [ # # ]: 0 : if (source_port->in_buffer_len > 0) {
883 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "There are still "
884 : : "events in the input buffer.\n");
885 : : return;
886 : : }
887 : :
888 : 0 : source_port_load =
889 : 0 : __atomic_load_n(&source_port->load, __ATOMIC_RELAXED);
890 [ # # ]: 0 : if (source_port_load < DSW_MIN_SOURCE_LOAD_FOR_MIGRATION) {
891 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
892 : : "Load %d is below threshold level %d.\n",
893 : : DSW_LOAD_TO_PERCENT(source_port_load),
894 : : DSW_LOAD_TO_PERCENT(DSW_MIN_SOURCE_LOAD_FOR_MIGRATION));
895 : : return;
896 : : }
897 : :
898 : : /* Avoid starting any expensive operations (sorting etc), in
899 : : * case of a scenario with all ports above the load limit.
900 : : */
901 : : any_port_below_limit =
902 : 0 : dsw_retrieve_port_loads(dsw, port_loads,
903 : : DSW_MAX_TARGET_LOAD_FOR_MIGRATION);
904 [ # # ]: 0 : if (!any_port_below_limit) {
905 : : DSW_LOG_DP_PORT(DEBUG, source_port->id,
906 : : "Candidate target ports are all too highly "
907 : : "loaded.\n");
908 : : return;
909 : : }
910 : :
911 : 0 : num_bursts = dsw_sort_qfs_to_bursts(seen_events, seen_events_len,
912 : : bursts);
913 : :
914 : : /* For non-big-little systems, there's no point in moving the
915 : : * only (known) flow.
916 : : */
917 [ # # ]: 0 : if (num_bursts < 2) {
918 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Only a single flow "
919 : : "queue_id %d flow_hash %d has been seen.\n",
920 : : bursts[0].queue_flow.queue_id,
921 : : bursts[0].queue_flow.flow_hash);
922 : : return;
923 : : }
924 : :
925 : 0 : dsw_select_emigration_targets(dsw, source_port, bursts, num_bursts,
926 : : port_loads);
927 : :
928 [ # # ]: 0 : if (source_port->emigration_targets_len == 0)
929 : : return;
930 : :
931 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_PAUSING;
932 : 0 : source_port->emigration_start = rte_get_timer_cycles();
933 : :
934 : : /* No need to go through the whole pause procedure for
935 : : * parallel queues, since atomic/ordered semantics need not to
936 : : * be maintained.
937 : : */
938 : 0 : dsw_port_move_parallel_flows(dsw, source_port);
939 : :
940 : : /* All flows were on PARALLEL queues. */
941 [ # # ]: 0 : if (source_port->migration_state == DSW_MIGRATION_STATE_IDLE)
942 : : return;
943 : :
944 : : /* There might be 'loopback' events already scheduled in the
945 : : * output buffers.
946 : : */
947 : : dsw_port_flush_out_buffers(dsw, source_port);
948 : :
949 : 0 : dsw_port_add_paused_flows(source_port,
950 : 0 : source_port->emigration_target_qfs,
951 : 0 : source_port->emigration_targets_len);
952 : :
953 : 0 : dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_PAUS_REQ,
954 : : source_port->emigration_target_qfs,
955 : : source_port->emigration_targets_len);
956 : 0 : source_port->cfm_cnt = 0;
957 : : }
958 : :
959 : : static void
960 : : dsw_port_flush_no_longer_paused_events(struct dsw_evdev *dsw,
961 : : struct dsw_port *source_port);
962 : :
963 : : static void
964 : 0 : dsw_port_handle_unpause_flows(struct dsw_evdev *dsw, struct dsw_port *port,
965 : : uint8_t originating_port_id,
966 : : struct dsw_queue_flow *paused_qfs,
967 : : uint8_t qfs_len)
968 : : {
969 : : uint16_t i;
970 : 0 : struct dsw_ctl_msg cfm = {
971 : : .type = DSW_CTL_CFM,
972 : 0 : .originating_port_id = port->id
973 : : };
974 : :
975 : : dsw_port_remove_paused_flows(port, paused_qfs, qfs_len);
976 : :
977 : 0 : rte_smp_rmb();
978 : :
979 : 0 : dsw_port_ctl_enqueue(&dsw->ports[originating_port_id], &cfm);
980 : :
981 [ # # ]: 0 : for (i = 0; i < qfs_len; i++) {
982 : 0 : struct dsw_queue_flow *qf = &paused_qfs[i];
983 : :
984 [ # # # # ]: 0 : if (dsw_schedule(dsw, qf->queue_id, qf->flow_hash) == port->id)
985 : 0 : port->immigrations++;
986 : : }
987 : :
988 : 0 : dsw_port_flush_no_longer_paused_events(dsw, port);
989 : 0 : }
990 : :
991 : : static void
992 : : dsw_port_buffer_in_buffer(struct dsw_port *port,
993 : : const struct rte_event *event)
994 : :
995 : : {
996 : : RTE_ASSERT(port->in_buffer_start == 0);
997 : :
998 : 0 : port->in_buffer[port->in_buffer_len] = *event;
999 : 0 : port->in_buffer_len++;
1000 : 0 : }
1001 : :
1002 : : static void
1003 : 0 : dsw_port_forward_emigrated_event(struct dsw_evdev *dsw,
1004 : : struct dsw_port *source_port,
1005 : : struct rte_event *event)
1006 : : {
1007 : : uint16_t i;
1008 : :
1009 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
1010 : : struct dsw_queue_flow *qf =
1011 : 0 : &source_port->emigration_target_qfs[i];
1012 : 0 : uint8_t dest_port_id =
1013 : : source_port->emigration_target_port_ids[i];
1014 : 0 : struct dsw_port *dest_port = &dsw->ports[dest_port_id];
1015 : :
1016 [ # # ]: 0 : if (event->queue_id == qf->queue_id &&
1017 [ # # ]: 0 : dsw_flow_id_hash(event->flow_id) == qf->flow_hash) {
1018 : : /* No need to care about bursting forwarded
1019 : : * events (to the destination port's in_ring),
1020 : : * since migration doesn't happen very often,
1021 : : * and also the majority of the dequeued
1022 : : * events will likely *not* be forwarded.
1023 : : */
1024 [ # # # # : 0 : while (rte_event_ring_enqueue_burst(dest_port->in_ring,
# ]
1025 : : event, 1,
1026 [ # # ]: 0 : NULL) != 1)
1027 : : rte_pause();
1028 : : return;
1029 : : }
1030 : : }
1031 : :
1032 : : /* Event did not belong to the emigrated flows */
1033 : : dsw_port_buffer_in_buffer(source_port, event);
1034 : : }
1035 : :
1036 : : static void
1037 : : dsw_port_stash_migrating_event(struct dsw_port *port,
1038 : : const struct rte_event *event)
1039 : : {
1040 : 0 : port->emigrating_events[port->emigrating_events_len] = *event;
1041 : 0 : port->emigrating_events_len++;
1042 : 0 : }
1043 : :
1044 : : #define DRAIN_DEQUEUE_BURST_SIZE (32)
1045 : :
1046 : : static void
1047 : 0 : dsw_port_drain_in_ring(struct dsw_port *source_port)
1048 : : {
1049 : : uint16_t num_events;
1050 : : uint16_t dequeued;
1051 : :
1052 : : /* Control ring message should been seen before the ring count
1053 : : * is read on the port's in_ring.
1054 : : */
1055 : 0 : rte_smp_rmb();
1056 : :
1057 : 0 : num_events = rte_event_ring_count(source_port->in_ring);
1058 : :
1059 [ # # ]: 0 : for (dequeued = 0; dequeued < num_events; ) {
1060 : 0 : uint16_t burst_size = RTE_MIN(DRAIN_DEQUEUE_BURST_SIZE,
1061 : : num_events - dequeued);
1062 : 0 : struct rte_event events[burst_size];
1063 : : uint16_t len;
1064 : : uint16_t i;
1065 : :
1066 [ # # # # : 0 : len = rte_event_ring_dequeue_burst(source_port->in_ring,
# ]
1067 : : events, burst_size,
1068 : : NULL);
1069 : :
1070 [ # # ]: 0 : for (i = 0; i < len; i++) {
1071 : 0 : struct rte_event *event = &events[i];
1072 : : uint16_t flow_hash;
1073 : :
1074 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
1075 : :
1076 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_migrating(source_port,
1077 : : event->queue_id,
1078 : : flow_hash)))
1079 : : dsw_port_stash_migrating_event(source_port,
1080 : : event);
1081 : : else
1082 : : dsw_port_buffer_in_buffer(source_port, event);
1083 : : }
1084 : :
1085 : 0 : dequeued += len;
1086 : : }
1087 : 0 : }
1088 : :
1089 : : static void
1090 : : dsw_port_forward_emigrated_flows(struct dsw_evdev *dsw,
1091 : : struct dsw_port *source_port)
1092 : : {
1093 : : uint16_t i;
1094 : :
1095 [ # # ]: 0 : for (i = 0; i < source_port->emigrating_events_len; i++) {
1096 : 0 : struct rte_event *event = &source_port->emigrating_events[i];
1097 : :
1098 : 0 : dsw_port_forward_emigrated_event(dsw, source_port, event);
1099 : : }
1100 : 0 : source_port->emigrating_events_len = 0;
1101 : : }
1102 : :
1103 : : static void
1104 : 0 : dsw_port_move_emigrating_flows(struct dsw_evdev *dsw,
1105 : : struct dsw_port *source_port)
1106 : : {
1107 : : uint8_t i;
1108 : :
1109 : : dsw_port_flush_out_buffers(dsw, source_port);
1110 : :
1111 [ # # ]: 0 : for (i = 0; i < source_port->emigration_targets_len; i++) {
1112 : : struct dsw_queue_flow *qf =
1113 : 0 : &source_port->emigration_target_qfs[i];
1114 : 0 : uint8_t dest_port_id =
1115 : : source_port->emigration_target_port_ids[i];
1116 : :
1117 : 0 : dsw->queues[qf->queue_id].flow_to_port_map[qf->flow_hash] =
1118 : : dest_port_id;
1119 : : }
1120 : :
1121 : 0 : rte_smp_wmb();
1122 : :
1123 : 0 : dsw_port_drain_in_ring(source_port);
1124 : : dsw_port_forward_emigrated_flows(dsw, source_port);
1125 : :
1126 : : dsw_port_remove_paused_flows(source_port,
1127 : 0 : source_port->emigration_target_qfs,
1128 : 0 : source_port->emigration_targets_len);
1129 : :
1130 : 0 : dsw_port_flush_no_longer_paused_events(dsw, source_port);
1131 : :
1132 : : /* Flow table update and migration destination port's enqueues
1133 : : * must be seen before the control message.
1134 : : */
1135 : 0 : rte_smp_wmb();
1136 : :
1137 : 0 : dsw_port_ctl_broadcast(dsw, source_port, DSW_CTL_UNPAUS_REQ,
1138 : : source_port->emigration_target_qfs,
1139 : 0 : source_port->emigration_targets_len);
1140 : 0 : source_port->cfm_cnt = 0;
1141 : 0 : source_port->migration_state = DSW_MIGRATION_STATE_UNPAUSING;
1142 : 0 : }
1143 : :
1144 : : static void
1145 : 0 : dsw_port_handle_confirm(struct dsw_evdev *dsw, struct dsw_port *port)
1146 : : {
1147 : 0 : port->cfm_cnt++;
1148 : :
1149 [ # # ]: 0 : if (port->cfm_cnt == (dsw->num_ports-1)) {
1150 [ # # # ]: 0 : switch (port->migration_state) {
1151 : 0 : case DSW_MIGRATION_STATE_PAUSING:
1152 : 0 : dsw_port_move_emigrating_flows(dsw, port);
1153 : 0 : break;
1154 : 0 : case DSW_MIGRATION_STATE_UNPAUSING:
1155 : 0 : dsw_port_end_emigration(dsw, port,
1156 : : RTE_SCHED_TYPE_ATOMIC);
1157 : 0 : break;
1158 : : default:
1159 : : RTE_ASSERT(0);
1160 : : break;
1161 : : }
1162 : : }
1163 : 0 : }
1164 : :
1165 : : static void
1166 : 0 : dsw_port_ctl_process(struct dsw_evdev *dsw, struct dsw_port *port)
1167 : : {
1168 : : struct dsw_ctl_msg msg;
1169 : :
1170 [ # # ]: 0 : if (dsw_port_ctl_dequeue(port, &msg) == 0) {
1171 [ # # # # ]: 0 : switch (msg.type) {
1172 : 0 : case DSW_CTL_PAUS_REQ:
1173 : 0 : dsw_port_handle_pause_flows(dsw, port,
1174 : 0 : msg.originating_port_id,
1175 : 0 : msg.qfs, msg.qfs_len);
1176 : 0 : break;
1177 : 0 : case DSW_CTL_UNPAUS_REQ:
1178 : 0 : dsw_port_handle_unpause_flows(dsw, port,
1179 : 0 : msg.originating_port_id,
1180 : 0 : msg.qfs, msg.qfs_len);
1181 : 0 : break;
1182 : 0 : case DSW_CTL_CFM:
1183 : 0 : dsw_port_handle_confirm(dsw, port);
1184 : 0 : break;
1185 : : }
1186 : : }
1187 : 0 : }
1188 : :
1189 : : static void
1190 : : dsw_port_note_op(struct dsw_port *port, uint16_t num_events)
1191 : : {
1192 : 0 : port->ops_since_bg_task += (num_events+1);
1193 : 0 : }
1194 : :
1195 : : static void
1196 : 0 : dsw_port_bg_process(struct dsw_evdev *dsw, struct dsw_port *port)
1197 : : {
1198 : : /* For simplicity (in the migration logic), avoid all
1199 : : * background processing in case event processing is in
1200 : : * progress.
1201 : : */
1202 [ # # ]: 0 : if (port->pending_releases > 0)
1203 : : return;
1204 : :
1205 : : /* Polling the control ring is relatively inexpensive, and
1206 : : * polling it often helps bringing down migration latency, so
1207 : : * do this for every iteration.
1208 : : */
1209 : 0 : dsw_port_ctl_process(dsw, port);
1210 : :
1211 : : /* To avoid considering migration and flushing output buffers
1212 : : * on every dequeue/enqueue call, the scheduler only performs
1213 : : * such 'background' tasks every nth
1214 : : * (i.e. DSW_MAX_PORT_OPS_PER_BG_TASK) operation.
1215 : : */
1216 [ # # ]: 0 : if (unlikely(port->ops_since_bg_task >= DSW_MAX_PORT_OPS_PER_BG_TASK)) {
1217 : : uint64_t now;
1218 : :
1219 : : now = rte_get_timer_cycles();
1220 : :
1221 : 0 : port->last_bg = now;
1222 : :
1223 : : /* Logic to avoid having events linger in the output
1224 : : * buffer too long.
1225 : : */
1226 : : dsw_port_flush_out_buffers(dsw, port);
1227 : :
1228 : : dsw_port_consider_load_update(port, now);
1229 : :
1230 : 0 : dsw_port_consider_emigration(dsw, port, now);
1231 : :
1232 : 0 : port->ops_since_bg_task = 0;
1233 : : }
1234 : : }
1235 : :
1236 : : static void
1237 : : dsw_port_flush_out_buffers(struct dsw_evdev *dsw, struct dsw_port *source_port)
1238 : : {
1239 : : uint16_t dest_port_id;
1240 : :
1241 [ # # # # : 0 : for (dest_port_id = 0; dest_port_id < dsw->num_ports; dest_port_id++)
# # # # #
# # # # #
# # # # ]
1242 : 0 : dsw_port_transmit_buffered(dsw, source_port, dest_port_id);
1243 : : }
1244 : :
1245 : : uint16_t
1246 : 0 : dsw_event_enqueue(void *port, const struct rte_event *ev)
1247 : : {
1248 : 0 : return dsw_event_enqueue_burst(port, ev, unlikely(ev == NULL) ? 0 : 1);
1249 : : }
1250 : :
1251 : : static __rte_always_inline uint16_t
1252 : : dsw_event_enqueue_burst_generic(struct dsw_port *source_port,
1253 : : const struct rte_event events[],
1254 : : uint16_t events_len, bool op_types_known,
1255 : : uint16_t num_new, uint16_t num_release,
1256 : : uint16_t num_non_release)
1257 : : {
1258 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1259 : : bool enough_credits;
1260 : : uint16_t i;
1261 : :
1262 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Attempting to enqueue %d "
1263 : : "events.\n", events_len);
1264 : :
1265 : 0 : dsw_port_bg_process(dsw, source_port);
1266 : :
1267 : : /* XXX: For performance (=ring efficiency) reasons, the
1268 : : * scheduler relies on internal non-ring buffers instead of
1269 : : * immediately sending the event to the destination ring. For
1270 : : * a producer that doesn't intend to produce or consume any
1271 : : * more events, the scheduler provides a way to flush the
1272 : : * buffer, by means of doing an enqueue of zero events. In
1273 : : * addition, a port cannot be left "unattended" (e.g. unused)
1274 : : * for long periods of time, since that would stall
1275 : : * migration. Eventdev API extensions to provide a cleaner way
1276 : : * to archive both of these functions should be
1277 : : * considered.
1278 : : */
1279 [ # # # # : 0 : if (unlikely(events_len == 0)) {
# # ]
1280 : : dsw_port_note_op(source_port, DSW_MAX_PORT_OPS_PER_BG_TASK);
1281 : : dsw_port_flush_out_buffers(dsw, source_port);
1282 : : return 0;
1283 : : }
1284 : :
1285 : : dsw_port_note_op(source_port, events_len);
1286 : :
1287 : : if (!op_types_known)
1288 [ # # ]: 0 : for (i = 0; i < events_len; i++) {
1289 [ # # # ]: 0 : switch (events[i].op) {
1290 : 0 : case RTE_EVENT_OP_RELEASE:
1291 : 0 : num_release++;
1292 : 0 : break;
1293 : 0 : case RTE_EVENT_OP_NEW:
1294 : 0 : num_new++;
1295 : : /* Falls through. */
1296 : 0 : default:
1297 : 0 : num_non_release++;
1298 : 0 : break;
1299 : : }
1300 : : }
1301 : :
1302 : : /* Technically, we could allow the non-new events up to the
1303 : : * first new event in the array into the system, but for
1304 : : * simplicity reasons, we deny the whole burst if the port is
1305 : : * above the water mark.
1306 : : */
1307 [ # # # # : 0 : if (unlikely(num_new > 0 &&
# # # # ]
1308 : : __atomic_load_n(&dsw->credits_on_loan, __ATOMIC_RELAXED) >
1309 : : source_port->new_event_threshold))
1310 : : return 0;
1311 : :
1312 : 0 : enough_credits = dsw_port_acquire_credits(dsw, source_port,
1313 : : num_non_release);
1314 [ # # # # : 0 : if (unlikely(!enough_credits))
# # ]
1315 : : return 0;
1316 : :
1317 : 0 : source_port->pending_releases -= num_release;
1318 : :
1319 : : dsw_port_enqueue_stats(source_port, num_new,
1320 : 0 : num_non_release-num_new, num_release);
1321 : :
1322 [ # # # # : 0 : for (i = 0; i < events_len; i++) {
# # ]
1323 : 0 : const struct rte_event *event = &events[i];
1324 : :
1325 [ # # # # ]: 0 : if (likely(num_release == 0 ||
1326 : : event->op != RTE_EVENT_OP_RELEASE))
1327 : 0 : dsw_port_buffer_event(dsw, source_port, event);
1328 : 0 : dsw_port_queue_enqueue_stats(source_port, event->queue_id);
1329 : : }
1330 : :
1331 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "%d non-release events "
1332 : : "accepted.\n", num_non_release);
1333 : :
1334 : 0 : return (num_non_release + num_release);
1335 : : }
1336 : :
1337 : : uint16_t
1338 : 0 : dsw_event_enqueue_burst(void *port, const struct rte_event events[],
1339 : : uint16_t events_len)
1340 : : {
1341 : : struct dsw_port *source_port = port;
1342 : :
1343 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1344 : : events_len = source_port->enqueue_depth;
1345 : :
1346 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1347 : : events_len, false, 0, 0, 0);
1348 : : }
1349 : :
1350 : : uint16_t
1351 : 0 : dsw_event_enqueue_new_burst(void *port, const struct rte_event events[],
1352 : : uint16_t events_len)
1353 : : {
1354 : : struct dsw_port *source_port = port;
1355 : :
1356 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1357 : : events_len = source_port->enqueue_depth;
1358 : :
1359 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1360 : : events_len, true, events_len,
1361 : : 0, events_len);
1362 : : }
1363 : :
1364 : : uint16_t
1365 : 0 : dsw_event_enqueue_forward_burst(void *port, const struct rte_event events[],
1366 : : uint16_t events_len)
1367 : : {
1368 : : struct dsw_port *source_port = port;
1369 : :
1370 [ # # ]: 0 : if (unlikely(events_len > source_port->enqueue_depth))
1371 : : events_len = source_port->enqueue_depth;
1372 : :
1373 : 0 : return dsw_event_enqueue_burst_generic(source_port, events,
1374 : : events_len, true, 0, 0,
1375 : : events_len);
1376 : : }
1377 : :
1378 : : uint16_t
1379 : 0 : dsw_event_dequeue(void *port, struct rte_event *events, uint64_t wait)
1380 : : {
1381 : 0 : return dsw_event_dequeue_burst(port, events, 1, wait);
1382 : : }
1383 : :
1384 : : static void
1385 : 0 : dsw_port_record_seen_events(struct dsw_port *port, struct rte_event *events,
1386 : : uint16_t num)
1387 : : {
1388 : : uint16_t i;
1389 : :
1390 : 0 : dsw_port_dequeue_stats(port, num);
1391 : :
1392 [ # # ]: 0 : for (i = 0; i < num; i++) {
1393 : 0 : uint16_t l_idx = port->seen_events_idx;
1394 : 0 : struct dsw_queue_flow *qf = &port->seen_events[l_idx];
1395 : 0 : struct rte_event *event = &events[i];
1396 : 0 : qf->queue_id = event->queue_id;
1397 : 0 : qf->flow_hash = dsw_flow_id_hash(event->flow_id);
1398 : :
1399 : 0 : port->seen_events_idx = (l_idx+1) % DSW_MAX_EVENTS_RECORDED;
1400 : :
1401 : 0 : dsw_port_queue_dequeued_stats(port, event->queue_id);
1402 : : }
1403 : :
1404 [ # # ]: 0 : if (unlikely(port->seen_events_len != DSW_MAX_EVENTS_RECORDED))
1405 : 0 : port->seen_events_len =
1406 : 0 : RTE_MIN(port->seen_events_len + num,
1407 : : DSW_MAX_EVENTS_RECORDED);
1408 : 0 : }
1409 : :
1410 : : #ifdef DSW_SORT_DEQUEUED
1411 : :
1412 : : #define DSW_EVENT_TO_INT(_event) \
1413 : : ((int)((((_event)->queue_id)<<16)|((_event)->flow_id)))
1414 : :
1415 : : static inline int
1416 : : dsw_cmp_event(const void *v_event_a, const void *v_event_b)
1417 : : {
1418 : : const struct rte_event *event_a = v_event_a;
1419 : : const struct rte_event *event_b = v_event_b;
1420 : :
1421 : : return DSW_EVENT_TO_INT(event_a) - DSW_EVENT_TO_INT(event_b);
1422 : : }
1423 : : #endif
1424 : :
1425 : : static uint16_t
1426 : 0 : dsw_port_dequeue_burst(struct dsw_port *port, struct rte_event *events,
1427 : : uint16_t num)
1428 : : {
1429 [ # # ]: 0 : if (unlikely(port->in_buffer_len > 0)) {
1430 : 0 : uint16_t dequeued = RTE_MIN(num, port->in_buffer_len);
1431 : :
1432 [ # # ]: 0 : rte_memcpy(events, &port->in_buffer[port->in_buffer_start],
1433 : : dequeued * sizeof(struct rte_event));
1434 : :
1435 : 0 : port->in_buffer_start += dequeued;
1436 : 0 : port->in_buffer_len -= dequeued;
1437 : :
1438 [ # # ]: 0 : if (port->in_buffer_len == 0)
1439 : 0 : port->in_buffer_start = 0;
1440 : :
1441 : 0 : return dequeued;
1442 : : }
1443 : :
1444 [ # # # # : 0 : return rte_event_ring_dequeue_burst(port->in_ring, events, num, NULL);
# ]
1445 : : }
1446 : :
1447 : : static void
1448 : 0 : dsw_port_stash_migrating_events(struct dsw_port *port,
1449 : : struct rte_event *events, uint16_t *num)
1450 : : {
1451 : : uint16_t i;
1452 : :
1453 : : /* The assumption here - performance-wise - is that events
1454 : : * belonging to migrating flows are relatively rare.
1455 : : */
1456 [ # # ]: 0 : for (i = 0; i < (*num); ) {
1457 : 0 : struct rte_event *event = &events[i];
1458 : : uint16_t flow_hash;
1459 : :
1460 : 0 : flow_hash = dsw_flow_id_hash(event->flow_id);
1461 : :
1462 [ # # ]: 0 : if (unlikely(dsw_port_is_flow_migrating(port, event->queue_id,
1463 : : flow_hash))) {
1464 : : uint16_t left;
1465 : :
1466 : : dsw_port_stash_migrating_event(port, event);
1467 : :
1468 : 0 : (*num)--;
1469 : 0 : left = *num - i;
1470 : :
1471 [ # # ]: 0 : if (left > 0)
1472 : 0 : memmove(event, event + 1,
1473 : : left * sizeof(struct rte_event));
1474 : : } else
1475 : 0 : i++;
1476 : : }
1477 : 0 : }
1478 : :
1479 : : uint16_t
1480 : 0 : dsw_event_dequeue_burst(void *port, struct rte_event *events, uint16_t num,
1481 : : uint64_t wait __rte_unused)
1482 : : {
1483 : : struct dsw_port *source_port = port;
1484 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1485 : : uint16_t dequeued;
1486 : :
1487 : 0 : source_port->pending_releases = 0;
1488 : :
1489 : 0 : dsw_port_bg_process(dsw, source_port);
1490 : :
1491 [ # # ]: 0 : if (unlikely(num > source_port->dequeue_depth))
1492 : : num = source_port->dequeue_depth;
1493 : :
1494 : 0 : dequeued = dsw_port_dequeue_burst(source_port, events, num);
1495 : :
1496 [ # # ]: 0 : if (unlikely(source_port->migration_state ==
1497 : : DSW_MIGRATION_STATE_PAUSING))
1498 : 0 : dsw_port_stash_migrating_events(source_port, events,
1499 : : &dequeued);
1500 : :
1501 : 0 : source_port->pending_releases = dequeued;
1502 : :
1503 [ # # ]: 0 : dsw_port_load_record(source_port, dequeued);
1504 : :
1505 : 0 : dsw_port_note_op(source_port, dequeued);
1506 : :
1507 [ # # ]: 0 : if (dequeued > 0) {
1508 : : DSW_LOG_DP_PORT(DEBUG, source_port->id, "Dequeued %d events.\n",
1509 : : dequeued);
1510 : :
1511 : : dsw_port_return_credits(dsw, source_port, dequeued);
1512 : :
1513 : : /* One potential optimization one might think of is to
1514 : : * add a migration state (prior to 'pausing'), and
1515 : : * only record seen events when the port is in this
1516 : : * state (and transit to 'pausing' when enough events
1517 : : * have been gathered). However, that schema doesn't
1518 : : * seem to improve performance.
1519 : : */
1520 : 0 : dsw_port_record_seen_events(port, events, dequeued);
1521 : : } else /* Zero-size dequeue means a likely idle port, and thus
1522 : : * we can afford trading some efficiency for a slightly
1523 : : * reduced event wall-time latency.
1524 : : */
1525 : : dsw_port_flush_out_buffers(dsw, port);
1526 : :
1527 : : #ifdef DSW_SORT_DEQUEUED
1528 : : dsw_stable_sort(events, dequeued, sizeof(events[0]), dsw_cmp_event);
1529 : : #endif
1530 : :
1531 : 0 : return dequeued;
1532 : : }
1533 : :
1534 : 0 : void dsw_event_maintain(void *port, int op)
1535 : : {
1536 : : struct dsw_port *source_port = port;
1537 : 0 : struct dsw_evdev *dsw = source_port->dsw;
1538 : :
1539 : : dsw_port_note_op(source_port, 0);
1540 : 0 : dsw_port_bg_process(dsw, source_port);
1541 : :
1542 [ # # ]: 0 : if (op & RTE_EVENT_DEV_MAINT_OP_FLUSH)
1543 : : dsw_port_flush_out_buffers(dsw, source_port);
1544 : 0 : }
|