Branch data Line data Source code
1 : : /* SPDX-License-Identifier: BSD-3-Clause
2 : : *
3 : : * Copyright (c) 2018-2020 Arm Limited
4 : : */
5 : :
6 : : #include <stdio.h>
7 : : #include <string.h>
8 : : #include <stdint.h>
9 : : #include <inttypes.h>
10 : : #include <errno.h>
11 : :
12 : : #include <rte_common.h>
13 : : #include <rte_log.h>
14 : : #include <rte_memory.h>
15 : : #include <rte_malloc.h>
16 : : #include <rte_errno.h>
17 : : #include <rte_ring_elem.h>
18 : :
19 : : #include "rte_rcu_qsbr.h"
20 : : #include "rcu_qsbr_pvt.h"
21 : :
22 : : #define RCU_LOG(level, ...) \
23 : : RTE_LOG_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__)
24 : :
25 : : /* Get the memory size of QSBR variable */
26 : : size_t
27 : 7929 : rte_rcu_qsbr_get_memsize(uint32_t max_threads)
28 : : {
29 : : size_t sz;
30 : :
31 [ + + ]: 7929 : if (max_threads == 0) {
32 : 1 : RCU_LOG(ERR, "Invalid max_threads %u", max_threads);
33 : 1 : rte_errno = EINVAL;
34 : :
35 : 1 : return 1;
36 : : }
37 : :
38 : : sz = sizeof(struct rte_rcu_qsbr);
39 : :
40 : : /* Add the size of quiescent state counter array */
41 : 7928 : sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
42 : :
43 : : /* Add the size of the registered thread ID bitmap array */
44 : 7928 : sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
45 : :
46 : 7928 : return sz;
47 : : }
48 : :
49 : : /* Initialize a quiescent state variable */
50 : : int
51 : 7448 : rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
52 : : {
53 : : size_t sz;
54 : :
55 [ + + ]: 7448 : if (v == NULL) {
56 : 1 : RCU_LOG(ERR, "Invalid input parameter");
57 : 1 : rte_errno = EINVAL;
58 : :
59 : 1 : return 1;
60 : : }
61 : :
62 : 7447 : sz = rte_rcu_qsbr_get_memsize(max_threads);
63 [ + - ]: 7447 : if (sz == 1)
64 : : return 1;
65 : :
66 : : /* Set all the threads to offline */
67 : : memset(v, 0, sz);
68 : 7447 : v->max_threads = max_threads;
69 : 7447 : v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
70 : : __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
71 : : __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
72 : 7447 : v->token = __RTE_QSBR_CNT_INIT;
73 : 7447 : v->acked_token = __RTE_QSBR_CNT_INIT - 1;
74 : :
75 : 7447 : return 0;
76 : : }
77 : :
78 : : /* Register a reader thread to report its quiescent state
79 : : * on a QS variable.
80 : : */
81 : : int
82 : 543 : rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
83 : : {
84 : : unsigned int i, id;
85 : : uint64_t old_bmap;
86 : :
87 [ + + + + ]: 543 : if (v == NULL || thread_id >= v->max_threads) {
88 : 3 : RCU_LOG(ERR, "Invalid input parameter");
89 : 3 : rte_errno = EINVAL;
90 : :
91 : 3 : return 1;
92 : : }
93 : :
94 : : __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
95 : : v->qsbr_cnt[thread_id].lock_cnt);
96 : :
97 : 540 : id = thread_id & __RTE_QSBR_THRID_MASK;
98 : 540 : i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
99 : :
100 : : /* Add the thread to the bitmap of registered threads */
101 : 540 : old_bmap = rte_atomic_fetch_or_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
102 : : RTE_BIT64(id), rte_memory_order_release);
103 : :
104 : : /* Increment the number of threads registered only if the thread was not already
105 : : * registered
106 : : */
107 [ + + ]: 540 : if (!(old_bmap & RTE_BIT64(id)))
108 : 539 : rte_atomic_fetch_add_explicit(&v->num_threads, 1, rte_memory_order_relaxed);
109 : :
110 : : return 0;
111 : : }
112 : :
113 : : /* Remove a reader thread, from the list of threads reporting their
114 : : * quiescent state on a QS variable.
115 : : */
116 : : int
117 : 276 : rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
118 : : {
119 : : unsigned int i, id;
120 : : uint64_t old_bmap;
121 : :
122 [ + + + + ]: 276 : if (v == NULL || thread_id >= v->max_threads) {
123 : 3 : RCU_LOG(ERR, "Invalid input parameter");
124 : 3 : rte_errno = EINVAL;
125 : :
126 : 3 : return 1;
127 : : }
128 : :
129 : : __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
130 : : v->qsbr_cnt[thread_id].lock_cnt);
131 : :
132 : 273 : id = thread_id & __RTE_QSBR_THRID_MASK;
133 : 273 : i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
134 : :
135 : : /* Make sure any loads of the shared data structure are
136 : : * completed before removal of the thread from the bitmap of
137 : : * reporting threads.
138 : : */
139 : 273 : old_bmap = rte_atomic_fetch_and_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
140 : : ~RTE_BIT64(id), rte_memory_order_release);
141 : :
142 : : /* Decrement the number of threads unregistered only if the thread was not already
143 : : * unregistered
144 : : */
145 [ + + ]: 273 : if (old_bmap & RTE_BIT64(id))
146 : 270 : rte_atomic_fetch_sub_explicit(&v->num_threads, 1, rte_memory_order_relaxed);
147 : :
148 : : return 0;
149 : : }
150 : :
151 : : /* Wait till the reader threads have entered quiescent state. */
152 : : void
153 [ + + ]: 2058 : rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
154 : : {
155 : : uint64_t t;
156 : :
157 : : RTE_ASSERT(v != NULL);
158 : :
159 : : t = rte_rcu_qsbr_start(v);
160 : :
161 : : /* If the current thread has readside critical section,
162 : : * update its quiescent state status.
163 : : */
164 [ + + ]: 2058 : if (thread_id != RTE_QSBR_THRID_INVALID)
165 : : rte_rcu_qsbr_quiescent(v, thread_id);
166 : :
167 : : /* Wait for other readers to enter quiescent state */
168 : : rte_rcu_qsbr_check(v, t, true);
169 : 2058 : }
170 : :
171 : : /* Dump the details of a single quiescent state variable to a file. */
172 : : int
173 : 6 : rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
174 : : {
175 : : uint64_t bmap;
176 : : uint32_t i, t, id;
177 : :
178 [ + + ]: 6 : if (v == NULL || f == NULL) {
179 : 3 : RCU_LOG(ERR, "Invalid input parameter");
180 : 3 : rte_errno = EINVAL;
181 : :
182 : 3 : return 1;
183 : : }
184 : :
185 : : fprintf(f, "\nQuiescent State Variable @%p\n", v);
186 : :
187 : 3 : fprintf(f, " QS variable memory size = %zu\n",
188 : : rte_rcu_qsbr_get_memsize(v->max_threads));
189 : 3 : fprintf(f, " Given # max threads = %u\n", v->max_threads);
190 : 3 : fprintf(f, " Current # threads = %u\n", v->num_threads);
191 : :
192 : : fprintf(f, " Registered thread IDs = ");
193 [ + + ]: 9 : for (i = 0; i < v->num_elems; i++) {
194 : 6 : bmap = rte_atomic_load_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
195 : : rte_memory_order_acquire);
196 : 6 : id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
197 [ + + ]: 7 : while (bmap) {
198 : : t = rte_ctz64(bmap);
199 : 1 : fprintf(f, "%u ", id + t);
200 : :
201 : 1 : bmap &= ~RTE_BIT64(t);
202 : : }
203 : : }
204 : :
205 : : fprintf(f, "\n");
206 : :
207 : 3 : fprintf(f, " Token = %" PRIu64 "\n",
208 : 3 : rte_atomic_load_explicit(&v->token, rte_memory_order_acquire));
209 : :
210 : 3 : fprintf(f, " Least Acknowledged Token = %" PRIu64 "\n",
211 : 3 : rte_atomic_load_explicit(&v->acked_token, rte_memory_order_acquire));
212 : :
213 : : fprintf(f, "Quiescent State Counts for readers:\n");
214 [ + + ]: 9 : for (i = 0; i < v->num_elems; i++) {
215 : 6 : bmap = rte_atomic_load_explicit(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
216 : : rte_memory_order_acquire);
217 : 6 : id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
218 [ + + ]: 7 : while (bmap) {
219 : : t = rte_ctz64(bmap);
220 : 1 : fprintf(f, "thread ID = %u, count = %" PRIu64 ", lock count = %u\n",
221 : : id + t,
222 : 1 : rte_atomic_load_explicit(
223 : : &v->qsbr_cnt[id + t].cnt,
224 : : rte_memory_order_relaxed),
225 : 1 : rte_atomic_load_explicit(
226 : : &v->qsbr_cnt[id + t].lock_cnt,
227 : : rte_memory_order_relaxed));
228 : 1 : bmap &= ~RTE_BIT64(t);
229 : : }
230 : : }
231 : :
232 : : return 0;
233 : : }
234 : :
235 : : /* Create a queue used to store the data structure elements that can
236 : : * be freed later. This queue is referred to as 'defer queue'.
237 : : */
238 : : struct rte_rcu_qsbr_dq *
239 : 24 : rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
240 : : {
241 : : struct rte_rcu_qsbr_dq *dq;
242 : : uint32_t qs_fifo_size;
243 : : unsigned int flags;
244 : :
245 [ + + + + ]: 24 : if (params == NULL || params->free_fn == NULL ||
246 [ + + + - ]: 21 : params->v == NULL || params->name == NULL ||
247 [ + + + + ]: 20 : params->size == 0 || params->esize == 0 ||
248 [ + + ]: 18 : (params->esize % 4 != 0)) {
249 : 9 : RCU_LOG(ERR, "Invalid input parameter");
250 : 9 : rte_errno = EINVAL;
251 : :
252 : 9 : return NULL;
253 : : }
254 : : /* If auto reclamation is configured, reclaim limit
255 : : * should be a valid value.
256 : : */
257 [ + - ]: 15 : if ((params->trigger_reclaim_limit <= params->size) &&
258 [ - + ]: 15 : (params->max_reclaim_size == 0)) {
259 : 0 : RCU_LOG(ERR,
260 : : "Invalid input parameter, size = %u, trigger_reclaim_limit = %u, "
261 : : "max_reclaim_size = %u",
262 : : params->size, params->trigger_reclaim_limit,
263 : : params->max_reclaim_size);
264 : 0 : rte_errno = EINVAL;
265 : :
266 : 0 : return NULL;
267 : : }
268 : :
269 : 15 : dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
270 : : RTE_CACHE_LINE_SIZE);
271 [ - + ]: 15 : if (dq == NULL) {
272 : 0 : rte_errno = ENOMEM;
273 : :
274 : 0 : return NULL;
275 : : }
276 : :
277 : : /* Decide the flags for the ring.
278 : : * If MT safety is requested, use RTS for ring enqueue as most
279 : : * use cases involve dq-enqueue happening on the control plane.
280 : : * Ring dequeue is always HTS due to the possibility of revert.
281 : : */
282 : : flags = RING_F_MP_RTS_ENQ;
283 [ + + ]: 15 : if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
284 : : flags = RING_F_SP_ENQ;
285 : 15 : flags |= RING_F_MC_HTS_DEQ;
286 : : /* round up qs_fifo_size to next power of two that is not less than
287 : : * max_size.
288 : : */
289 : 15 : qs_fifo_size = rte_align32pow2(params->size + 1);
290 : : /* Add token size to ring element size */
291 : 30 : dq->r = rte_ring_create_elem(params->name,
292 : 15 : __RTE_QSBR_TOKEN_SIZE + params->esize,
293 : : qs_fifo_size, SOCKET_ID_ANY, flags);
294 [ - + ]: 15 : if (dq->r == NULL) {
295 : 0 : RCU_LOG(ERR, "defer queue create failed");
296 : 0 : rte_free(dq);
297 : 0 : return NULL;
298 : : }
299 : :
300 : 15 : dq->v = params->v;
301 : 15 : dq->size = params->size;
302 : 15 : dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
303 : 15 : dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
304 : 15 : dq->max_reclaim_size = params->max_reclaim_size;
305 : 15 : dq->free_fn = params->free_fn;
306 : 15 : dq->p = params->p;
307 : :
308 : 15 : return dq;
309 : : }
310 : :
311 : : /* Enqueue one resource to the defer queue to free after the grace
312 : : * period is over.
313 : : */
314 : 1102 : int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
315 : 1102 : {
316 : : __rte_rcu_qsbr_dq_elem_t *dq_elem;
317 : : uint32_t cur_size;
318 : :
319 [ + + ]: 1102 : if (dq == NULL || e == NULL) {
320 : 3 : RCU_LOG(ERR, "Invalid input parameter");
321 : 3 : rte_errno = EINVAL;
322 : :
323 : 3 : return 1;
324 : : }
325 : :
326 : 1099 : char data[dq->esize];
327 : : dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
328 : : /* Start the grace period */
329 [ + + ]: 1099 : dq_elem->token = rte_rcu_qsbr_start(dq->v);
330 : :
331 : : /* Reclaim resources if the queue size has hit the reclaim
332 : : * limit. This helps the queue from growing too large and
333 : : * allows time for reader threads to report their quiescent state.
334 : : */
335 [ + + ]: 1099 : cur_size = rte_ring_count(dq->r);
336 [ + + ]: 1099 : if (cur_size > dq->trigger_reclaim_limit) {
337 : 1036 : RCU_LOG(INFO, "Triggering reclamation");
338 : 1036 : rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
339 : : NULL, NULL, NULL);
340 : : }
341 : :
342 : : /* Enqueue the token and resource. Generating the token and
343 : : * enqueuing (token + resource) on the queue is not an
344 : : * atomic operation. When the defer queue is shared by multiple
345 : : * writers, this might result in tokens enqueued out of order
346 : : * on the queue. So, some tokens might wait longer than they
347 : : * are required to be reclaimed.
348 : : */
349 [ - + + - : 1099 : memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
- ]
350 : : /* Check the status as enqueue might fail since the other threads
351 : : * might have used up the freed space.
352 : : * Enqueue uses the configured flags when the DQ was created.
353 : : */
354 [ - + + - : 1099 : if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
- ]
355 : 8 : RCU_LOG(ERR, "Enqueue failed");
356 : : /* Note that the token generated above is not used.
357 : : * Other than wasting tokens, it should not cause any
358 : : * other issues.
359 : : */
360 : 8 : RCU_LOG(INFO, "Skipped enqueuing token = %" PRIu64, dq_elem->token);
361 : :
362 : 8 : rte_errno = ENOSPC;
363 : 8 : return 1;
364 : : }
365 : :
366 : 1091 : RCU_LOG(INFO, "Enqueued token = %" PRIu64, dq_elem->token);
367 : :
368 : 1091 : return 0;
369 : : }
370 : :
371 : : /* Reclaim resources from the defer queue. */
372 : : int
373 : 1064 : rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
374 : : unsigned int *freed, unsigned int *pending,
375 : : unsigned int *available)
376 : 1064 : {
377 : : uint32_t cnt;
378 : : __rte_rcu_qsbr_dq_elem_t *dq_elem;
379 : :
380 [ + + ]: 1064 : if (dq == NULL || n == 0) {
381 : 2 : RCU_LOG(ERR, "Invalid input parameter");
382 : 2 : rte_errno = EINVAL;
383 : :
384 : 2 : return 1;
385 : : }
386 : :
387 : : cnt = 0;
388 : :
389 : 1062 : char data[dq->esize];
390 : : /* Check reader threads quiescent state and reclaim resources */
391 [ + + + + ]: 4236 : while (cnt < n &&
392 [ - + - ]: 2083 : rte_ring_dequeue_bulk_elem_start(dq->r, &data,
393 : : dq->esize, 1, available) != 0) {
394 : : dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
395 : :
396 : : /* Reclaim the resource */
397 [ + + ]: 2065 : if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
398 [ - + - ]: 974 : rte_ring_dequeue_elem_finish(dq->r, 0);
399 : : break;
400 : : }
401 [ - + - ]: 1091 : rte_ring_dequeue_elem_finish(dq->r, 1);
402 : :
403 : 1091 : RCU_LOG(INFO, "Reclaimed token = %" PRIu64, dq_elem->token);
404 : :
405 : 1091 : dq->free_fn(dq->p, dq_elem->elem, 1);
406 : :
407 : 1091 : cnt++;
408 : : }
409 : :
410 : 1062 : RCU_LOG(INFO, "Reclaimed %u resources", cnt);
411 : :
412 [ + + ]: 1062 : if (freed != NULL)
413 : 1 : *freed = cnt;
414 [ + + ]: 1062 : if (pending != NULL)
415 : 20 : *pending = rte_ring_count(dq->r);
416 : :
417 : : return 0;
418 : : }
419 : :
420 : : /* Delete a defer queue. */
421 : : int
422 : 25 : rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
423 : : {
424 : : unsigned int pending;
425 : :
426 [ + + ]: 25 : if (dq == NULL) {
427 : 6 : RCU_LOG(DEBUG, "Invalid input parameter");
428 : :
429 : 6 : return 0;
430 : : }
431 : :
432 : : /* Reclaim all the resources */
433 : 19 : rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
434 [ + + ]: 19 : if (pending != 0) {
435 : 4 : rte_errno = EAGAIN;
436 : :
437 : 4 : return 1;
438 : : }
439 : :
440 : 15 : rte_ring_free(dq->r);
441 : 15 : rte_free(dq);
442 : :
443 : 15 : return 0;
444 : : }
445 : :
446 [ - + ]: 251 : RTE_LOG_REGISTER_DEFAULT(rte_rcu_log_type, ERR);
|