]> git.proxmox.com Git - ceph.git/blob - ceph/src/spdk/lib/event/reactor.c
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / spdk / lib / event / reactor.c
1 /*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34 #include "spdk_internal/event.h"
35
36 #include <assert.h>
37 #include <stdio.h>
38 #include <stdbool.h>
39 #include <string.h>
40 #include <unistd.h>
41
42 #ifdef __linux__
43 #include <sys/prctl.h>
44 #endif
45
46 #ifdef __FreeBSD__
47 #include <pthread_np.h>
48 #endif
49
50 #include <rte_config.h>
51 #include <rte_ring.h>
52
53 #include "spdk/log.h"
54 #include "spdk/io_channel.h"
55 #include "spdk/env.h"
56
/* Number of socket IDs supported; sizes the per-socket event mempool table. */
#define SPDK_MAX_SOCKET			64

/* Idle time, in microseconds, a reactor spins before it is allowed to sleep. */
#define SPDK_REACTOR_SPIN_TIME_US	1000
/* Reactor loop iterations skipped between checks of the timer poller list. */
#define SPDK_TIMER_POLL_ITERATIONS	5
/* Maximum number of events dequeued and executed in one batch. */
#define SPDK_EVENT_BATCH_SIZE		8
62
/* Execution state of a poller, as tracked by the reactor loop. */
enum spdk_poller_state {
	/* Registered with a reactor; its fn is not executing right now. */
	SPDK_POLLER_STATE_WAITING = 0,

	/* The reactor is in the middle of calling the poller's fn. */
	SPDK_POLLER_STATE_RUNNING = 1,

	/* The poller was unregistered while its fn was executing. */
	SPDK_POLLER_STATE_UNREGISTERED = 2,
};
73
74 struct spdk_poller {
75 TAILQ_ENTRY(spdk_poller) tailq;
76 uint32_t lcore;
77
78 /* Current state of the poller; should only be accessed from the poller's thread. */
79 enum spdk_poller_state state;
80
81 uint64_t period_ticks;
82 uint64_t next_run_tick;
83 spdk_poller_fn fn;
84 void *arg;
85
86 struct spdk_event *unregister_complete_event;
87 };
88
/* Global lifecycle state shared by all reactors. */
enum spdk_reactor_state {
	/* Initial value, before spdk_reactors_init() has completed. */
	SPDK_REACTOR_STATE_INVALID = 0,

	/* Set at the end of a successful spdk_reactors_init(). */
	SPDK_REACTOR_STATE_INITIALIZED = 1,

	/* Set by spdk_reactors_start(); reactor loops keep going while this holds. */
	SPDK_REACTOR_STATE_RUNNING = 2,

	/* Set by spdk_reactors_stop() to make the reactor loops exit. */
	SPDK_REACTOR_STATE_EXITING = 3,

	/* Set by spdk_reactors_start() after all lcores have been waited for. */
	SPDK_REACTOR_STATE_SHUTDOWN = 4,
};
96
/* Per-core event loop state. */
struct spdk_reactor {
	/* Logical core number for this reactor. */
	uint32_t					lcore;

	/* Socket ID for this reactor. */
	uint32_t					socket_id;

	/*
	 * Pollers that run on every pass of the reactor loop, in round-robin
	 * order: the reactor removes the poller at the head of the list, runs
	 * it, and appends it to the tail.
	 */
	TAILQ_HEAD(, spdk_poller)			active_pollers;

	/**
	 * Pollers with a periodic timer, kept sorted by next scheduled run time.
	 */
	TAILQ_HEAD(timer_pollers_head, spdk_poller)	timer_pollers;

	/* Ring of events waiting to be executed by this reactor. */
	struct rte_ring					*events;

	/* Pointer to the per-socket g_spdk_event_mempool for this reactor. */
	struct spdk_mempool				*event_mempool;

	/* Longest sleep, in microseconds, while idle; zero disables sleeping. */
	uint64_t					max_delay_us;
} __attribute__((aligned(64)));
124
125 static struct spdk_reactor g_reactors[RTE_MAX_LCORE];
126
127 static enum spdk_reactor_state g_reactor_state = SPDK_REACTOR_STATE_INVALID;
128
129 static void spdk_reactor_construct(struct spdk_reactor *w, uint32_t lcore,
130 uint64_t max_delay_us);
131
132 static struct spdk_mempool *g_spdk_event_mempool[SPDK_MAX_SOCKET];
133
134 /** \file
135
136 */
137
138 static struct spdk_reactor *
139 spdk_reactor_get(uint32_t lcore)
140 {
141 struct spdk_reactor *reactor;
142 reactor = &g_reactors[lcore];
143 return reactor;
144 }
145
146 struct spdk_event *
147 spdk_event_allocate(uint32_t lcore, spdk_event_fn fn, void *arg1, void *arg2)
148 {
149 struct spdk_event *event = NULL;
150 struct spdk_reactor *reactor = spdk_reactor_get(lcore);
151
152 event = spdk_mempool_get(reactor->event_mempool);
153 if (event == NULL) {
154 assert(false);
155 return NULL;
156 }
157
158 event->lcore = lcore;
159 event->fn = fn;
160 event->arg1 = arg1;
161 event->arg2 = arg2;
162
163 return event;
164 }
165
166 void
167 spdk_event_call(struct spdk_event *event)
168 {
169 int rc;
170 struct spdk_reactor *reactor;
171
172 reactor = spdk_reactor_get(event->lcore);
173
174 assert(reactor->events != NULL);
175 rc = rte_ring_mp_enqueue(reactor->events, event);
176 if (rc != 0) {
177 assert(false);
178 }
179 }
180
181 static inline uint32_t
182 _spdk_event_queue_run_batch(struct spdk_reactor *reactor)
183 {
184 unsigned count, i;
185 void *events[SPDK_EVENT_BATCH_SIZE];
186
187 #ifdef DEBUG
188 /*
189 * rte_ring_dequeue_burst() fills events and returns how many entries it wrote,
190 * so we will never actually read uninitialized data from events, but just to be sure
191 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
192 */
193 memset(events, 0, sizeof(events));
194 #endif
195
196 count = rte_ring_sc_dequeue_burst(reactor->events, events, SPDK_EVENT_BATCH_SIZE);
197 if (count == 0) {
198 return 0;
199 }
200
201 for (i = 0; i < count; i++) {
202 struct spdk_event *event = events[i];
203
204 assert(event != NULL);
205 event->fn(event->arg1, event->arg2);
206 }
207
208 spdk_mempool_put_bulk(reactor->event_mempool, events, count);
209
210 return count;
211 }
212
/*
 * Run one batch of queued events on the reactor for the given core.
 *
 * Returns the number of events that were executed.
 */
uint32_t
spdk_event_queue_run_batch(uint32_t lcore)
{
	struct spdk_reactor *reactor = spdk_reactor_get(lcore);

	return _spdk_event_queue_run_batch(reactor);
}
218
/**
 * \brief Name the calling thread "reactor_<lcore>".
 *
 * This makes the reactor threads distinguishable in top and gdb. The name is
 * built in a 16-byte buffer, so it is truncated to 15 characters for very
 * large core numbers.
 */
static void
set_reactor_thread_name(uint32_t lcore)
{
	char name[16];

	snprintf(name, sizeof(name), "reactor_%u", lcore);

#if defined(__FreeBSD__)
	pthread_set_name_np(pthread_self(), name);
#elif defined(__linux__)
	prctl(PR_SET_NAME, name, 0, 0, 0);
#else
#error missing platform support for thread name
#endif
}
240
241 static void
242 spdk_poller_insert_timer(struct spdk_reactor *reactor, struct spdk_poller *poller, uint64_t now)
243 {
244 struct spdk_poller *iter;
245 uint64_t next_run_tick;
246
247 next_run_tick = now + poller->period_ticks;
248 poller->next_run_tick = next_run_tick;
249
250 /*
251 * Insert poller in the reactor's timer_pollers list in sorted order by next scheduled
252 * run time.
253 */
254 TAILQ_FOREACH_REVERSE(iter, &reactor->timer_pollers, timer_pollers_head, tailq) {
255 if (iter->next_run_tick <= next_run_tick) {
256 TAILQ_INSERT_AFTER(&reactor->timer_pollers, iter, poller, tailq);
257 return;
258 }
259 }
260
261 /* No earlier pollers were found, so this poller must be the new head */
262 TAILQ_INSERT_HEAD(&reactor->timer_pollers, poller, tailq);
263 }
264
265 static void
266 _spdk_poller_unregister_complete(struct spdk_poller *poller)
267 {
268 if (poller->unregister_complete_event) {
269 spdk_event_call(poller->unregister_complete_event);
270 }
271
272 free(poller);
273 }
274
275 /**
276
277 \brief This is the main function of the reactor thread.
278
279 \code
280
281 while (1)
282 if (events to run)
283 dequeue and run a batch of events
284
285 if (active pollers)
286 run the first poller in the list and move it to the back
287
288 if (first timer poller has expired)
289 run the first timer poller and reinsert it in the timer list
290
291 if (idle for at least SPDK_REACTOR_SPIN_TIME_US)
292 sleep until next timer poller is scheduled to expire
293 \endcode
294
295 */
296 static int
297 _spdk_reactor_run(void *arg)
298 {
299 struct spdk_reactor *reactor = arg;
300 struct spdk_poller *poller;
301 uint32_t event_count;
302 uint64_t idle_started, now;
303 uint64_t spin_cycles, sleep_cycles;
304 uint32_t sleep_us;
305 uint32_t timer_poll_count;
306
307 spdk_allocate_thread();
308 set_reactor_thread_name(reactor->lcore);
309 SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor->lcore,
310 reactor->socket_id);
311
312 spin_cycles = SPDK_REACTOR_SPIN_TIME_US * spdk_get_ticks_hz() / 1000000ULL;
313 sleep_cycles = reactor->max_delay_us * spdk_get_ticks_hz() / 1000000ULL;
314 idle_started = 0;
315 timer_poll_count = 0;
316
317 while (1) {
318 bool took_action = false;
319
320 event_count = _spdk_event_queue_run_batch(reactor);
321 if (event_count > 0) {
322 took_action = true;
323 }
324
325 poller = TAILQ_FIRST(&reactor->active_pollers);
326 if (poller) {
327 TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
328 poller->state = SPDK_POLLER_STATE_RUNNING;
329 poller->fn(poller->arg);
330 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
331 _spdk_poller_unregister_complete(poller);
332 } else {
333 poller->state = SPDK_POLLER_STATE_WAITING;
334 TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
335 }
336 took_action = true;
337 }
338
339 if (timer_poll_count >= SPDK_TIMER_POLL_ITERATIONS) {
340 poller = TAILQ_FIRST(&reactor->timer_pollers);
341 if (poller) {
342 now = spdk_get_ticks();
343
344 if (now >= poller->next_run_tick) {
345 TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq);
346 poller->state = SPDK_POLLER_STATE_RUNNING;
347 poller->fn(poller->arg);
348 if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
349 _spdk_poller_unregister_complete(poller);
350 } else {
351 poller->state = SPDK_POLLER_STATE_WAITING;
352 spdk_poller_insert_timer(reactor, poller, now);
353 }
354 took_action = true;
355 }
356 }
357 timer_poll_count = 0;
358 } else {
359 timer_poll_count++;
360 }
361
362 if (took_action) {
363 /* We were busy this loop iteration. Reset the idle timer. */
364 idle_started = 0;
365 } else if (idle_started == 0) {
366 /* We were previously busy, but this loop we took no actions. */
367 idle_started = spdk_get_ticks();
368 }
369
370 /* Determine if the thread can sleep */
371 if (sleep_cycles && idle_started) {
372 now = spdk_get_ticks();
373 if (now >= (idle_started + spin_cycles)) {
374 sleep_us = reactor->max_delay_us;
375
376 poller = TAILQ_FIRST(&reactor->timer_pollers);
377 if (poller) {
378 /* There are timers registered, so don't sleep beyond
379 * when the next timer should fire */
380 if (poller->next_run_tick < (now + sleep_cycles)) {
381 if (poller->next_run_tick <= now) {
382 sleep_us = 0;
383 } else {
384 sleep_us = ((poller->next_run_tick - now) * 1000000ULL) / spdk_get_ticks_hz();
385 }
386 }
387 }
388
389 if (sleep_us > 0) {
390 usleep(sleep_us);
391 }
392
393 /* After sleeping, always poll for timers */
394 timer_poll_count = SPDK_TIMER_POLL_ITERATIONS;
395 }
396 }
397
398 if (g_reactor_state != SPDK_REACTOR_STATE_RUNNING) {
399 break;
400 }
401 }
402
403 spdk_free_thread();
404 return 0;
405 }
406
407 static void
408 spdk_reactor_construct(struct spdk_reactor *reactor, uint32_t lcore, uint64_t max_delay_us)
409 {
410 char ring_name[64];
411
412 reactor->lcore = lcore;
413 reactor->socket_id = spdk_env_get_socket_id(lcore);
414 assert(reactor->socket_id < SPDK_MAX_SOCKET);
415 reactor->max_delay_us = max_delay_us;
416
417 TAILQ_INIT(&reactor->active_pollers);
418 TAILQ_INIT(&reactor->timer_pollers);
419
420 snprintf(ring_name, sizeof(ring_name) - 1, "spdk_event_queue_%u", lcore);
421 reactor->events =
422 rte_ring_create(ring_name, 65536, reactor->socket_id, RING_F_SC_DEQ);
423 assert(reactor->events != NULL);
424
425 reactor->event_mempool = g_spdk_event_mempool[reactor->socket_id];
426 }
427
428 static void
429 spdk_reactor_start(struct spdk_reactor *reactor)
430 {
431 if (reactor->lcore != rte_get_master_lcore()) {
432 switch (rte_eal_get_lcore_state(reactor->lcore)) {
433 case FINISHED:
434 rte_eal_wait_lcore(reactor->lcore);
435 /* drop through */
436 case WAIT:
437 rte_eal_remote_launch(_spdk_reactor_run, (void *)reactor, reactor->lcore);
438 break;
439 case RUNNING:
440 printf("Something already running on lcore %d\n", reactor->lcore);
441 break;
442 }
443 } else {
444 _spdk_reactor_run(reactor);
445 }
446 }
447
/* Return the core count reported by spdk_env_get_core_count(). */
int
spdk_app_get_core_count(void)
{
	int core_count = spdk_env_get_core_count();

	return core_count;
}
453
/* Return the current core as reported by spdk_env_get_current_core(). */
uint32_t
spdk_app_get_current_core(void)
{
	uint32_t current = spdk_env_get_current_core();

	return current;
}
459
460 int
461 spdk_app_parse_core_mask(const char *mask, uint64_t *cpumask)
462 {
463 unsigned int i;
464 char *end;
465
466 if (mask == NULL || cpumask == NULL) {
467 return -1;
468 }
469
470 errno = 0;
471 *cpumask = strtoull(mask, &end, 16);
472 if (*end != '\0' || errno) {
473 return -1;
474 }
475
476 for (i = 0; i < RTE_MAX_LCORE && i < 64; i++) {
477 if ((*cpumask & (1ULL << i)) && !rte_lcore_is_enabled(i)) {
478 *cpumask &= ~(1ULL << i);
479 }
480 }
481
482 return 0;
483 }
484
485 uint64_t
486 spdk_app_get_core_mask(void)
487 {
488 uint32_t i;
489 uint64_t mask = 0;
490
491 SPDK_ENV_FOREACH_CORE(i) {
492 mask |= 1ULL << i;
493 }
494
495 return mask;
496 }
497
498
499 static uint64_t
500 spdk_reactor_get_socket_mask(void)
501 {
502 uint32_t i;
503 uint32_t socket_id;
504 uint64_t socket_info = 0;
505
506 SPDK_ENV_FOREACH_CORE(i) {
507 socket_id = spdk_env_get_socket_id(i);
508 socket_info |= (1ULL << socket_id);
509 }
510
511 return socket_info;
512 }
513
514 void
515 spdk_reactors_start(void)
516 {
517 struct spdk_reactor *reactor;
518 uint32_t i, current_core;
519
520 assert(rte_get_master_lcore() == rte_lcore_id());
521
522 g_reactor_state = SPDK_REACTOR_STATE_RUNNING;
523
524 current_core = spdk_env_get_current_core();
525 SPDK_ENV_FOREACH_CORE(i) {
526 if (i != current_core) {
527 reactor = spdk_reactor_get(i);
528 spdk_reactor_start(reactor);
529 }
530 }
531
532 /* Start the master reactor */
533 reactor = spdk_reactor_get(current_core);
534 spdk_reactor_start(reactor);
535
536 rte_eal_mp_wait_lcore();
537
538 g_reactor_state = SPDK_REACTOR_STATE_SHUTDOWN;
539 }
540
541 void spdk_reactors_stop(void)
542 {
543 g_reactor_state = SPDK_REACTOR_STATE_EXITING;
544 }
545
546 int
547 spdk_reactors_init(unsigned int max_delay_us)
548 {
549 uint32_t i, j;
550 struct spdk_reactor *reactor;
551 uint64_t socket_mask = 0x0;
552 uint8_t socket_count = 0;
553 char mempool_name[32];
554
555 socket_mask = spdk_reactor_get_socket_mask();
556 printf("Occupied cpu socket mask is 0x%lx\n", socket_mask);
557
558 for (i = 0; i < SPDK_MAX_SOCKET; i++) {
559 if ((1ULL << i) & socket_mask) {
560 socket_count++;
561 }
562 }
563 if (socket_count == 0) {
564 printf("No sockets occupied (internal error)\n");
565 return -1;
566 }
567
568 for (i = 0; i < SPDK_MAX_SOCKET; i++) {
569 if ((1ULL << i) & socket_mask) {
570 snprintf(mempool_name, sizeof(mempool_name), "spdk_event_mempool_%d", i);
571 g_spdk_event_mempool[i] = spdk_mempool_create(mempool_name,
572 (262144 / socket_count),
573 sizeof(struct spdk_event), -1, i);
574
575 if (g_spdk_event_mempool[i] == NULL) {
576 SPDK_ERRLOG("spdk_event_mempool creation failed on socket %d\n", i);
577
578 /*
579 * Instead of failing the operation directly, try to create
580 * the mempool on any available sockets in the case that
581 * memory is not evenly installed on all sockets. If still
582 * fails, free all allocated memory and exits.
583 */
584 g_spdk_event_mempool[i] = spdk_mempool_create(
585 mempool_name,
586 (262144 / socket_count),
587 sizeof(struct spdk_event), -1,
588 SPDK_ENV_SOCKET_ID_ANY);
589
590 if (g_spdk_event_mempool[i] == NULL) {
591 for (j = i - 1; j < i; j--) {
592 if (g_spdk_event_mempool[j] != NULL) {
593 spdk_mempool_free(g_spdk_event_mempool[j]);
594 }
595 }
596 SPDK_ERRLOG("spdk_event_mempool creation failed\n");
597 return -1;
598 }
599 }
600 }
601 }
602
603 SPDK_ENV_FOREACH_CORE(i) {
604 reactor = spdk_reactor_get(i);
605 spdk_reactor_construct(reactor, i, max_delay_us);
606 }
607
608 g_reactor_state = SPDK_REACTOR_STATE_INITIALIZED;
609
610 return 0;
611 }
612
613 int
614 spdk_reactors_fini(void)
615 {
616 uint32_t i;
617 uint64_t socket_mask;
618 struct spdk_reactor *reactor;
619
620 SPDK_ENV_FOREACH_CORE(i) {
621 reactor = spdk_reactor_get(i);
622 if (reactor->events != NULL) {
623 rte_ring_free(reactor->events);
624 }
625 }
626
627 socket_mask = spdk_reactor_get_socket_mask();
628 for (i = 0; i < SPDK_MAX_SOCKET; i++) {
629 if ((1ULL << i) & socket_mask && g_spdk_event_mempool[i] != NULL) {
630 spdk_mempool_free(g_spdk_event_mempool[i]);
631 }
632 }
633
634 return 0;
635 }
636
637 static void
638 _spdk_poller_register(struct spdk_reactor *reactor, struct spdk_poller *poller)
639 {
640 if (poller->period_ticks) {
641 spdk_poller_insert_timer(reactor, poller, spdk_get_ticks());
642 } else {
643 TAILQ_INSERT_TAIL(&reactor->active_pollers, poller, tailq);
644 }
645 }
646
/*
 * Event callback that adds a poller to a reactor.
 *
 * arg1: the struct spdk_reactor to add the poller to.
 * arg2: the struct spdk_poller to add.
 */
static void
_spdk_event_add_poller(void *arg1, void *arg2)
{
	_spdk_poller_register((struct spdk_reactor *)arg1, (struct spdk_poller *)arg2);
}
655
656 void
657 spdk_poller_register(struct spdk_poller **ppoller, spdk_poller_fn fn, void *arg,
658 uint32_t lcore, uint64_t period_microseconds)
659 {
660 struct spdk_poller *poller;
661 struct spdk_reactor *reactor;
662
663 poller = calloc(1, sizeof(*poller));
664 if (poller == NULL) {
665 SPDK_ERRLOG("Poller memory allocation failed\n");
666 abort();
667 }
668
669 poller->lcore = lcore;
670 poller->state = SPDK_POLLER_STATE_WAITING;
671 poller->fn = fn;
672 poller->arg = arg;
673
674 if (period_microseconds) {
675 poller->period_ticks = (spdk_get_ticks_hz() * period_microseconds) / 1000000ULL;
676 } else {
677 poller->period_ticks = 0;
678 }
679
680 if (*ppoller != NULL) {
681 SPDK_ERRLOG("Attempted reuse of poller pointer\n");
682 abort();
683 }
684
685 if (lcore >= RTE_MAX_LCORE) {
686 SPDK_ERRLOG("Attempted use lcore %u larger than max lcore %u\n",
687 lcore, RTE_MAX_LCORE - 1);
688 abort();
689 }
690
691 *ppoller = poller;
692 reactor = spdk_reactor_get(lcore);
693
694 if (lcore == spdk_env_get_current_core()) {
695 /*
696 * The poller is registered to run on the current core, so call the add function
697 * directly.
698 */
699 _spdk_poller_register(reactor, poller);
700 } else {
701 /*
702 * The poller is registered to run on a different core.
703 * Schedule an event to run on the poller's core that will add the poller.
704 */
705 spdk_event_call(spdk_event_allocate(lcore, _spdk_event_add_poller, reactor, poller));
706 }
707 }
708
709 static void
710 _spdk_poller_unregister(struct spdk_reactor *reactor, struct spdk_poller *poller,
711 struct spdk_event *next)
712 {
713 assert(poller->lcore == reactor->lcore);
714 assert(poller->lcore == spdk_env_get_current_core());
715
716 poller->unregister_complete_event = next;
717
718 if (poller->state == SPDK_POLLER_STATE_RUNNING) {
719 /*
720 * We are being called from the poller_fn, so set the state to unregistered
721 * and let the reactor loop free the poller.
722 */
723 poller->state = SPDK_POLLER_STATE_UNREGISTERED;
724 } else {
725 /* Poller is not running currently, so just free it. */
726 if (poller->period_ticks) {
727 TAILQ_REMOVE(&reactor->timer_pollers, poller, tailq);
728 } else {
729 TAILQ_REMOVE(&reactor->active_pollers, poller, tailq);
730 }
731
732 _spdk_poller_unregister_complete(poller);
733 }
734 }
735
736 static void
737 _spdk_event_remove_poller(void *arg1, void *arg2)
738 {
739 struct spdk_poller *poller = arg1;
740 struct spdk_reactor *reactor = spdk_reactor_get(poller->lcore);
741 struct spdk_event *next = arg2;
742
743 _spdk_poller_unregister(reactor, poller, next);
744 }
745
746 void
747 spdk_poller_unregister(struct spdk_poller **ppoller,
748 struct spdk_event *complete)
749 {
750 struct spdk_poller *poller;
751 uint32_t lcore;
752
753 poller = *ppoller;
754
755 *ppoller = NULL;
756
757 if (poller == NULL) {
758 if (complete) {
759 spdk_event_call(complete);
760 }
761 return;
762 }
763
764 lcore = poller->lcore;
765
766 if (lcore == spdk_env_get_current_core()) {
767 /*
768 * The poller is registered on the current core, so call the remove function
769 * directly.
770 */
771 _spdk_poller_unregister(spdk_reactor_get(lcore), poller, complete);
772 } else {
773 /*
774 * The poller is registered on a different core.
775 * Schedule an event to run on the poller's core that will remove the poller.
776 */
777 spdk_event_call(spdk_event_allocate(lcore, _spdk_event_remove_poller, poller, complete));
778 }
779 }