Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : * Copyright(c) 2017 Intel Corporation
3 : : */
4 : :
5 : : #include <stdalign.h>
6 : : #include <stdio.h>
7 : : #include <stdlib.h>
8 : : #include <sys/queue.h>
9 : : #include <string.h>
10 : : #include <rte_mbuf.h>
11 : : #include <rte_cycles.h>
12 : : #include <rte_memzone.h>
13 : : #include <rte_errno.h>
14 : : #include <rte_string_fns.h>
15 : : #include <rte_eal_memconfig.h>
16 : : #include <rte_pause.h>
17 : : #include <rte_tailq.h>
18 : :
19 : : #include "rte_distributor.h"
20 : : #include "rte_distributor_single.h"
21 : : #include "distributor_private.h"
22 : :
23 : : TAILQ_HEAD(rte_dist_burst_list, rte_distributor);
24 : :
25 : : static struct rte_tailq_elem rte_dist_burst_tailq = {
26 : : .name = "RTE_DIST_BURST",
27 : : };
28 [ - + ]: 252 : EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
29 : :
30 : : /**** APIs called by workers ****/
31 : :
32 : : /**** Burst Packet APIs called by workers ****/
33 : :
34 : : void
35 : 131283 : rte_distributor_request_pkt(struct rte_distributor *d,
36 : : unsigned int worker_id, struct rte_mbuf **oldpkt,
37 : : unsigned int count)
38 : : {
39 : : struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
40 : : unsigned int i;
41 : :
42 : : volatile RTE_ATOMIC(int64_t) *retptr64;
43 : :
44 [ - + ]: 131283 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
45 [ # # ]: 0 : rte_distributor_request_pkt_single(d->d_single,
46 : : worker_id, count ? oldpkt[0] : NULL);
47 : 0 : return;
48 : : }
49 : :
50 : 131283 : retptr64 = &(buf->retptr64[0]);
51 : : /* Spin while the handshake bits are set (the distributor clears them).
52 : : * Sync with distributor on GET_BUF flag.
53 : : */
54 [ + + ]: 3114860 : while (unlikely(rte_atomic_load_explicit(retptr64, rte_memory_order_acquire)
55 : : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
56 : : rte_pause();
57 : 2983577 : uint64_t t = rte_rdtsc()+100;
58 : :
59 [ + + ]: 11932174 : while (rte_rdtsc() < t)
60 : : rte_pause();
61 : : }
62 : :
63 : : /*
64 : : * If we get here, the distributor has just cleared the
65 : : * handshake bits. Populate the retptrs with the returning packets.
66 : : */
67 : :
68 [ + + ]: 1180459 : for (i = count; i < RTE_DIST_BURST_SIZE; i++)
69 : 1049176 : buf->retptr64[i] = 0;
70 : :
71 : : /* Set VALID_BUF bit for each packet returned */
72 [ + + ]: 132371 : for (i = count; i-- > 0; )
73 : 1088 : buf->retptr64[i] =
74 : 1088 : (((int64_t)(uintptr_t)(oldpkt[i])) <<
75 : 1088 : RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
76 : :
77 : : /*
78 : : * Finally, set GET_BUF to signal to the distributor that the cache
79 : : * line is ready for processing.
80 : : * Sync with distributor to release retptrs.
81 : : */
82 : 131283 : rte_atomic_store_explicit(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
83 : : rte_memory_order_release);
84 : : }
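/*
 * Editor's illustration (not part of the library source): each 64-bit
 * handshake slot used above carries an mbuf pointer in its upper bits and
 * the handshake flags in the low RTE_DISTRIB_FLAG_BITS bits. A minimal
 * sketch of the round trip, using the internal constants from
 * distributor_private.h that this file already includes:
 */
static inline int64_t
example_pack_slot(const struct rte_mbuf *m)
{
	/* pointer shifted up, VALID_BUF marking the slot as populated */
	return (((int64_t)(uintptr_t)m) << RTE_DISTRIB_FLAG_BITS) |
			RTE_DISTRIB_VALID_BUF;
}

static inline struct rte_mbuf *
example_unpack_slot(int64_t slot)
{
	/* the slot is signed, so this is an arithmetic shift (see poll_pkt) */
	return (struct rte_mbuf *)(uintptr_t)(slot >> RTE_DISTRIB_FLAG_BITS);
}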
85 : :
86 : : int
87 : 4875317 : rte_distributor_poll_pkt(struct rte_distributor *d,
88 : : unsigned int worker_id, struct rte_mbuf **pkts)
89 : : {
90 : : struct rte_distributor_buffer *buf = &d->bufs[worker_id];
91 : : uint64_t ret;
92 : : int count = 0;
93 : : unsigned int i;
94 : :
95 [ - + ]: 4875317 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
96 : 0 : pkts[0] = rte_distributor_poll_pkt_single(d->d_single,
97 : : worker_id);
98 : 0 : return (pkts[0]) ? 1 : 0;
99 : : }
100 : :
101 : : /* If any of the bits below is set, return.
102 : : * GET_BUF is set when the distributor hasn't sent any packets yet.
103 : : * RETURN_BUF is set when the distributor must retrieve in-flight packets.
104 : : * Sync with distributor to acquire bufptrs.
105 : : */
106 : 4875317 : if (rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
107 [ + + ]: 4875317 : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
108 : : return -1;
109 : :
110 : : /* since bufptr64 is signed, this should be an arithmetic shift */
111 [ + + ]: 1181547 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
112 [ + + ]: 1050264 : if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
113 : 1049666 : ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
114 : 1049666 : pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
115 : : }
116 : : }
117 : :
118 : : /*
119 : : * Now that we have the contents of the cache line in an array of
120 : : * mbuf pointers, toggle the bit so the distributor can start working
121 : : * on the next cache line while we process this one.
122 : : * Sync with distributor on GET_BUF flag. Release bufptrs.
123 : : */
124 : 131283 : rte_atomic_store_explicit(&(buf->bufptr64[0]),
125 : : buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, rte_memory_order_release);
126 : :
127 : 131283 : return count;
128 : : }
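/*
 * Editor's sketch of the split worker pattern enabled by the two calls
 * above: request a new burst, do other useful work, then poll until the
 * distributor has filled the cache line. A minimal sketch only; error
 * handling and the application work are left out (rte_pause() stands in).
 * pkts must have room for RTE_DIST_BURST_SIZE entries.
 */
static int
example_worker_poll(struct rte_distributor *d, unsigned int worker_id,
		struct rte_mbuf **pkts)
{
	int n;

	/* nothing to hand back yet, so oldpkt is NULL and count is 0 */
	rte_distributor_request_pkt(d, worker_id, NULL, 0);

	do {
		rte_pause();	/* placeholder for application work */
		n = rte_distributor_poll_pkt(d, worker_id, pkts);
	} while (n < 0);	/* -1 means no burst is ready yet */

	return n;	/* pkts[0..n-1] now belong to this worker */
}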
129 : :
130 : : int
131 : 1180949 : rte_distributor_get_pkt(struct rte_distributor *d,
132 : : unsigned int worker_id, struct rte_mbuf **pkts,
133 : : struct rte_mbuf **oldpkt, unsigned int return_count)
134 : : {
135 : : int count;
136 : :
137 [ + + ]: 1180949 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
138 [ + - ]: 1049666 : if (return_count <= 1) {
139 [ + + ]: 1049666 : pkts[0] = rte_distributor_get_pkt_single(d->d_single,
140 : : worker_id, return_count ? oldpkt[0] : NULL);
141 : 1049666 : return (pkts[0]) ? 1 : 0;
142 : : } else
143 : : return -EINVAL;
144 : : }
145 : :
146 : 131283 : rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);
147 : :
148 : 131283 : count = rte_distributor_poll_pkt(d, worker_id, pkts);
149 [ + + ]: 4875317 : while (count == -1) {
150 : 4744034 : uint64_t t = rte_rdtsc() + 100;
151 : :
152 [ + + ]: 18974994 : while (rte_rdtsc() < t)
153 : : rte_pause();
154 : :
155 : 4744034 : count = rte_distributor_poll_pkt(d, worker_id, pkts);
156 : : }
157 : : return count;
158 : : }
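/*
 * Editor's sketch of the simplest blocking worker loop built on
 * rte_distributor_get_pkt(): the same array can be used both to hand back
 * the previous burst and to receive the next one. The processing step is
 * left as a placeholder.
 */
static void
example_worker_loop(struct rte_distributor *d, unsigned int worker_id)
{
	struct rte_mbuf *buf[RTE_DIST_BURST_SIZE];
	int num = 0;

	for (;;) {
		/* returns the previous burst, blocks for the next one */
		num = rte_distributor_get_pkt(d, worker_id, buf, buf, num);
		/* ... process buf[0..num-1] here before the next call ... */
	}
}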
159 : :
160 : : int
161 : 4 : rte_distributor_return_pkt(struct rte_distributor *d,
162 : : unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
163 : : {
164 : : struct rte_distributor_buffer *buf = &d->bufs[worker_id];
165 : : unsigned int i;
166 : :
167 [ + + ]: 4 : if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
168 [ + - ]: 2 : if (num == 1)
169 : 2 : return rte_distributor_return_pkt_single(d->d_single,
170 : : worker_id, oldpkt[0]);
171 [ # # ]: 0 : else if (num == 0)
172 : 0 : return rte_distributor_return_pkt_single(d->d_single,
173 : : worker_id, NULL);
174 : : else
175 : : return -EINVAL;
176 : : }
177 : :
178 : : /* Spin while the handshake bits are set (the distributor clears them).
179 : : * Sync with distributor on GET_BUF flag.
180 : : */
181 [ - + ]: 2 : while (unlikely(rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_relaxed)
182 : : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
183 : : rte_pause();
184 : 0 : uint64_t t = rte_rdtsc()+100;
185 : :
186 [ # # ]: 0 : while (rte_rdtsc() < t)
187 : : rte_pause();
188 : : }
189 : :
190 : : /* Sync with distributor to acquire retptrs */
191 : : rte_atomic_thread_fence(rte_memory_order_acquire);
192 [ + + ]: 18 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
193 : : /* Switch off the return bit first */
194 : 16 : buf->retptr64[i] = 0;
195 : :
196 [ + + ]: 4 : for (i = num; i-- > 0; )
197 : 2 : buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
198 : 2 : RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
199 : :
200 : : /* Use RETURN_BUF on bufptr64 to notify the distributor that
201 : : * we won't read any mbufs from there even if GET_BUF is set.
202 : : * This allows the distributor to retrieve in-flight packets it has already sent.
203 : : */
204 : 2 : rte_atomic_fetch_or_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
205 : : rte_memory_order_acq_rel);
206 : :
207 : : /* Set RETURN_BUF on retptr64 even if we got no returns.
208 : : * Sync with distributor on RETURN_BUF flag. Release retptrs.
209 : : * This notifies the distributor that we will not request any more packets.
210 : : */
211 : 2 : rte_atomic_store_explicit(&(buf->retptr64[0]),
212 : : buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, rte_memory_order_release);
213 : :
214 : 2 : return 0;
215 : : }
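/*
 * Editor's sketch of a graceful worker shutdown: before exiting, a worker
 * hands back whatever it still holds so the distributor can reassign its
 * in-flight and backlog packets (see handle_worker_shutdown() below).
 */
static int
example_worker_exit(struct rte_distributor *d, unsigned int worker_id,
		struct rte_mbuf **held, int num_held)
{
	/* 0 on success; -EINVAL if the single-packet mode is given num > 1 */
	return rte_distributor_return_pkt(d, worker_id, held, num_held);
}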
216 : :
217 : : /**** APIs called on distributor core ***/
218 : :
219 : : /* stores a packet returned from a worker inside the returns array */
220 : : static inline void
221 : : store_return(uintptr_t oldbuf, struct rte_distributor *d,
222 : : unsigned int *ret_start, unsigned int *ret_count)
223 : : {
224 : 1090 : if (!oldbuf)
225 : : return;
226 : : /* store returns in a circular buffer */
227 : 1090 : d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
228 : 1090 : = (void *)oldbuf;
229 : 1090 : *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
230 : 1090 : *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
231 : : }
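/*
 * Editor's note on the index arithmetic above: RTE_DISTRIB_RETURNS_MASK is
 * used as a power-of-two wrap mask, so "(start + count) & mask" wraps the
 * write position. Once count has reached the mask value, the start advances
 * instead of the count, overwriting the oldest stored return. The same idea
 * in isolation, as a generic sketch:
 */
static inline void
example_ring_put(void **ring, unsigned int mask,
		unsigned int *start, unsigned int *count, void *item)
{
	ring[(*start + *count) & mask] = item;
	*start += (*count == mask);	/* full: drop the oldest entry */
	*count += (*count != mask);	/* not full: just grow */
}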
232 : :
233 : : /*
234 : : * Match the flow_ids (tags) of the incoming packets to the flow_ids
235 : : * of the in-flight packets (both in flight on the workers and in each
236 : : * worker's backlog). This then allows us to pin those packets to the
237 : : * relevant workers, giving us atomic flow pinning.
238 : : */
239 : : void
240 : 0 : find_match_scalar(struct rte_distributor *d,
241 : : uint16_t *data_ptr,
242 : : uint16_t *output_ptr)
243 : : {
244 : : struct rte_distributor_backlog *bl;
245 : : uint16_t i, j, w;
246 : :
247 : : /*
248 : : * Function overview:
249 : : * 1. Loop through all worker IDs
250 : : * 2. Compare the current inflights to the incoming tags
251 : : * 3. Compare the current backlog to the incoming tags
252 : : * 4. Add any matches to the output
253 : : */
254 : :
255 [ # # ]: 0 : for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
256 : 0 : output_ptr[j] = 0;
257 : :
258 [ # # ]: 0 : for (i = 0; i < d->num_workers; i++) {
259 : 0 : bl = &d->backlog[i];
260 : :
261 [ # # ]: 0 : for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
262 [ # # ]: 0 : for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
263 [ # # ]: 0 : if (d->in_flight_tags[i][w] == data_ptr[j]) {
264 : 0 : output_ptr[j] = i+1;
265 : 0 : break;
266 : : }
267 [ # # ]: 0 : for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
268 [ # # ]: 0 : for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
269 [ # # ]: 0 : if (bl->tags[w] == data_ptr[j]) {
270 : 0 : output_ptr[j] = i+1;
271 : 0 : break;
272 : : }
273 : : }
274 : :
275 : : /*
276 : : * At this stage, the output contains 8 16-bit values, each
277 : : * non-zero value holding one plus the ID of the worker to which
278 : : * the corresponding flow is pinned.
279 : : */
280 : 0 : }
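/*
 * Editor's worked example for the matching above, assuming two workers and
 * that no other in-flight or backlog slots match. If worker 0 has tags 5
 * and 9 in flight and worker 1 has tag 7 in its backlog, then for
 *     data_ptr[]   = { 5, 7, 3, 9, 5, 3, 7, 9 }
 * the function fills
 *     output_ptr[] = { 1, 2, 0, 1, 1, 0, 2, 1 }
 * i.e. worker ID + 1 where the flow is already pinned, and 0 where
 * rte_distributor_process() is free to pick any active worker.
 */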
281 : :
282 : : /*
283 : : * When a worker calls rte_distributor_return_pkt()
284 : : * and passes the RTE_DISTRIB_RETURN_BUF handshake through retptr64,
285 : : * the distributor must retrieve both the in-flight and the backlog packets
286 : : * assigned to that worker and redistribute them to other workers.
287 : : */
288 : : static void
289 : 2 : handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
290 : : {
291 : : struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
292 : : /* double the burst size, to store both in-flight and backlog packets */
293 : : struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
294 : : unsigned int pkts_count = 0;
295 : : unsigned int i;
296 : :
297 : : /* If GET_BUF is cleared, there are in-flight packets sent
298 : : * to a worker that has not requested new packets.
299 : : * They must be retrieved and assigned to another worker.
300 : : */
301 : 2 : if (!(rte_atomic_load_explicit(&(buf->bufptr64[0]), rte_memory_order_acquire)
302 [ + - ]: 2 : & RTE_DISTRIB_GET_BUF))
303 [ + + ]: 18 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
304 [ - + ]: 16 : if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
305 : 0 : pkts[pkts_count++] = (void *)((uintptr_t)
306 : 0 : (buf->bufptr64[i]
307 : 0 : >> RTE_DISTRIB_FLAG_BITS));
308 : :
309 : : /* Perform the following operations on the bufptr64 handshake flags:
310 : : * - set GET_BUF to indicate that the distributor may overwrite the buffer
311 : : * with new packets if the worker makes a new request;
312 : : * - clear RETURN_BUF to unlock reads on the worker side.
313 : : */
314 : 2 : rte_atomic_store_explicit(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
315 : : rte_memory_order_release);
316 : :
317 : : /* Collect backlog packets from worker */
318 [ - + ]: 2 : for (i = 0; i < d->backlog[wkr].count; i++)
319 : 0 : pkts[pkts_count++] = (void *)((uintptr_t)
320 : 0 : (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
321 : :
322 : 2 : d->backlog[wkr].count = 0;
323 : :
324 : : /* Clear both inflight and backlog tags */
325 [ + + ]: 18 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
326 : 16 : d->in_flight_tags[wkr][i] = 0;
327 : 16 : d->backlog[wkr].tags[i] = 0;
328 : : }
329 : :
330 : : /* Recursive call */
331 [ - + ]: 2 : if (pkts_count > 0)
332 : 0 : rte_distributor_process(d, pkts, pkts_count);
333 : 2 : }
334 : :
335 : :
336 : : /*
337 : : * When the handshake bits indicate that there are packets coming
338 : : * back from the worker, this function is called to copy and store
339 : : * the valid returned pointers (store_return).
340 : : */
341 : : static unsigned int
342 : 7234185 : handle_returns(struct rte_distributor *d, unsigned int wkr)
343 : : {
344 : : struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
345 : : uintptr_t oldbuf;
346 : 7234185 : unsigned int ret_start = d->returns.start,
347 : 7234185 : ret_count = d->returns.count;
348 : : unsigned int count = 0;
349 : : unsigned int i;
350 : :
351 : : /* Sync on GET_BUF flag. Acquire retptrs. */
352 : 7234185 : if (rte_atomic_load_explicit(&(buf->retptr64[0]), rte_memory_order_acquire)
353 [ + + ]: 7234185 : & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
354 [ + + ]: 1181565 : for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
355 [ + + ]: 1050280 : if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
356 [ + - ]: 1090 : oldbuf = ((uintptr_t)(buf->retptr64[i] >>
357 : : RTE_DISTRIB_FLAG_BITS));
358 : : /* store returns in a circular buffer */
359 : : store_return(oldbuf, d, &ret_start, &ret_count);
360 : 1090 : count++;
361 : 1090 : buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
362 : : }
363 : : }
364 : 131285 : d->returns.start = ret_start;
365 : 131285 : d->returns.count = ret_count;
366 : :
367 : : /* If the worker requested packets with GET_BUF, mark it active;
368 : : * otherwise (RETURN_BUF), mark it inactive.
369 : : */
370 : 131285 : d->activesum -= d->active[wkr];
371 : 131285 : d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
372 : 131285 : d->activesum += d->active[wkr];
373 : :
374 : : /* If the worker returned packets without requesting new ones,
375 : : * handle all in-flight and backlog packets assigned to it.
376 : : */
377 [ + + ]: 131285 : if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
378 : 2 : handle_worker_shutdown(d, wkr);
379 : :
380 : : /* Clear for the worker to populate with more returns.
381 : : * Sync with worker on GET_BUF flag. Release retptrs.
382 : : */
383 : 131285 : rte_atomic_store_explicit(&(buf->retptr64[0]), 0, rte_memory_order_release);
384 : : }
385 : 7234185 : return count;
386 : : }
387 : :
388 : : /*
389 : : * This function releases a burst (cache line) to a worker.
390 : : * It is called from the process function when a cache line is
391 : : * full, to make room for more packets for that worker, or when
392 : : * all packets have been assigned to bursts and need to be flushed
393 : : * to the workers.
394 : : * It also waits until the worker has finished with its outstanding
395 : : * packets before sending out new ones.
396 : : */
397 : : static unsigned int
398 : 131286 : release(struct rte_distributor *d, unsigned int wkr)
399 : : {
400 : : struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
401 : : unsigned int i;
402 : :
403 : 131286 : handle_returns(d, wkr);
404 [ + + ]: 131286 : if (unlikely(!d->active[wkr]))
405 : : return 0;
406 : :
407 : : /* Sync with worker on GET_BUF flag */
408 : 7132660 : while (!(rte_atomic_load_explicit(&(d->bufs[wkr].bufptr64[0]), rte_memory_order_acquire)
409 [ + + ]: 7132660 : & RTE_DISTRIB_GET_BUF)) {
410 : 7001376 : handle_returns(d, wkr);
411 [ + - ]: 7001376 : if (unlikely(!d->active[wkr]))
412 : : return 0;
413 : : rte_pause();
414 : : }
415 : :
416 : : buf->count = 0;
417 : :
418 [ + + ]: 1180950 : for (i = 0; i < d->backlog[wkr].count; i++) {
419 : 1049666 : d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
420 : 1049666 : RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
421 : 1049666 : d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
422 : : }
423 : 131284 : buf->count = i;
424 [ + + ]: 131890 : for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
425 : 606 : buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
426 : 606 : d->in_flight_tags[wkr][i] = 0;
427 : : }
428 : :
429 : 131284 : d->backlog[wkr].count = 0;
430 : :
431 : : /* Clear the GET bit.
432 : : * Sync with worker on GET_BUF flag. Release bufptrs.
433 : : */
434 : 131284 : rte_atomic_store_explicit(&(buf->bufptr64[0]),
435 : : buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, rte_memory_order_release);
436 : 131284 : return buf->count;
437 : :
438 : : }
439 : :
440 : :
441 : : /* process a set of packets to distribute them to workers */
442 : : int
443 : 134180 : rte_distributor_process(struct rte_distributor *d,
444 : : struct rte_mbuf **mbufs, unsigned int num_mbufs)
445 : : {
446 : : unsigned int next_idx = 0;
447 : : static unsigned int wkr;
448 : : struct rte_mbuf *next_mb = NULL;
449 : : int64_t next_value = 0;
450 : : uint16_t new_tag = 0;
451 : : alignas(RTE_CACHE_LINE_SIZE) uint16_t flows[RTE_DIST_BURST_SIZE];
452 : : unsigned int i, j, w, wid, matching_required;
453 : :
454 [ + + ]: 134180 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
455 : : /* Call the old API */
456 : 32806 : return rte_distributor_process_single(d->d_single,
457 : : mbufs, num_mbufs);
458 : : }
459 : :
460 [ + + ]: 202748 : for (wid = 0 ; wid < d->num_workers; wid++)
461 : 101374 : handle_returns(d, wid);
462 : :
463 [ + + ]: 101374 : if (unlikely(num_mbufs == 0)) {
464 : : /* Flush out all non-full cache-lines to workers. */
465 [ + + ]: 137140 : for (wid = 0 ; wid < d->num_workers; wid++) {
466 : : /* Sync with worker on GET_BUF flag. */
467 : 68570 : if (rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
468 [ + + ]: 68570 : rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF) {
469 : 110 : d->bufs[wid].count = 0;
470 : 110 : release(d, wid);
471 : 110 : handle_returns(d, wid);
472 : : }
473 : : }
474 : : return 0;
475 : : }
476 : :
477 [ + - ]: 32804 : if (unlikely(!d->activesum))
478 : : return 0;
479 : :
480 [ + + ]: 164014 : while (next_idx < num_mbufs) {
481 : : alignas(128) uint16_t matches[RTE_DIST_BURST_SIZE];
482 : : unsigned int pkts;
483 : :
484 [ + + ]: 131210 : if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
485 : : pkts = num_mbufs - next_idx;
486 : : else
487 : : pkts = RTE_DIST_BURST_SIZE;
488 : :
489 [ + + ]: 1180876 : for (i = 0; i < pkts; i++) {
490 [ + - ]: 1049666 : if (mbufs[next_idx + i]) {
491 : : /* flows have to be non-zero */
492 : 1049666 : flows[i] = mbufs[next_idx + i]->hash.usr | 1;
493 : : } else
494 : 0 : flows[i] = 0;
495 : : }
496 [ + + ]: 131224 : for (; i < RTE_DIST_BURST_SIZE; i++)
497 : 14 : flows[i] = 0;
498 : :
499 : : matching_required = 1;
500 : :
501 [ + + ]: 1180876 : for (j = 0; j < pkts; j++) {
502 [ - + ]: 1049666 : if (unlikely(!d->activesum))
503 : 0 : return next_idx;
504 : :
505 [ + + ]: 1049666 : if (unlikely(matching_required)) {
506 [ + - ]: 131210 : switch (d->dist_match_fn) {
507 : 131210 : case RTE_DIST_MATCH_VECTOR:
508 : 131210 : find_match_vec(d, &flows[0],
509 : : &matches[0]);
510 : 131210 : break;
511 : 0 : default:
512 : 0 : find_match_scalar(d, &flows[0],
513 : : &matches[0]);
514 : : }
515 : : matching_required = 0;
516 : : }
517 : : /*
518 : : * The matches array now contains the intended worker ID (+1) for
519 : : * each incoming packet. Any zeroes still need to be assigned
520 : : * a worker.
521 : : */
522 : :
523 : 1049666 : next_mb = mbufs[next_idx++];
524 : 1049666 : next_value = (((int64_t)(uintptr_t)next_mb) <<
525 : : RTE_DISTRIB_FLAG_BITS);
526 : : /*
527 : : * The user is advised to set a tag value for each
528 : : * mbuf before calling rte_distributor_process.
529 : : * User-defined tags are used to identify flows
530 : : * or sessions.
531 : : */
532 : : /* flows MUST be non-zero */
533 : 1049666 : new_tag = (uint16_t)(next_mb->hash.usr) | 1;
534 : :
535 : : /*
536 : : * Uncommenting the next line will cause the find_match
537 : : * function to be optimized out, making this function
538 : : * do parallel (non-atomic) distribution
539 : : */
540 : : /* matches[j] = 0; */
541 : :
542 [ + + - + ]: 1049666 : if (matches[j] && d->active[matches[j]-1]) {
543 : : struct rte_distributor_backlog *bl =
544 : : &d->backlog[matches[j]-1];
545 [ + + ]: 49 : if (unlikely(bl->count ==
546 : : RTE_DIST_BURST_SIZE)) {
547 : 6 : release(d, matches[j]-1);
548 [ - + ]: 6 : if (!d->active[matches[j]-1]) {
549 : 0 : j--;
550 : : next_idx--;
551 : : matching_required = 1;
552 : 0 : continue;
553 : : }
554 : : }
555 : :
556 : : /* Add to worker that already has flow */
557 : 49 : unsigned int idx = bl->count++;
558 : :
559 : 49 : bl->tags[idx] = new_tag;
560 : 49 : bl->pkts[idx] = next_value;
561 : :
562 : : } else {
563 : : struct rte_distributor_backlog *bl;
564 : :
565 [ - + ]: 1049617 : while (unlikely(!d->active[wkr]))
566 : 0 : wkr = (wkr + 1) % d->num_workers;
567 : : bl = &d->backlog[wkr];
568 : :
569 [ + + ]: 1049617 : if (unlikely(bl->count ==
570 : : RTE_DIST_BURST_SIZE)) {
571 : 131147 : release(d, wkr);
572 [ - + ]: 131147 : if (!d->active[wkr]) {
573 : 0 : j--;
574 : : next_idx--;
575 : : matching_required = 1;
576 : 0 : continue;
577 : : }
578 : : }
579 : :
580 : : /* Add to current worker */
581 : 1049617 : unsigned int idx = bl->count++;
582 : :
583 : 1049617 : bl->tags[idx] = new_tag;
584 : 1049617 : bl->pkts[idx] = next_value;
585 : : /*
586 : : * Now that we've just added an unpinned flow
587 : : * to a worker, we need to ensure that all
588 : : * other packets with that same flow will go
589 : : * to the same worker in this burst.
590 : : */
591 [ + + ]: 5772888 : for (w = j; w < pkts; w++)
592 [ + + ]: 4723271 : if (flows[w] == new_tag)
593 : 1049636 : matches[w] = wkr+1;
594 : : }
595 : : }
596 : 131210 : wkr = (wkr + 1) % d->num_workers;
597 : : }
598 : :
599 : : /* Flush out all non-full cache-lines to workers. */
600 [ + + ]: 65608 : for (wid = 0 ; wid < d->num_workers; wid++)
601 : : /* Sync with worker on GET_BUF flag. */
602 : 32804 : if ((rte_atomic_load_explicit(&(d->bufs[wid].bufptr64[0]),
603 [ + + ]: 32804 : rte_memory_order_acquire) & RTE_DISTRIB_GET_BUF)) {
604 : 23 : d->bufs[wid].count = 0;
605 : 23 : release(d, wid);
606 : : }
607 : :
608 : 32804 : return num_mbufs;
609 : : }
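/*
 * Editor's sketch of the distribution side: the tag that the matching logic
 * pins flows with is taken from mbuf->hash.usr, so the caller sets it before
 * handing the burst to rte_distributor_process(). How the tag is derived
 * (RSS hash, 5-tuple, session ID, etc.) is application-specific; the value
 * used below is only a placeholder.
 */
static int
example_distribute_burst(struct rte_distributor *d,
		struct rte_mbuf **bufs, unsigned int nb_rx)
{
	unsigned int i;

	for (i = 0; i < nb_rx; i++)
		bufs[i]->hash.usr = (uint32_t)(i % 4) + 1;	/* placeholder tag */

	return rte_distributor_process(d, bufs, nb_rx);
}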
610 : :
611 : : /* return to the caller the packets handed back by workers */
612 : : int
613 : 78 : rte_distributor_returned_pkts(struct rte_distributor *d,
614 : : struct rte_mbuf **mbufs, unsigned int max_mbufs)
615 : : {
616 : : struct rte_distributor_returned_pkts *returns = &d->returns;
617 : 78 : unsigned int retval = (max_mbufs < returns->count) ?
618 : : max_mbufs : returns->count;
619 : : unsigned int i;
620 : :
621 [ + + ]: 78 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
622 : : /* Call the old API */
623 : 39 : return rte_distributor_returned_pkts_single(d->d_single,
624 : : mbufs, max_mbufs);
625 : : }
626 : :
627 [ + + ]: 1129 : for (i = 0; i < retval; i++) {
628 : 1090 : unsigned int idx = (returns->start + i) &
629 : : RTE_DISTRIB_RETURNS_MASK;
630 : :
631 : 1090 : mbufs[i] = returns->mbufs[idx];
632 : : }
633 : 39 : returns->start += i;
634 : 39 : returns->count -= i;
635 : :
636 : 39 : return retval;
637 : : }
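/*
 * Editor's sketch: the distributor core periodically drains the packets
 * that workers have handed back. Freeing them is just an assumption about
 * the application; a real pipeline might transmit or requeue them instead.
 */
static void
example_drain_returns(struct rte_distributor *d)
{
	struct rte_mbuf *done[RTE_DIST_BURST_SIZE * 2];
	int i, n;

	n = rte_distributor_returned_pkts(d, done, RTE_DIM(done));
	for (i = 0; i < n; i++)
		rte_pktmbuf_free(done[i]);
}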
638 : :
639 : : /*
640 : : * Return the number of packets in-flight in a distributor, i.e. packets
641 : : * being worked on or queued up in a backlog.
642 : : */
643 : : static inline unsigned int
644 : : total_outstanding(const struct rte_distributor *d)
645 : : {
646 : : unsigned int wkr, total_outstanding = 0;
647 : :
648 [ + + + + ]: 137214 : for (wkr = 0; wkr < d->num_workers; wkr++)
649 : 68607 : total_outstanding += d->backlog[wkr].count + d->bufs[wkr].count;
650 : :
651 : : return total_outstanding;
652 : : }
653 : :
654 : : /*
655 : : * Flush the distributor, so that there are no outstanding packets in flight or
656 : : * queued up.
657 : : */
658 : : int
659 : 78 : rte_distributor_flush(struct rte_distributor *d)
660 : : {
661 : : unsigned int flushed;
662 : : unsigned int wkr;
663 : :
664 [ + + ]: 78 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
665 : : /* Call the old API */
666 : 39 : return rte_distributor_flush_single(d->d_single);
667 : : }
668 : :
669 : : flushed = total_outstanding(d);
670 : :
671 [ + + ]: 68568 : while (total_outstanding(d) > 0)
672 : 68529 : rte_distributor_process(d, NULL, 0);
673 : :
674 : : /* wait 10 ms to allow all workers to drain their packets */
675 : 39 : rte_delay_us(10000);
676 : :
677 : : /*
678 : : * Send an empty burst to all workers to allow them to exit
679 : : * gracefully, should they need to.
680 : : */
681 : 39 : rte_distributor_process(d, NULL, 0);
682 : :
683 [ + + ]: 78 : for (wkr = 0; wkr < d->num_workers; wkr++)
684 : 39 : handle_returns(d, wkr);
685 : :
686 : 39 : return flushed;
687 : : }
688 : :
689 : : /* clears the internal returns array in the distributor */
690 : : void
691 : 6 : rte_distributor_clear_returns(struct rte_distributor *d)
692 : : {
693 : : unsigned int wkr;
694 : :
695 [ + + ]: 6 : if (d->alg_type == RTE_DIST_ALG_SINGLE) {
696 : : /* Call the old API */
697 : 3 : rte_distributor_clear_returns_single(d->d_single);
698 : 3 : return;
699 : : }
700 : :
701 : : /* throw away returns, so workers can exit */
702 [ + + ]: 6 : for (wkr = 0; wkr < d->num_workers; wkr++)
703 : : /* Sync with worker. Release retptrs. */
704 : 3 : rte_atomic_store_explicit(&(d->bufs[wkr].retptr64[0]), 0,
705 : : rte_memory_order_release);
706 : :
707 : 3 : d->returns.start = d->returns.count = 0;
708 : : }
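/*
 * Editor's sketch of a typical quiesce/shutdown sequence on the distributor
 * core: rte_distributor_flush() pushes everything still queued out to the
 * workers, and rte_distributor_clear_returns() then discards pending
 * returns so that workers blocked in a handshake can exit.
 */
static void
example_quiesce(struct rte_distributor *d)
{
	int flushed;

	flushed = rte_distributor_flush(d);	/* packets that were outstanding */
	(void)flushed;
	rte_distributor_clear_returns(d);
}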
709 : :
710 : : /* creates a distributor instance */
711 : : struct rte_distributor *
712 : 6 : rte_distributor_create(const char *name,
713 : : unsigned int socket_id,
714 : : unsigned int num_workers,
715 : : unsigned int alg_type)
716 : : {
717 : : struct rte_distributor *d;
718 : : struct rte_dist_burst_list *dist_burst_list;
719 : : char mz_name[RTE_MEMZONE_NAMESIZE];
720 : : const struct rte_memzone *mz;
721 : : unsigned int i;
722 : :
723 : : /* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */
724 : :
725 : : /* compilation-time checks */
726 : : /* compile-time checks */
727 : : RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
728 : :
729 [ + + + + ]: 6 : if (name == NULL || num_workers >=
730 : : (unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
731 : 4 : rte_errno = EINVAL;
732 : 4 : return NULL;
733 : : }
734 : :
735 [ + + ]: 2 : if (alg_type == RTE_DIST_ALG_SINGLE) {
736 : 1 : d = malloc(sizeof(struct rte_distributor));
737 [ - + ]: 1 : if (d == NULL) {
738 : 0 : rte_errno = ENOMEM;
739 : 0 : return NULL;
740 : : }
741 : 1 : d->d_single = rte_distributor_create_single(name,
742 : : socket_id, num_workers);
743 [ - + ]: 1 : if (d->d_single == NULL) {
744 : 0 : free(d);
745 : : /* rte_errno will have been set */
746 : 0 : return NULL;
747 : : }
748 : 1 : d->alg_type = alg_type;
749 : 1 : return d;
750 : : }
751 : :
752 : : snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
753 : 1 : mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
754 [ - + ]: 1 : if (mz == NULL) {
755 : 0 : rte_errno = ENOMEM;
756 : 0 : return NULL;
757 : : }
758 : :
759 : 1 : d = mz->addr;
760 : 1 : strlcpy(d->name, name, sizeof(d->name));
761 : 1 : d->num_workers = num_workers;
762 : 1 : d->alg_type = alg_type;
763 : :
764 : 1 : d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
765 : : #if defined(RTE_ARCH_X86)
766 [ + - ]: 1 : if (rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_128)
767 : 1 : d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
768 : : #endif
769 : :
770 : : /*
771 : : * Set up the backlog tags so they're pointing at the second cache
772 : : * line for performance during flow matching
773 : : */
774 [ + + ]: 2 : for (i = 0 ; i < num_workers ; i++)
775 : 1 : d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
776 : :
777 : 1 : memset(d->active, 0, sizeof(d->active));
778 : 1 : d->activesum = 0;
779 : :
780 : 1 : dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
781 : : rte_dist_burst_list);
782 : :
783 : :
784 : 1 : rte_mcfg_tailq_write_lock();
785 : 1 : TAILQ_INSERT_TAIL(dist_burst_list, d, next);
786 : 1 : rte_mcfg_tailq_write_unlock();
787 : :
788 : 1 : return d;
789 : : }
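/*
 * Editor's sketch of creating a burst-mode distributor for four workers on
 * the caller's NUMA socket. It assumes rte_eal_init() has already run and
 * that <rte_lcore.h> is available for rte_socket_id(); the name
 * "example_dist" is arbitrary.
 */
static struct rte_distributor *
example_create(void)
{
	struct rte_distributor *d;

	d = rte_distributor_create("example_dist", rte_socket_id(),
			4, RTE_DIST_ALG_BURST);
	if (d == NULL) {
		/* rte_errno holds the failure reason (EINVAL or ENOMEM) */
		return NULL;
	}
	return d;
}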
|