Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : * Copyright(c) 2017 Intel Corporation
3 : : */
4 : :
5 : : #include <stdio.h>
6 : : #include <stdlib.h>
7 : : #include <sys/queue.h>
8 : : #include <string.h>
9 : : #include <rte_mbuf.h>
10 : : #include <rte_cycles.h>
11 : : #include <rte_memzone.h>
12 : : #include <rte_errno.h>
13 : : #include <rte_string_fns.h>
14 : : #include <rte_eal_memconfig.h>
15 : : #include <rte_pause.h>
16 : : #include <rte_tailq.h>
17 : :
18 : : #include "rte_distributor.h"
19 : : #include "rte_distributor_single.h"
20 : : #include "distributor_private.h"
21 : :
22 : : TAILQ_HEAD(rte_dist_burst_list, rte_distributor);
23 : :
24 : : static struct rte_tailq_elem rte_dist_burst_tailq = {
25 : : .name = "RTE_DIST_BURST",
26 : : };
27 [ - + ]: 235 : EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
28 : :
29 : : /**** APIs called by workers ****/
30 : :
31 : : /**** Burst Packet APIs called by workers ****/
32 : :
33 : : void
34 : 131283 : rte_distributor_request_pkt(struct rte_distributor *d,
35 : : unsigned int worker_id, struct rte_mbuf **oldpkt,
36 : : unsigned int count)
37 : : {
38 : : struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
39 : : unsigned int i;
40 : :
41 : : volatile RTE_ATOMIC(int64_t) *retptr64;
42 : :
43 [ - + ]: 131283 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
44 [ # # ]: 0 : rte_distributor_request_pkt_single(d->d_single,
45 : : worker_id, count ? oldpkt[0] : NULL);
46 : 0 : return;
47 : : }
48 : :
49 : 131283 : retptr64 = &(buf->retptr64[0]);
50 : : /* Spin while handshake bits are set (the scheduler clears them).
51 : : * Sync with the distributor on the GET_BUF flag.
52 : : */
53 [ + + ]: 8521577 : while (unlikely(rte_atomic_load_explicit(retptr64, rte_memory_order_acquire)
54 : : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
55 : : rte_pause();
56 : 8390294 : uint64_t t = rte_rdtsc()+100;
57 : :
58 [ + + ]: 33560699 : while (rte_rdtsc() < t)
59 : : rte_pause();
60 : : }
61 : :
62 : : /*
63 : : * OK, if we've got here, then the scheduler has just cleared the
64 : : * handshake bits. Populate the retptrs with returning packets.
65 : : */
66 : :
67 [ + + ]: 1180459 : for (i = count; i < RTE_DIST_BURST_SIZE; i++)
68 : 1049176 : buf->retptr64[i] = 0;
69 : :
70 : : /* Set VALID_BUF bit for each packet returned */
71 [ + + ]: 132371 : for (i = count; i-- > 0; )
72 : 1088 : buf->retptr64[i] =
73 : 1088 : (((int64_t)(uintptr_t)(oldpkt[i])) <<
74 : 1088 : RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
75 : :
76 : : /*
77 : : * Finally, set GET_BUF to signal to the distributor that the cache
78 : : * line is ready for processing.
79 : : * Sync with the distributor to release the retptrs.
80 : : */
81 : 131283 : rte_atomic_store_explicit(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
82 : : rte_memory_order_release);
83 : : }
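/*
 * Illustrative sketch (not part of this file): each 64-bit slot in
 * retptr64/bufptr64 carries an mbuf pointer shifted up by
 * RTE_DISTRIB_FLAG_BITS, with the low bits left for the handshake flags
 * (GET_BUF, RETURN_BUF, VALID_BUF from distributor_private.h). The
 * helper names below are hypothetical.
 */
static inline int64_t
example_encode_slot(struct rte_mbuf *m)
{
	/* pointer in the upper bits, VALID_BUF marking the slot as used */
	return (((int64_t)(uintptr_t)m) << RTE_DISTRIB_FLAG_BITS) |
			RTE_DISTRIB_VALID_BUF;
}

static inline struct rte_mbuf *
example_decode_slot(int64_t slot)
{
	/* the arithmetic right shift drops the flag bits again */
	return (struct rte_mbuf *)(uintptr_t)(slot >> RTE_DISTRIB_FLAG_BITS);
}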
84 : :
85 : : int
86 : 134622 : rte_distributor_poll_pkt(struct rte_distributor *d,
87 : : unsigned int worker_id, struct rte_mbuf **pkts)
88 : : {
89 : : struct rte_distributor_buffer *buf = &d->bufs[worker_id];
90 : : uint64_t ret;
91 : : int count = 0;
92 : : unsigned int i;
93 : :
94 [ - + ]: 134622 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
95 : 0 : pkts[0] = rte_distributor_poll_pkt_single(d->d_single,
96 : : worker_id);
97 : 0 : return (pkts[0]) ? 1 : 0;
98 : : }
99 : :
100 : : /* If any of the bits below is set, return.
101 : : * GET_BUF is set when the distributor hasn't sent any packets yet.
102 : : * RETURN_BUF is set when the distributor must retrieve in-flight packets.
103 : : * Sync with the distributor to acquire the bufptrs.
104 : : */
105 : 134622 : if (rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
106 [ + + ]: 134622 : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
107 : : return -1;
108 : :
109 : : /* since bufptr64 is signed, this should be an arithmetic shift */
110 [ + + ]: 1181547 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
111 [ + + ]: 1050264 : if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
112 : 1049666 : ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
113 : 1049666 : pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
114 : : }
115 : : }
116 : :
117 : : /*
118 : : * Now that we've copied the contents of the cache line into an
119 : : * array of mbuf pointers, toggle the bit so the scheduler can start
120 : : * working on the next cache line while we process these packets.
121 : : * Sync with distributor on GET_BUF flag. Release bufptrs.
122 : : */
123 : 131283 : rte_atomic_store_explicit(&(buf->bufptr64[0]),
124 : : buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, rte_memory_order_release);
125 : :
126 : 131283 : return count;
127 : : }
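/*
 * A minimal sketch (not part of this file) of the non-blocking pattern
 * built from the two calls above: request a fresh burst without
 * returning anything, then poll until the distributor has filled the
 * cache line. The function name is hypothetical; a real worker would do
 * other useful work between polls.
 */
static inline int
example_worker_poll_loop(struct rte_distributor *d, unsigned int worker_id)
{
	struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE];
	int count;

	/* count == 0: nothing to hand back on this request */
	rte_distributor_request_pkt(d, worker_id, NULL, 0);

	/* poll returns -1 while the burst is still being assembled */
	do {
		count = rte_distributor_poll_pkt(d, worker_id, pkts);
	} while (count == -1);

	return count;	/* number of mbufs now owned by this worker */
}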
128 : :
129 : : int
130 : 1180949 : rte_distributor_get_pkt(struct rte_distributor *d,
131 : : unsigned int worker_id, struct rte_mbuf **pkts,
132 : : struct rte_mbuf **oldpkt, unsigned int return_count)
133 : : {
134 : : int count;
135 : :
136 [ + + ]: 1180949 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
137 [ + - ]: 1049666 : if (return_count <= 1) {
138 [ + + ]: 1049666 : pkts[0] = rte_distributor_get_pkt_single(d->d_single,
139 : : worker_id, return_count ? oldpkt[0] : NULL);
140 : 1049666 : return (pkts[0]) ? 1 : 0;
141 : : } else
142 : : return -EINVAL;
143 : : }
144 : :
145 : 131283 : rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);
146 : :
147 : 131283 : count = rte_distributor_poll_pkt(d, worker_id, pkts);
148 [ + + ]: 134622 : while (count == -1) {
149 : 3339 : uint64_t t = rte_rdtsc() + 100;
150 : :
151 [ + + ]: 12997 : while (rte_rdtsc() < t)
152 : : rte_pause();
153 : :
154 : 3339 : count = rte_distributor_poll_pkt(d, worker_id, pkts);
155 : : }
156 : : return count;
157 : : }
158 : :
159 : : int
160 : 4 : rte_distributor_return_pkt(struct rte_distributor *d,
161 : : unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
162 : : {
163 : : struct rte_distributor_buffer *buf = &d->bufs[worker_id];
164 : : unsigned int i;
165 : :
166 [ + + ]: 4 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
167 [ + - ]: 2 : if (num == 1)
168 : 2 : return rte_distributor_return_pkt_single(d->d_single,
169 : : worker_id, oldpkt[0]);
170 [ # # ]: 0 : else if (num == 0)
171 : 0 : return rte_distributor_return_pkt_single(d->d_single,
172 : : worker_id, NULL);
173 : : else
174 : : return -EINVAL;
175 : : }
176 : :
177 : : /* Spin while handshake bits are set (the scheduler clears them).
178 : : * Sync with the distributor on the GET_BUF flag.
179 : : */
180 [ + + ]: 3 : while (unlikely(rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_relaxed)
181 : : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
182 : : rte_pause();
183 : 1 : uint64_t t = rte_rdtsc()+100;
184 : :
185 [ - + ]: 2 : while (rte_rdtsc() < t)
186 : : rte_pause();
187 : : }
188 : :
189 : : /* Sync with distributor to acquire retptrs */
190 : 2 : __atomic_thread_fence(rte_memory_order_acquire);
191 [ + + ]: 18 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
192 : : /* Switch off the return bit first */
193 : 16 : buf->retptr64[i] = 0;
194 : :
195 [ + + ]: 4 : for (i = num; i-- > 0; )
196 : 2 : buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
197 : 2 : RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
198 : :
199 : : /* Use RETURN_BUF on bufptr64 to notify the distributor that
200 : : * we won't read any mbufs from there even if GET_BUF is set.
201 : : * This allows the distributor to retrieve in-flight packets it already sent.
202 : : */
203 : 2 : rte_atomic_fetch_or_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
204 : : rte_memory_order_acq_rel);
205 : :
206 : : /* Set RETURN_BUF on retptr64 even if we got no returns.
207 : : * Sync with the distributor on the RETURN_BUF flag. Release retptrs.
208 : : * This notifies the distributor that we will not request any more packets.
209 : : */
210 : 2 : rte_atomic_store_explicit(&(buf->retptr64[0]),
211 : : buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, rte_memory_order_release);
212 : :
213 : 2 : return 0;
214 : : }
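/*
 * A minimal sketch (not part of this file, burst mode assumed) of a
 * complete worker loop: rte_distributor_get_pkt() hands back the
 * previous burst while blocking for the next one, and
 * rte_distributor_return_pkt() gives everything back on shutdown so the
 * distributor can reassign this worker's in-flight and backlog packets.
 * The quit flag, packet handler and argument struct are hypothetical.
 */
extern volatile int example_quit;
extern void example_handle_packet(struct rte_mbuf *m);

struct example_worker_args {
	struct rte_distributor *dist;
	unsigned int worker_id;		/* 0 .. num_workers - 1 */
};

static int
example_worker_lcore(void *arg)
{
	struct example_worker_args *wa = arg;
	struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE];
	unsigned int num = 0;
	unsigned int i;

	while (!example_quit) {
		/* the previous burst is returned while requesting the next */
		num = rte_distributor_get_pkt(wa->dist, wa->worker_id,
				bufs, bufs, num);
		for (i = 0; i < num; i++)
			example_handle_packet(bufs[i]);
	}

	/* hand back whatever is still held and stop requesting packets */
	rte_distributor_return_pkt(wa->dist, wa->worker_id, bufs, num);
	return 0;
}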
215 : :
216 : : /**** APIs called on distributor core ***/
217 : :
218 : : /* stores a packet returned from a worker inside the returns array */
219 : : static inline void
220 : : store_return(uintptr_t oldbuf, struct rte_distributor *d,
221 : : unsigned int *ret_start, unsigned int *ret_count)
222 : : {
223 : 1090 : if (!oldbuf)
224 : : return;
225 : : /* store returns in a circular buffer */
226 : 1090 : d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
227 : 1090 : = (void *)oldbuf;
228 : 1090 : *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
229 : 1090 : *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
230 : : }
231 : :
232 : : /*
233 : : * Match the flow_ids (tags) of the incoming packets to the flow_ids
234 : : * of the in-flight packets (both those in flight on the workers and
235 : : * those in each worker's backlog). This then allows us to pin those
236 : : * packets to the relevant workers, giving us our atomic flow pinning.
237 : : */
238 : : void
239 : 0 : find_match_scalar(struct rte_distributor *d,
240 : : uint16_t *data_ptr,
241 : : uint16_t *output_ptr)
242 : : {
243 : : struct rte_distributor_backlog *bl;
244 : : uint16_t i, j, w;
245 : :
246 : : /*
247 : : * Function overview:
248 : : * 1. Loop through all worker ID's
249 : : * 2. Compare the current inflights to the incoming tags
250 : : * 3. Compare the current backlog to the incoming tags
251 : : * 4. Add any matches to the output
252 : : */
253 : :
254 [ # # ]: 0 : for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
255 : 0 : output_ptr[j] = 0;
256 : :
257 [ # # ]: 0 : for (i = 0; i < d->num_workers; i++) {
258 : 0 : bl = &d->backlog[i];
259 : :
260 [ # # ]: 0 : for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
261 [ # # ]: 0 : for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
262 [ # # ]: 0 : if (d->in_flight_tags[i][w] == data_ptr[j]) {
263 : 0 : output_ptr[j] = i+1;
264 : 0 : break;
265 : : }
266 [ # # ]: 0 : for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
267 [ # # ]: 0 : for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
268 [ # # ]: 0 : if (bl->tags[w] == data_ptr[j]) {
269 : 0 : output_ptr[j] = i+1;
270 : 0 : break;
271 : : }
272 : : }
273 : :
274 : : /*
275 : : * At this stage, the output contains 8 16-bit values, with
276 : : * each non-zero value holding the ID (plus one) of the worker
277 : : * to which the corresponding flow is pinned.
278 : : */
279 : 0 : }
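/*
 * Worked example (illustrative, hypothetical numbers): with two workers,
 * worker 0 holding tag 5 in flight and worker 1 holding tag 9 in its
 * backlog, an incoming burst of three packets with tags {5, 7, 9}
 * (remaining slots zero-padded by the caller) yields
 * output_ptr = {1, 0, 2, ...}: matched flows carry the worker ID plus
 * one, unmatched flows stay zero and get a worker assigned later in
 * rte_distributor_process().
 */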
280 : :
281 : : /*
282 : : * When worker called rte_distributor_return_pkt()
283 : : * and passed RTE_DISTRIB_RETURN_BUF handshake through retptr64,
284 : : * distributor must retrieve both inflight and backlog packets assigned
285 : : * to the worker and reprocess them to another worker.
286 : : */
287 : : static void
288 : 2 : handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
289 : : {
290 : : struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
291 : : /* double BURST size for storing both inflights and backlog */
292 : : struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
293 : : unsigned int pkts_count = 0;
294 : : unsigned int i;
295 : :
296 : : /* If GET_BUF is cleared, there are in-flight packets already sent
297 : : * to the worker, which is not requesting new packets.
298 : : * They must be retrieved and reassigned to another worker.
299 : : */
300 : 2 : if (!(rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
301 [ + - ]: 2 : & RTE_DISTRIB_GET_BUF))
302 [ + + ]: 18 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
303 [ - + ]: 16 : if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
304 : 0 : pkts[pkts_count++] = (void *)((uintptr_t)
305 : 0 : (buf->bufptr64[i]
306 : 0 : >> RTE_DISTRIB_FLAG_BITS));
307 : :
308 : : /* Perform the following operations on the bufptr64 handshake flags:
309 : : * - set GET_BUF to indicate that the distributor may overwrite the
310 : : * buffer with new packets if the worker makes a new request.
311 : : * - clear RETURN_BUF to unlock reads on the worker side.
312 : : */
313 : 2 : rte_atomic_store_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
314 : : rte_memory_order_release);
315 : :
316 : : /* Collect backlog packets from worker */
317 [ - + ]: 2 : for (i = 0; i < d->backlog[wkr].count; i++)
318 : 0 : pkts[pkts_count++] = (void *)((uintptr_t)
319 : 0 : (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
320 : :
321 : 2 : d->backlog[wkr].count = 0;
322 : :
323 : : /* Clear both inflight and backlog tags */
324 [ + + ]: 18 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
325 : 16 : d->in_flight_tags[wkr][i] = 0;
326 : 16 : d->backlog[wkr].tags[i] = 0;
327 : : }
328 : :
329 : : /* Recursive call */
330 [ - + ]: 2 : if (pkts_count > 0)
331 : 0 : rte_distributor_process(d, pkts, pkts_count);
332 : 2 : }
333 : :
334 : :
335 : : /*
336 : : * When the handshake bits indicate that there are packets coming
337 : : * back from the worker, this function is called to copy and store
338 : : * the valid returned pointers (store_return).
339 : : */
340 : : static unsigned int
341 : 765498 : handle_returns(struct rte_distributor *d, unsigned int wkr)
342 : : {
343 : : struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
344 : : uintptr_t oldbuf;
345 : 765498 : unsigned int ret_start = d->returns.start,
346 : 765498 : ret_count = d->returns.count;
347 : : unsigned int count = 0;
348 : : unsigned int i;
349 : :
350 : : /* Sync on GET_BUF flag. Acquire retptrs. */
351 : 765498 : if (rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_acquire)
352 [ + + ]: 765498 : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
353 [ + + ]: 1181565 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
354 [ + + ]: 1050280 : if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
355 [ + - ]: 1090 : oldbuf = ((uintptr_t)(buf->retptr64[i] >>
356 : : RTE_DISTRIB_FLAG_BITS));
357 : : /* store returns in a circular buffer */
358 : : store_return(oldbuf, d, &ret_start, &ret_count);
359 : 1090 : count++;
360 : 1090 : buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
361 : : }
362 : : }
363 : 131285 : d->returns.start = ret_start;
364 : 131285 : d->returns.count = ret_count;
365 : :
366 : : /* If the worker requested packets with GET_BUF, mark it active;
367 : : * otherwise (RETURN_BUF), mark it inactive.
368 : : */
369 : 131285 : d->activesum -= d->active[wkr];
370 : 131285 : d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
371 : 131285 : d->activesum += d->active[wkr];
372 : :
373 : : /* If the worker returned packets without requesting new ones,
374 : : * handle all in-flight and backlog packets assigned to it.
375 : : */
376 [ + + ]: 131285 : if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
377 : 2 : handle_worker_shutdown(d, wkr);
378 : :
379 : : /* Clear for the worker to populate with more returns.
380 : : * Sync with the worker on the GET_BUF flag. Release retptrs.
381 : : */
382 : 131285 : rte_atomic_store_explicit(&(buf->retptr64[0]), 0, rte_memory_order_release);
383 : : }
384 : 765498 : return count;
385 : : }
386 : :
387 : : /*
388 : : * This function releases a burst (cache line) to a worker.
389 : : * It is called from the process function when a cacheline is
390 : : * full to make room for more packets for that worker, or when
391 : : * all packets have been assigned to bursts and need to be flushed
392 : : * to the workers.
393 : : * It also needs to wait for any outstanding packets from the worker
394 : : * before sending out new packets.
395 : : */
396 : : static unsigned int
397 : 131286 : release(struct rte_distributor *d, unsigned int wkr)
398 : : {
399 : : struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
400 : : unsigned int i;
401 : :
402 : 131286 : handle_returns(d, wkr);
403 [ + + ]: 131286 : if (unlikely(!d->active[wkr]))
404 : : return 0;
405 : :
406 : : /* Sync with worker on GET_BUF flag */
407 : 732344 : while (!(rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64[0]), rte_memory_order_acquire)
408 [ + + ]: 732344 : & RTE_DISTRIB_GET_BUF)) {
409 : 601060 : handle_returns(d, wkr);
410 [ + - ]: 601060 : if (unlikely(!d->active[wkr]))
411 : : return 0;
412 : : rte_pause();
413 : : }
414 : :
415 : : buf->count = 0;
416 : :
417 [ + + ]: 1180950 : for (i = 0; i < d->backlog[wkr].count; i++) {
418 : 1049666 : d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
419 : 1049666 : RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
420 : 1049666 : d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
421 : : }
422 : 131284 : buf->count = i;
423 [ + + ]: 131890 : for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
424 : 606 : buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
425 : 606 : d->in_flight_tags[wkr][i] = 0;
426 : : }
427 : :
428 : 131284 : d->backlog[wkr].count = 0;
429 : :
430 : : /* Clear the GET bit.
431 : : * Sync with worker on GET_BUF flag. Release bufptrs.
432 : : */
433 : 131284 : rte_atomic_store_explicit(&(buf->bufptr64[0]),
434 : : buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, rte_memory_order_release);
435 : 131284 : return buf->count;
436 : :
437 : : }
438 : :
439 : :
440 : : /* process a set of packets to distribute them to workers */
441 : : int
442 : 65808 : rte_distributor_process(struct rte_distributor *d,
443 : : struct rte_mbuf **mbufs, unsigned int num_mbufs)
444 : : {
445 : : unsigned int next_idx = 0;
446 : : static unsigned int wkr;
447 : : struct rte_mbuf *next_mb = NULL;
448 : : int64_t next_value = 0;
449 : : uint16_t new_tag = 0;
450 : : uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
451 : : unsigned int i, j, w, wid, matching_required;
452 : :
453 [ + + ]: 65808 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
454 : : /* Call the old API */
455 : 32806 : return rte_distributor_process_single(d->d_single,
456 : : mbufs, num_mbufs);
457 : : }
458 : :
459 [ + + ]: 66004 : for (wid = 0 ; wid < d->num_workers; wid++)
460 : 33002 : handle_returns(d, wid);
461 : :
462 [ + + ]: 33002 : if (unlikely(num_mbufs == 0)) {
463 : : /* Flush out all non-full cache-lines to workers. */
464 [ + + ]: 396 : for (wid = 0 ; wid < d->num_workers; wid++) {
465 : : /* Sync with worker on GET_BUF flag. */
466 : 198 : if (rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
467 [ + + ]: 198 : rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF) {
468 : 111 : d->bufs[wid].count = 0;
469 : 111 : release(d, wid);
470 : 111 : handle_returns(d, wid);
471 : : }
472 : : }
473 : : return 0;
474 : : }
475 : :
476 [ + - ]: 32804 : if (unlikely(!d->activesum))
477 : : return 0;
478 : :
479 [ + + ]: 164014 : while (next_idx < num_mbufs) {
480 : : uint16_t matches[RTE_DIST_BURST_SIZE] __rte_aligned(128);
481 : : unsigned int pkts;
482 : :
483 [ + + ]: 131210 : if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
484 : : pkts = num_mbufs - next_idx;
485 : : else
486 : : pkts = RTE_DIST_BURST_SIZE;
487 : :
488 [ + + ]: 1180876 : for (i = 0; i < pkts; i++) {
489 [ + - ]: 1049666 : if (mbufs[next_idx + i]) {
490 : : /* flows have to be non-zero */
491 : 1049666 : flows[i] = mbufs[next_idx + i]->hash.usr | 1;
492 : : } else
493 : 0 : flows[i] = 0;
494 : : }
495 [ + + ]: 131224 : for (; i < RTE_DIST_BURST_SIZE; i++)
496 : 14 : flows[i] = 0;
497 : :
498 : : matching_required = 1;
499 : :
500 [ + + ]: 1180876 : for (j = 0; j < pkts; j++) {
501 [ - + ]: 1049666 : if (unlikely(!d->activesum))
502 : 0 : return next_idx;
503 : :
504 [ + + ]: 1049666 : if (unlikely(matching_required)) {
505 [ + - ]: 131210 : switch (d->dist_match_fn) {
506 : 131210 : case RTE_DIST_MATCH_VECTOR:
507 : 131210 : find_match_vec(d, &flows[0],
508 : : &matches[0]);
509 : 131210 : break;
510 : 0 : default:
511 : 0 : find_match_scalar(d, &flows[0],
512 : : &matches[0]);
513 : : }
514 : : matching_required = 0;
515 : : }
516 : : /*
517 : : * The matches array now contains the intended worker ID (+1) for
518 : : * each incoming packet. Any zeroes still need to be assigned
519 : : * a worker.
520 : : */
521 : :
522 : 1049666 : next_mb = mbufs[next_idx++];
523 : 1049666 : next_value = (((int64_t)(uintptr_t)next_mb) <<
524 : : RTE_DISTRIB_FLAG_BITS);
525 : : /*
526 : : * The user is advised to set a tag value for each
527 : : * mbuf before calling rte_distributor_process.
528 : : * User-defined tags are used to identify flows
529 : : * or sessions.
530 : : */
531 : : /* flows MUST be non-zero */
532 : 1049666 : new_tag = (uint16_t)(next_mb->hash.usr) | 1;
533 : :
534 : : /*
535 : : * Uncommenting the next line will cause the find_match
536 : : * function to be optimized out, making this function
537 : : * do parallel (non-atomic) distribution
538 : : */
539 : : /* matches[j] = 0; */
540 : :
541 [ + + - + ]: 1049666 : if (matches[j] && d->active[matches[j]-1]) {
542 : : struct rte_distributor_backlog *bl =
543 : : &d->backlog[matches[j]-1];
544 [ + + ]: 49 : if (unlikely(bl->count ==
545 : : RTE_DIST_BURST_SIZE)) {
546 : 6 : release(d, matches[j]-1);
547 [ - + ]: 6 : if (!d->active[matches[j]-1]) {
548 : 0 : j--;
549 : : next_idx--;
550 : : matching_required = 1;
551 : 0 : continue;
552 : : }
553 : : }
554 : :
555 : : /* Add to worker that already has flow */
556 : 49 : unsigned int idx = bl->count++;
557 : :
558 : 49 : bl->tags[idx] = new_tag;
559 : 49 : bl->pkts[idx] = next_value;
560 : :
561 : : } else {
562 : : struct rte_distributor_backlog *bl;
563 : :
564 [ - + ]: 1049617 : while (unlikely(!d->active[wkr]))
565 : 0 : wkr = (wkr + 1) % d->num_workers;
566 : : bl = &d->backlog[wkr];
567 : :
568 [ + + ]: 1049617 : if (unlikely(bl->count ==
569 : : RTE_DIST_BURST_SIZE)) {
570 : 131164 : release(d, wkr);
571 [ - + ]: 131164 : if (!d->active[wkr]) {
572 : 0 : j--;
573 : : next_idx--;
574 : : matching_required = 1;
575 : 0 : continue;
576 : : }
577 : : }
578 : :
579 : : /* Add to current worker */
580 : 1049617 : unsigned int idx = bl->count++;
581 : :
582 : 1049617 : bl->tags[idx] = new_tag;
583 : 1049617 : bl->pkts[idx] = next_value;
584 : : /*
585 : : * Having just added an unpinned flow
586 : : * to a worker, we need to ensure that all
587 : : * other packets with that same flow go
588 : : * to the same worker in this burst.
589 : : */
590 [ + + ]: 5772888 : for (w = j; w < pkts; w++)
591 [ + + ]: 4723271 : if (flows[w] == new_tag)
592 : 1049636 : matches[w] = wkr+1;
593 : : }
594 : : }
595 : 131210 : wkr = (wkr + 1) % d->num_workers;
596 : : }
597 : :
598 : : /* Flush out all non-full cache-lines to workers. */
599 [ + + ]: 65608 : for (wid = 0 ; wid < d->num_workers; wid++)
600 : : /* Sync with worker on GET_BUF flag. */
601 : 32804 : if ((rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
602 [ + + ]: 32804 : rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {
603 : 5 : d->bufs[wid].count = 0;
604 : 5 : release(d, wid);
605 : : }
606 : :
607 : 32804 : return num_mbufs;
608 : : }
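/*
 * A minimal sketch (not part of this file, burst mode assumed) of the
 * distributor-core loop that feeds rte_distributor_process(): receive a
 * burst, tag each mbuf via hash.usr to identify its flow, hand the burst
 * to the distributor and drain any packets the workers have finished
 * with. rte_eth_rx_burst() requires rte_ethdev.h; example_flow_tag() and
 * the port/queue numbers are hypothetical, for illustration only.
 */
extern uint32_t example_flow_tag(const struct rte_mbuf *m);

static inline void
example_distribute_once(struct rte_distributor *d, uint16_t port_id)
{
	struct rte_mbuf *bufs[64];
	struct rte_mbuf *done[64];
	uint16_t nb_rx, nb_ret, i;

	nb_rx = rte_eth_rx_burst(port_id, 0, bufs, 64);
	if (nb_rx == 0)
		return;

	/* a per-flow tag; only the lower 16 bits are used for matching */
	for (i = 0; i < nb_rx; i++)
		bufs[i]->hash.usr = example_flow_tag(bufs[i]);

	rte_distributor_process(d, bufs, nb_rx);

	/* packets handed back by workers via their "oldpkt" returns */
	nb_ret = rte_distributor_returned_pkts(d, done, 64);
	for (i = 0; i < nb_ret; i++)
		rte_pktmbuf_free(done[i]);	/* or forward them instead */
}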
609 : :
610 : : /* return to the caller, packets returned from workers */
611 : : int
612 : 78 : rte_distributor_returned_pkts(struct rte_distributor *d,
613 : : struct rte_mbuf **mbufs, unsigned int max_mbufs)
614 : : {
615 : : struct rte_distributor_returned_pkts *returns = &d->returns;
616 : 78 : unsigned int retval = (max_mbufs < returns->count) ?
617 : : max_mbufs : returns->count;
618 : : unsigned int i;
619 : :
620 [ + + ]: 78 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
621 : : /* Call the old API */
622 : 39 : return rte_distributor_returned_pkts_single(d->d_single,
623 : : mbufs, max_mbufs);
624 : : }
625 : :
626 [ + + ]: 1129 : for (i = 0; i < retval; i++) {
627 : 1090 : unsigned int idx = (returns->start + i) &
628 : : RTE_DISTRIB_RETURNS_MASK;
629 : :
630 : 1090 : mbufs[i] = returns->mbufs[idx];
631 : : }
632 : 39 : returns->start += i;
633 : 39 : returns->count -= i;
634 : :
635 : 39 : return retval;
636 : : }
637 : :
638 : : /*
639 : : * Return the number of packets in-flight in a distributor, i.e. packets
640 : : * being worked on or queued up in a backlog.
641 : : */
642 : : static inline unsigned int
643 : : total_outstanding(const struct rte_distributor *d)
644 : : {
645 : : unsigned int wkr, total_outstanding = 0;
646 : :
647 [ + + + + ]: 470 : for (wkr = 0; wkr < d->num_workers; wkr++)
648 : 235 : total_outstanding += d->backlog[wkr].count + d->bufs[wkr].count;
649 : :
650 : : return total_outstanding;
651 : : }
652 : :
653 : : /*
654 : : * Flush the distributor, so that there are no outstanding packets in flight or
655 : : * queued up.
656 : : */
657 : : int
658 : 78 : rte_distributor_flush(struct rte_distributor *d)
659 : : {
660 : : unsigned int flushed;
661 : : unsigned int wkr;
662 : :
663 [ + + ]: 78 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
664 : : /* Call the old API */
665 : 39 : return rte_distributor_flush_single(d->d_single);
666 : : }
667 : :
668 : : flushed = total_outstanding(d);
669 : :
670 [ + + ]: 196 : while (total_outstanding(d) > 0)
671 : 157 : rte_distributor_process(d, NULL, 0);
672 : :
673 : : /* wait 10ms to allow all workers to drain their packets */
674 : 39 : rte_delay_us(10000);
675 : :
676 : : /*
677 : : * Send empty burst to all workers to allow them to exit
678 : : * gracefully, should they need to.
679 : : */
680 : 39 : rte_distributor_process(d, NULL, 0);
681 : :
682 [ + + ]: 78 : for (wkr = 0; wkr < d->num_workers; wkr++)
683 : 39 : handle_returns(d, wkr);
684 : :
685 : 39 : return flushed;
686 : : }
687 : :
688 : : /* clears the internal returns array in the distributor */
689 : : void
690 : 6 : rte_distributor_clear_returns(struct rte_distributor *d)
691 : : {
692 : : unsigned int wkr;
693 : :
694 [ + + ]: 6 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
695 : : /* Call the old API */
696 : 3 : rte_distributor_clear_returns_single(d->d_single);
697 : 3 : return;
698 : : }
699 : :
700 : : /* throw away returns, so workers can exit */
701 [ + + ]: 6 : for (wkr = 0; wkr < d->num_workers; wkr++)
702 : : /* Sync with worker. Release retptrs. */
703 : 3 : rte_atomic_store_explicit(&(d->bufs[wkr].retptr64[0]), 0,
704 : : rte_memory_order_release);
705 : :
706 : 3 : d->returns.start = d->returns.count = 0;
707 : : }
708 : :
709 : : /* creates a distributor instance */
710 : : struct rte_distributor *
711 : 6 : rte_distributor_create(const char *name,
712 : : unsigned int socket_id,
713 : : unsigned int num_workers,
714 : : unsigned int alg_type)
715 : : {
716 : : struct rte_distributor *d;
717 : : struct rte_dist_burst_list *dist_burst_list;
718 : : char mz_name[RTE_MEMZONE_NAMESIZE];
719 : : const struct rte_memzone *mz;
720 : : unsigned int i;
721 : :
722 : : /* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */
723 : :
724 : : /* compilation-time checks */
725 : : RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
726 : : RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
727 : :
728 [ + + + + ]: 6 : if (name == NULL || num_workers >=
729 : : (unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
730 : 4 : rte_errno = EINVAL;
731 : 4 : return NULL;
732 : : }
733 : :
734 [ + + ]: 2 : if (alg_type == RTE_DIST_ALG_SINGLE) {
735 : 1 : d = malloc(sizeof(struct rte_distributor));
736 [ - + ]: 1 : if (d == NULL) {
737 : 0 : rte_errno = ENOMEM;
738 : 0 : return NULL;
739 : : }
740 : 1 : d->d_single = rte_distributor_create_single(name,
741 : : socket_id, num_workers);
742 [ - + ]: 1 : if (d->d_single == NULL) {
743 : 0 : free(d);
744 : : /* rte_errno will have been set */
745 : 0 : return NULL;
746 : : }
747 : 1 : d->alg_type = alg_type;
748 : 1 : return d;
749 : : }
750 : :
751 : : snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
752 : 1 : mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
753 [ - + ]: 1 : if (mz == NULL) {
754 : 0 : rte_errno = ENOMEM;
755 : 0 : return NULL;
756 : : }
757 : :
758 : 1 : d = mz->addr;
759 : 1 : strlcpy(d->name, name, sizeof(d->name));
760 : 1 : d->num_workers = num_workers;
761 : 1 : d->alg_type = alg_type;
762 : :
763 : 1 : d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
764 : : #if defined(RTE_ARCH_X86)
765 [ + - ]: 1 : if (rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_128)
766 : 1 : d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
767 : : #endif
768 : :
769 : : /*
770 : : * Set up the backlog tags so they're pointing at the second cache
771 : : * line for performance during flow matching
772 : : */
773 [ + + ]: 2 : for (i = 0 ; i < num_workers ; i++)
774 : 1 : d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
775 : :
776 : 1 : memset(d->active, 0, sizeof(d->active));
777 : 1 : d->activesum = 0;
778 : :
779 : 1 : dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
780 : : rte_dist_burst_list);
781 : :
782 : :
783 : 1 : rte_mcfg_tailq_write_lock();
784 : 1 : TAILQ_INSERT_TAIL(dist_burst_list, d, next);
785 : 1 : rte_mcfg_tailq_write_unlock();
786 : :
787 : 1 : return d;
788 : : }
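/*
 * A minimal lifecycle sketch (not part of this file): create a burst-mode
 * instance on the local socket, then shut down cleanly with flush() plus
 * clear_returns() once the workers have been told to stop. The instance
 * name and helper names are hypothetical; rte_socket_id() comes from
 * rte_lcore.h.
 */
static inline struct rte_distributor *
example_setup(unsigned int num_workers)
{
	return rte_distributor_create("example_dist", rte_socket_id(),
			num_workers, RTE_DIST_ALG_BURST);
}

static inline void
example_teardown(struct rte_distributor *d)
{
	/* push out partially filled bursts and wait for in-flight packets */
	rte_distributor_flush(d);
	/* discard leftover returns so any blocked workers can exit */
	rte_distributor_clear_returns(d);
}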
|