/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Intel Corporation
 */

#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <sys/time.h>

#include <rte_errno.h>
#include <rte_string_fns.h>

#include "eal_memalloc.h"
#include "eal_memcfg.h"
#include "eal_private.h"

#include "malloc_elem.h"
#include "malloc_mp.h"

#define MP_ACTION_SYNC "mp_malloc_sync"
/**< request sent by primary process to notify of changes in memory map */
#define MP_ACTION_ROLLBACK "mp_malloc_rollback"
/**< request sent by primary process to notify of changes in memory map. this is
 * essentially a regular sync request, but we cannot send sync requests while
 * another one is in progress, and we might have to - therefore, we do this as
 * a separate callback.
 */
#define MP_ACTION_REQUEST "mp_malloc_request"
/**< request sent by secondary process to ask for allocation/deallocation */
#define MP_ACTION_RESPONSE "mp_malloc_response"
/**< response sent to secondary process to indicate result of request */

/* forward declarations */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);
static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply);

#define MP_TIMEOUT_S 5 /**< 5 second timeout */

/* when we're allocating, we need to store some state to ensure that we can
 * roll back later
 */
struct primary_alloc_req_state {
	struct malloc_heap *heap;
	struct rte_memseg **ms;
	int ms_len;
	struct malloc_elem *elem;
	void *map_addr;
	size_t map_len;
};

enum req_state {
	REQ_STATE_INACTIVE = 0,
	REQ_STATE_ACTIVE,
	REQ_STATE_COMPLETE
};

struct mp_request {
	TAILQ_ENTRY(mp_request) next;
	struct malloc_mp_req user_req; /**< contents of request */
	pthread_cond_t cond; /**< variable we use to time out on this request */
	enum req_state state; /**< indicate status of this request */
	struct primary_alloc_req_state alloc_state;
};

/*
 * We could've used just a single request, but it may be possible for
 * secondaries to timeout earlier than the primary, and send a new request while
 * primary is still expecting replies to the old one. Therefore, each new
 * request will get assigned a new ID, which is how we will distinguish between
 * expected and unexpected messages.
 */
TAILQ_HEAD(mp_request_list, mp_request);
static struct {
	struct mp_request_list list;
	pthread_mutex_t lock;
} mp_request_list = {
	.list = TAILQ_HEAD_INITIALIZER(mp_request_list.list),
	.lock = PTHREAD_MUTEX_INITIALIZER
};

/**
 * General workflow is the following:
 *
 * Allocation:
 * S: send request to primary
 * P: attempt to allocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    if success, sendmsg success
 *    if failure, roll back allocation and send a rollback request
 * S: if received msg of success, quit
 *    if received rollback request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * Aside from timeouts, there are three points where we can quit:
 * - if allocation failed straight away
 * - if allocation and sync request succeeded
 * - if allocation succeeded, sync request failed, allocation rolled back and
 *   rollback request received (irrespective of whether it succeeded or failed)
 *
 * Deallocation:
 * S: send request to primary
 * P: attempt to deallocate memory
 *    if failed, sendmsg failure
 *    if success, send sync request
 * S: if received msg of failure, quit
 *    if received sync request, synchronize memory map and reply with result
 * P: if received sync request result
 *    sendmsg sync request result
 * S: if received msg, quit
 *
 * There is no "rollback" from deallocation, as it's safe to have some memory
 * mapped in some processes - it's absent from the heap, so it won't get used.
 */
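
/*
 * Illustrative sketch (not part of the original file): how a secondary
 * process drives the allocation workflow above. The field names match those
 * used elsewhere in this file; the concrete sizes, alignment and heap index
 * are assumptions made for the example only.
 */
static __rte_unused int
example_secondary_alloc(void)
{
	struct malloc_mp_req req;

	memset(&req, 0, sizeof(req));
	req.t = REQ_TYPE_ALLOC;
	req.alloc_req.page_sz = 2 * 1024 * 1024;   /* 2 MiB, power of two */
	req.alloc_req.elt_size = 16 * 1024 * 1024; /* 16 MiB */
	req.alloc_req.socket = 0;
	req.alloc_req.flags = 0;
	req.alloc_req.align = 64;
	req.alloc_req.bound = 0;
	req.alloc_req.contig = false;
	req.alloc_req.malloc_heap_idx = 0; /* index into mcfg->malloc_heaps */

	/* blocks until the primary responds, or until MP_TIMEOUT_S elapses */
	if (request_to_primary(&req) != 0)
		return -1;

	return req.result == REQ_RESULT_SUCCESS ? 0 : -1;
}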

static struct mp_request *
find_request_by_id(uint64_t id)
{
	struct mp_request *req;
	TAILQ_FOREACH(req, &mp_request_list.list, next) {
		if (req->user_req.id == id)
			break;
	}
	return req;
}

/* this ID is, like, totally guaranteed to be absolutely unique. pinky swear. */
static uint64_t
get_unique_id(void)
{
	uint64_t id;
	do {
		id = rte_rand();
	} while (find_request_by_id(id) != NULL);
	return id;
}
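
/*
 * Added commentary: rte_rand() returns a full 64-bit value, so the retry
 * loop above is virtually never taken; it exists only to guarantee that a
 * fresh ID does not collide with a request still sitting on
 * mp_request_list.
 */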

/* secondary will respond to sync requests thusly */
static int
handle_sync(const struct rte_mp_msg *msg, const void *peer)
{
	struct rte_mp_msg reply;
	const struct malloc_mp_req *req =
			(const struct malloc_mp_req *)msg->param;
	struct malloc_mp_req *resp =
			(struct malloc_mp_req *)reply.param;
	int ret;

	if (req->t != REQ_TYPE_SYNC) {
		EAL_LOG(ERR, "Unexpected request from primary");
		return -1;
	}

	memset(&reply, 0, sizeof(reply));

	reply.num_fds = 0;
	strlcpy(reply.name, msg->name, sizeof(reply.name));
	reply.len_param = sizeof(*resp);

	ret = eal_memalloc_sync_with_primary();

	resp->t = REQ_TYPE_SYNC;
	resp->id = req->id;
	resp->result = ret == 0 ? REQ_RESULT_SUCCESS : REQ_RESULT_FAIL;

	return rte_mp_reply(&reply, peer);
}
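
/*
 * Added commentary: handle_sync() runs in the secondary's IPC handler
 * thread. eal_memalloc_sync_with_primary() brings this process's page
 * mappings in line with the primary's view of the memseg lists; its return
 * value is folded into resp->result above, which the primary then
 * aggregates across all secondaries (in handle_sync_response() or
 * request_sync()).
 */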

static int
handle_free_request(const struct malloc_mp_req *m)
{
	const struct rte_memseg_list *msl;
	void *start, *end;
	size_t len;

	len = m->free_req.len;
	start = m->free_req.addr;
	end = RTE_PTR_ADD(start, len - 1);

	/* check if the requested memory actually exists */
	msl = rte_mem_virt2memseg_list(start);
	if (msl == NULL) {
		EAL_LOG(ERR, "Requested to free unknown memory");
		return -1;
	}

	/* check if end is within the same memory region */
	if (rte_mem_virt2memseg_list(end) != msl) {
		EAL_LOG(ERR, "Requested to free memory spanning multiple regions");
		return -1;
	}

	/* we're supposed to only free memory that's not external */
	if (msl->external) {
		EAL_LOG(ERR, "Requested to free external memory");
		return -1;
	}

	/* now that we've validated the request, announce it */
	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
			m->free_req.addr, m->free_req.len);

	/* now, do the actual freeing */
	return malloc_heap_free_pages(m->free_req.addr, m->free_req.len);
}
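
/*
 * Added commentary on the range check above: "end" is start + len - 1,
 * i.e. the last byte of the range rather than one past it. Had it been
 * start + len, a request covering memory that runs right up to the end of
 * a memseg list would resolve to the neighbouring list (or to none at all)
 * and be wrongly rejected as spanning multiple regions.
 */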

static int
handle_alloc_request(const struct malloc_mp_req *m,
		struct mp_request *req)
{
	struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
	const struct malloc_req_alloc *ar = &m->alloc_req;
	struct malloc_heap *heap;
	struct malloc_elem *elem;
	struct rte_memseg **ms;
	size_t alloc_sz;
	int n_segs;
	void *map_addr;

	/* this is checked by the API, but we need to prevent divide by zero */
	if (ar->page_sz == 0 || !rte_is_power_of_2(ar->page_sz)) {
		EAL_LOG(ERR, "Attempting to allocate with invalid page size");
		return -1;
	}

	/* heap idx is index into the heap array, not socket ID */
	if (ar->malloc_heap_idx >= RTE_MAX_HEAPS) {
		EAL_LOG(ERR, "Attempting to allocate from invalid heap");
		return -1;
	}

	heap = &mcfg->malloc_heaps[ar->malloc_heap_idx];

	/*
	 * for allocations, we must only use internal heaps, but since
	 * rte_malloc_heap_socket_is_external() is itself thread-safe (it takes
	 * the lock) and we're already read-locked, we cannot call it here -
	 * instead, take advantage of the fact that internal socket ID's are
	 * always lower than RTE_MAX_NUMA_NODES.
	 */
	if (heap->socket_id >= RTE_MAX_NUMA_NODES) {
		EAL_LOG(ERR, "Attempting to allocate from external heap");
		return -1;
	}

	alloc_sz = RTE_ALIGN_CEIL(RTE_ALIGN_CEIL(ar->elt_size, ar->align) +
			MALLOC_ELEM_OVERHEAD, ar->page_sz);
	n_segs = alloc_sz / ar->page_sz;

	/* we can't know in advance how many pages we'll need, so we malloc */
	ms = malloc(sizeof(*ms) * n_segs);
	if (ms == NULL) {
		EAL_LOG(ERR, "Couldn't allocate memory for request state");
		return -1;
	}
	memset(ms, 0, sizeof(*ms) * n_segs);

	elem = alloc_pages_on_heap(heap, ar->page_sz, ar->elt_size, ar->socket,
			ar->flags, ar->align, ar->bound, ar->contig, ms,
			n_segs);

	if (elem == NULL)
		goto fail;

	map_addr = ms[0]->addr;

	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_ALLOC, map_addr, alloc_sz);

	/* we have succeeded in allocating memory, but we still need to sync
	 * with other processes. however, since DPDK IPC is single-threaded, we
	 * send an asynchronous request and exit this callback.
	 */

	req->alloc_state.ms = ms;
	req->alloc_state.ms_len = n_segs;
	req->alloc_state.map_addr = map_addr;
	req->alloc_state.map_len = alloc_sz;
	req->alloc_state.elem = elem;
	req->alloc_state.heap = heap;

	return 0;
fail:
	free(ms);
	return -1;
}
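
/*
 * Worked sizing example for handle_alloc_request() (values illustrative;
 * the exact MALLOC_ELEM_OVERHEAD depends on the build):
 *
 *   elt_size = 5 MiB, align = 64, page_sz = 2 MiB
 *   RTE_ALIGN_CEIL(5 MiB, 64)          = 5 MiB
 *   5 MiB + MALLOC_ELEM_OVERHEAD       = slightly over 5 MiB
 *   RTE_ALIGN_CEIL(that, 2 MiB)        = 6 MiB  -> alloc_sz
 *   n_segs = alloc_sz / page_sz        = 3
 *
 * This is also why the power-of-two check at the top matters: page_sz is
 * used both as an alignment (mask arithmetic) and as a divisor.
 */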

/* first stage of primary handling requests from secondary */
static int
handle_request(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;
	int ret;

	/* lock access to request */
	pthread_mutex_lock(&mp_request_list.lock);

	/* make sure it's not a dupe */
	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		EAL_LOG(ERR, "Duplicate request id");
		/* don't free the pre-existing list entry on the fail path */
		entry = NULL;
		goto fail;
	}

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		EAL_LOG(ERR, "Unable to allocate memory for request");
		goto fail;
	}

	/* erase all data */
	memset(entry, 0, sizeof(*entry));

	if (m->t == REQ_TYPE_ALLOC) {
		ret = handle_alloc_request(m, entry);
	} else if (m->t == REQ_TYPE_FREE) {
		ret = handle_free_request(m);
	} else {
		EAL_LOG(ERR, "Unexpected request from secondary");
		goto fail;
	}

	if (ret != 0) {
		struct rte_mp_msg resp_msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)resp_msg.param;

		/* send failure message straight away */
		resp_msg.num_fds = 0;
		resp_msg.len_param = sizeof(*resp);
		strlcpy(resp_msg.name, MP_ACTION_RESPONSE,
				sizeof(resp_msg.name));

		resp->t = m->t;
		resp->result = REQ_RESULT_FAIL;
		resp->id = m->id;

		if (rte_mp_sendmsg(&resp_msg)) {
			EAL_LOG(ERR, "Couldn't send response");
			goto fail;
		}
		/* we did not modify the request */
		free(entry);
	} else {
		struct rte_mp_msg sr_msg;
		struct malloc_mp_req *sr =
				(struct malloc_mp_req *)sr_msg.param;
		struct timespec ts;

		memset(&sr_msg, 0, sizeof(sr_msg));

		/* we can do something, so send sync request asynchronously */
		sr_msg.num_fds = 0;
		sr_msg.len_param = sizeof(*sr);
		strlcpy(sr_msg.name, MP_ACTION_SYNC, sizeof(sr_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		sr->t = REQ_TYPE_SYNC;
		sr->id = m->id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&sr_msg, &ts,
					handle_sync_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			EAL_LOG(ERR, "Couldn't send sync request");
			if (m->t == REQ_TYPE_ALLOC)
				free(entry->alloc_state.ms);
			goto fail;
		}

		/* mark request as in progress */
		memcpy(&entry->user_req, m, sizeof(*m));
		entry->state = REQ_STATE_ACTIVE;

		TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);
	}
	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}

/* callback for asynchronous sync requests for primary. this will either do a
 * sendmsg with results, or trigger rollback request.
 */
static int
handle_sync_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply)
{
	enum malloc_req_result result;
	struct mp_request *entry;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	int i;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		EAL_LOG(ERR, "Wrong request ID");
		goto fail;
	}

	result = REQ_RESULT_SUCCESS;

	if (reply->nb_received != reply->nb_sent)
		result = REQ_RESULT_FAIL;

	for (i = 0; i < reply->nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply->msgs[i].param;

		if (resp->t != REQ_TYPE_SYNC) {
			EAL_LOG(ERR, "Unexpected response to sync request");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->id != entry->user_req.id) {
			EAL_LOG(ERR, "Response to wrong sync request");
			result = REQ_RESULT_FAIL;
			break;
		}
		if (resp->result == REQ_RESULT_FAIL) {
			result = REQ_RESULT_FAIL;
			break;
		}
	}

	if (entry->user_req.t == REQ_TYPE_FREE) {
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		/* this is a free request, just sendmsg result */
		resp->t = REQ_TYPE_FREE;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			EAL_LOG(ERR, "Could not send message to secondary process");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_SUCCESS) {
		struct malloc_heap *heap = entry->alloc_state.heap;
		struct rte_mp_msg msg;
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)msg.param;

		memset(&msg, 0, sizeof(msg));

		heap->total_size += entry->alloc_state.map_len;

		/* result is success, so just notify secondary about this */
		resp->t = REQ_TYPE_ALLOC;
		resp->result = result;
		resp->id = entry->user_req.id;
		msg.num_fds = 0;
		msg.len_param = sizeof(*resp);
		strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

		if (rte_mp_sendmsg(&msg))
			EAL_LOG(ERR, "Could not send message to secondary process");

		TAILQ_REMOVE(&mp_request_list.list, entry, next);
		free(entry->alloc_state.ms);
		free(entry);
	} else if (entry->user_req.t == REQ_TYPE_ALLOC &&
			result == REQ_RESULT_FAIL) {
		struct rte_mp_msg rb_msg;
		struct malloc_mp_req *rb =
				(struct malloc_mp_req *)rb_msg.param;
		struct timespec ts;
		struct primary_alloc_req_state *state =
				&entry->alloc_state;
		int ret;

		memset(&rb_msg, 0, sizeof(rb_msg));

		/* we've failed to sync, so do a rollback */
		eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE,
				state->map_addr, state->map_len);

		rollback_expand_heap(state->ms, state->ms_len, state->elem,
				state->map_addr, state->map_len);

		/* send rollback request */
		rb_msg.num_fds = 0;
		rb_msg.len_param = sizeof(*rb);
		strlcpy(rb_msg.name, MP_ACTION_ROLLBACK, sizeof(rb_msg.name));

		ts.tv_nsec = 0;
		ts.tv_sec = MP_TIMEOUT_S;

		/* sync requests carry no data */
		rb->t = REQ_TYPE_SYNC;
		rb->id = entry->user_req.id;

		/* there may be stray timeout still waiting */
		do {
			ret = rte_mp_request_async(&rb_msg, &ts,
					handle_rollback_response);
		} while (ret != 0 && rte_errno == EEXIST);
		if (ret != 0) {
			EAL_LOG(ERR, "Could not send rollback request to secondary process");

			/* we couldn't send rollback request, but that's OK -
			 * secondary will time out, and memory has been removed
			 * from heap anyway.
			 */
			TAILQ_REMOVE(&mp_request_list.list, entry, next);
			free(state->ms);
			free(entry);
			goto fail;
		}
	} else {
		EAL_LOG(ERR, "Response to sync request of unknown type");
		goto fail;
	}

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}
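
/*
 * Added commentary - outcome summary for handle_sync_response():
 *
 *   request type | sync result | action taken
 *   -------------+-------------+----------------------------------------
 *   FREE         | any         | sendmsg result, drop tracking entry
 *   ALLOC        | success     | grow heap->total_size, sendmsg success
 *   ALLOC        | failure     | roll back pages, async rollback request
 */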

static int
handle_rollback_response(const struct rte_mp_msg *request,
		const struct rte_mp_reply *reply __rte_unused)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *resp = (struct malloc_mp_req *)msg.param;
	const struct malloc_mp_req *mpreq =
			(const struct malloc_mp_req *)request->param;
	struct mp_request *entry;

	/* lock the request */
	pthread_mutex_lock(&mp_request_list.lock);

	memset(&msg, 0, sizeof(msg));

	entry = find_request_by_id(mpreq->id);
	if (entry == NULL) {
		EAL_LOG(ERR, "Wrong request ID");
		goto fail;
	}

	if (entry->user_req.t != REQ_TYPE_ALLOC) {
		EAL_LOG(ERR, "Unexpected active request");
		goto fail;
	}

	/* we don't care if rollback succeeded, request still failed */
	resp->t = REQ_TYPE_ALLOC;
	resp->result = REQ_RESULT_FAIL;
	resp->id = mpreq->id;
	msg.num_fds = 0;
	msg.len_param = sizeof(*resp);
	strlcpy(msg.name, MP_ACTION_RESPONSE, sizeof(msg.name));

	if (rte_mp_sendmsg(&msg))
		EAL_LOG(ERR, "Could not send message to secondary process");

	/* clean up */
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry->alloc_state.ms);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return 0;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	return -1;
}
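
/*
 * Added commentary: the rollback request sent from handle_sync_response()
 * carries t == REQ_TYPE_SYNC because, as the MP_ACTION_ROLLBACK comment at
 * the top of this file notes, a rollback is just a sync under a different
 * action name - register_mp_requests() below wires MP_ACTION_ROLLBACK to
 * the same handle_sync() handler in secondaries.
 */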

/* final stage of the request from secondary */
static int
handle_response(const struct rte_mp_msg *msg, const void *peer __rte_unused)
{
	const struct malloc_mp_req *m =
			(const struct malloc_mp_req *)msg->param;
	struct mp_request *entry;

	pthread_mutex_lock(&mp_request_list.lock);

	entry = find_request_by_id(m->id);
	if (entry != NULL) {
		/* update request status */
		entry->user_req.result = m->result;

		entry->state = REQ_STATE_COMPLETE;

		/* trigger thread wakeup */
		pthread_cond_signal(&entry->cond);
	}

	pthread_mutex_unlock(&mp_request_list.lock);

	return 0;
}
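
/*
 * Added commentary: handle_response() is the counterpart of the
 * pthread_cond_timedwait() in request_to_primary() below - it marks the
 * entry REQ_STATE_COMPLETE and signals the condition variable while holding
 * mp_request_list.lock, so the waiter always observes a consistent state.
 */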

/* synchronously request memory map sync, this is only called whenever primary
 * process initiates the allocation.
 */
int
request_sync(void)
{
	struct rte_mp_msg msg;
	struct rte_mp_reply reply;
	struct malloc_mp_req *req = (struct malloc_mp_req *)msg.param;
	struct timespec ts;
	int i, ret = -1;

	memset(&msg, 0, sizeof(msg));
	memset(&reply, 0, sizeof(reply));

	/* no need to create tailq entries as this is entirely synchronous */

	msg.num_fds = 0;
	msg.len_param = sizeof(*req);
	strlcpy(msg.name, MP_ACTION_SYNC, sizeof(msg.name));

	/* sync request carries no data */
	req->t = REQ_TYPE_SYNC;
	req->id = get_unique_id();

	ts.tv_nsec = 0;
	ts.tv_sec = MP_TIMEOUT_S;

	/* there may be stray timeout still waiting */
	do {
		ret = rte_mp_request_sync(&msg, &reply, &ts);
	} while (ret != 0 && rte_errno == EEXIST);
	if (ret != 0) {
		/* if IPC is unsupported, behave as if the call succeeded */
		if (rte_errno != ENOTSUP)
			EAL_LOG(ERR, "Could not send sync request to secondary process");
		else
			ret = 0;
		goto out;
	}

	if (reply.nb_received != reply.nb_sent) {
		EAL_LOG(ERR, "Not all secondaries have responded");
		goto out;
	}

	for (i = 0; i < reply.nb_received; i++) {
		struct malloc_mp_req *resp =
				(struct malloc_mp_req *)reply.msgs[i].param;
		if (resp->t != REQ_TYPE_SYNC) {
			EAL_LOG(ERR, "Unexpected response from secondary");
			goto out;
		}
		if (resp->id != req->id) {
			EAL_LOG(ERR, "Wrong request ID");
			goto out;
		}
		if (resp->result != REQ_RESULT_SUCCESS) {
			EAL_LOG(ERR, "Secondary process failed to synchronize");
			goto out;
		}
	}

	ret = 0;
out:
	free(reply.msgs);
	return ret;
}
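
/*
 * Illustrative sketch (an assumption, not the real call site - in DPDK the
 * analogous logic lives in malloc_heap.c): how primary-side code that has
 * just grown the heap would use request_sync(), rolling back on failure.
 */
static __rte_unused int
example_primary_expand_and_sync(struct rte_memseg **ms, int n_segs,
		struct malloc_elem *elem, void *map_addr, size_t map_len)
{
	/* pages were already allocated and the memory map modified */
	if (request_sync() == 0)
		return 0;

	/* some secondary could not map the new pages - undo everything */
	eal_memalloc_mem_event_notify(RTE_MEM_EVENT_FREE, map_addr, map_len);
	rollback_expand_heap(ms, n_segs, elem, map_addr, map_len);
	return -1;
}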

/* this is a synchronous wrapper around a bunch of asynchronous requests to
 * primary process. this will initiate a request and wait until responses come.
 */
int
request_to_primary(struct malloc_mp_req *user_req)
{
	struct rte_mp_msg msg;
	struct malloc_mp_req *msg_req = (struct malloc_mp_req *)msg.param;
	struct mp_request *entry;
	struct timespec ts;
	struct timeval now;
	int ret;

	memset(&msg, 0, sizeof(msg));
	memset(&ts, 0, sizeof(ts));

	pthread_mutex_lock(&mp_request_list.lock);

	entry = malloc(sizeof(*entry));
	if (entry == NULL) {
		EAL_LOG(ERR, "Cannot allocate memory for request");
		goto fail;
	}

	memset(entry, 0, sizeof(*entry));

	if (gettimeofday(&now, NULL) < 0) {
		EAL_LOG(ERR, "Cannot get current time");
		goto fail;
	}

	ts.tv_nsec = (now.tv_usec * 1000) % 1000000000;
	ts.tv_sec = now.tv_sec + MP_TIMEOUT_S +
			(now.tv_usec * 1000) / 1000000000;

	/* initialize the request */
	pthread_cond_init(&entry->cond, NULL);

	msg.num_fds = 0;
	msg.len_param = sizeof(*msg_req);
	strlcpy(msg.name, MP_ACTION_REQUEST, sizeof(msg.name));

	/* (attempt to) get a unique id */
	user_req->id = get_unique_id();

	/* copy contents of user request into the message */
	memcpy(msg_req, user_req, sizeof(*msg_req));

	if (rte_mp_sendmsg(&msg)) {
		EAL_LOG(ERR, "Cannot send message to primary");
		goto fail;
	}

	/* copy contents of user request into active request */
	memcpy(&entry->user_req, user_req, sizeof(*user_req));

	/* mark request as in progress */
	entry->state = REQ_STATE_ACTIVE;

	TAILQ_INSERT_TAIL(&mp_request_list.list, entry, next);

	/* finally, wait on timeout */
	do {
		ret = pthread_cond_timedwait(&entry->cond,
				&mp_request_list.lock, &ts);
	} while ((ret != 0 && ret != ETIMEDOUT) &&
			entry->state == REQ_STATE_ACTIVE);

	if (entry->state != REQ_STATE_COMPLETE) {
		EAL_LOG(ERR, "Request timed out");
		ret = -1;
	} else {
		ret = 0;
		user_req->result = entry->user_req.result;
	}
	TAILQ_REMOVE(&mp_request_list.list, entry, next);
	free(entry);

	pthread_mutex_unlock(&mp_request_list.lock);
	return ret;
fail:
	pthread_mutex_unlock(&mp_request_list.lock);
	free(entry);
	return -1;
}
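
/*
 * Added commentary on the deadline computation in request_to_primary():
 * pthread_cond_timedwait() expects an absolute time, so "now" is converted
 * to a struct timespec and MP_TIMEOUT_S is added. Since tv_usec is always
 * below 1000000, now.tv_usec * 1000 is below 1000000000; the modulo is
 * therefore a no-op and the carry term is always zero - both are purely
 * defensive.
 */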

int
register_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		/* it's OK for primary to not support IPC */
		if (rte_mp_action_register(MP_ACTION_REQUEST, handle_request) &&
				rte_errno != ENOTSUP) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
					MP_ACTION_REQUEST);
			return -1;
		}
	} else {
		if (rte_mp_action_register(MP_ACTION_SYNC, handle_sync)) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
					MP_ACTION_SYNC);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_ROLLBACK, handle_sync)) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
					MP_ACTION_ROLLBACK);
			return -1;
		}
		if (rte_mp_action_register(MP_ACTION_RESPONSE,
				handle_response)) {
			EAL_LOG(ERR, "Couldn't register '%s' action",
					MP_ACTION_RESPONSE);
			return -1;
		}
	}
	return 0;
}
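
/*
 * Illustrative usage (an assumption based on this function's role, not a
 * quote of the real call site): process startup code would register these
 * handlers once, before any hotplug traffic can occur, e.g.:
 *
 *	if (register_mp_requests() != 0)
 *		return -1;	<- IPC handlers unavailable, abort init
 */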

void
unregister_mp_requests(void)
{
	if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
		rte_mp_action_unregister(MP_ACTION_REQUEST);
	} else {
		rte_mp_action_unregister(MP_ACTION_SYNC);
		rte_mp_action_unregister(MP_ACTION_ROLLBACK);
		rte_mp_action_unregister(MP_ACTION_RESPONSE);
	}
}
|