4 * Copyright (c) Intel Corporation.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include "spdk_internal/event.h"
43 #include <sys/prctl.h>
47 #include <pthread_np.h>
50 #include <rte_config.h>
54 #include "spdk/io_channel.h"
/* Number of slots in g_spdk_event_mempool; socket IDs must be below this value. */
57 #define SPDK_MAX_SOCKET 64
/* Idle time, in microseconds, a reactor must spin before it is allowed to sleep. */
59 #define SPDK_REACTOR_SPIN_TIME_US 1000
/* The timer-poller list is checked when timer_poll_count reaches this value. */
60 #define SPDK_TIMER_POLL_ITERATIONS 5
/* Maximum number of events dequeued per _spdk_event_queue_run_batch() call. */
61 #define SPDK_EVENT_BATCH_SIZE 8
/*
 * Lifecycle of a poller. The reactor loop moves a poller from WAITING to RUNNING
 * around each call of its fn; _spdk_poller_unregister() sets UNREGISTERED when it
 * is invoked while the poller is RUNNING.
 */
63 enum spdk_poller_state
{
64 /* The poller is registered with a reactor but not currently executing its fn. */
65 SPDK_POLLER_STATE_WAITING
,
67 /* The poller is currently running its fn. */
68 SPDK_POLLER_STATE_RUNNING
,
70 /* The poller was unregistered during the execution of its fn. */
71 SPDK_POLLER_STATE_UNREGISTERED
,
/* List linkage: a poller sits on either reactor->active_pollers or reactor->timer_pollers. */
75 TAILQ_ENTRY(spdk_poller
) tailq
;
78 /* Current state of the poller; should only be accessed from the poller's thread. */
79 enum spdk_poller_state state
;
/* Timer period in ticks; 0 means the poller is run on every reactor iteration. */
81 uint64_t period_ticks
;
/* Absolute tick of the next scheduled run; maintained by spdk_poller_insert_timer(). */
82 uint64_t next_run_tick
;
/* Optional event sent by _spdk_poller_unregister_complete() when unregistration finishes. */
86 struct spdk_event
*unregister_complete_event
;
/* Framework-wide reactor state, stored in g_reactor_state. */
89 enum spdk_reactor_state
{
/* Initial value of g_reactor_state. */
90 SPDK_REACTOR_STATE_INVALID
= 0,
/* Set by spdk_reactors_init() after every reactor has been constructed. */
91 SPDK_REACTOR_STATE_INITIALIZED
= 1,
/* Set by spdk_reactors_start(); reactor loops keep running only while in this state. */
92 SPDK_REACTOR_STATE_RUNNING
= 2,
/* Set by spdk_reactors_stop() to ask every reactor loop to exit. */
93 SPDK_REACTOR_STATE_EXITING
= 3,
/* Set by spdk_reactors_start() after all lcores have returned. */
94 SPDK_REACTOR_STATE_SHUTDOWN
= 4,
98 /* Logical core number for this reactor. */
101 /* Socket ID for this reactor. */
105 * Contains pollers actively running on this reactor. Pollers
106 * are run round-robin. The reactor takes one poller from the head
107 * of the ring, executes it, then puts it back at the tail of
110 TAILQ_HEAD(, spdk_poller
) active_pollers
;
113 * Contains pollers running on this reactor with a periodic timer.
115 TAILQ_HEAD(timer_pollers_head
, spdk_poller
) timer_pollers
;
/* Inbound event ring; created in spdk_reactor_construct(), drained by _spdk_event_queue_run_batch(). */
117 struct rte_ring
*events
;
119 /* Pointer to the per-socket g_spdk_event_mempool for this reactor. */
120 struct spdk_mempool
*event_mempool
;
/* Longest idle sleep in microseconds; 0 disables sleeping in _spdk_reactor_run(). */
122 uint64_t max_delay_us
;
/* NOTE(review): the 64-byte alignment presumably keeps each g_reactors entry on its own cache line; confirm. */
123 } __attribute__((aligned(64)));
/* One reactor per possible DPDK lcore, indexed by lcore id (see spdk_reactor_get()). */
125 static struct spdk_reactor g_reactors
[RTE_MAX_LCORE
];
/* Framework-wide state; each reactor loop exits once this is no longer RUNNING. */
127 static enum spdk_reactor_state g_reactor_state
= SPDK_REACTOR_STATE_INVALID
;
/* Forward declaration; the definition appears later in this file. */
129 static void spdk_reactor_construct(struct spdk_reactor
*w
, uint32_t lcore
,
130 uint64_t max_delay_us
);
/* Per-socket event mempools, indexed by socket id and created in spdk_reactors_init(). */
132 static struct spdk_mempool
*g_spdk_event_mempool
[SPDK_MAX_SOCKET
];
/*
 * Return the reactor slot for lcore.
 * NOTE(review): no bounds check on lcore is visible here. spdk_poller_register()
 * checks lcore against RTE_MAX_LCORE, but spdk_event_allocate() appears to pass
 * lcore straight through.
 */
138 static struct spdk_reactor
*
139 spdk_reactor_get(uint32_t lcore
)
141 struct spdk_reactor
*reactor
;
142 reactor
= &g_reactors
[lcore
];
/*
 * Allocate an event targeted at lcore from that reactor's per-socket mempool.
 * fn, arg1 and arg2 are consumed later by _spdk_event_queue_run_batch(), which
 * invokes event->fn(event->arg1, event->arg2) on the target reactor.
 * NOTE(review): confirm the spdk_mempool_get() result is NULL-checked before
 * event->lcore is written below.
 */
147 spdk_event_allocate(uint32_t lcore
, spdk_event_fn fn
, void *arg1
, void *arg2
)
149 struct spdk_event
*event
= NULL
;
150 struct spdk_reactor
*reactor
= spdk_reactor_get(lcore
);
152 event
= spdk_mempool_get(reactor
->event_mempool
);
158 event
->lcore
= lcore
;
/*
 * Hand event to the reactor that owns event->lcore by enqueueing it on that
 * reactor's ring. The multi-producer enqueue is used because the ring is created
 * with only RING_F_SC_DEQ, so several threads may enqueue concurrently.
 * NOTE(review): confirm how a non-zero rc (for example, a full ring) is handled.
 */
167 spdk_event_call(struct spdk_event
*event
)
170 struct spdk_reactor
*reactor
;
172 reactor
= spdk_reactor_get(event
->lcore
);
174 assert(reactor
->events
!= NULL
);
175 rc
= rte_ring_mp_enqueue(reactor
->events
, event
);
/*
 * Dequeue up to SPDK_EVENT_BATCH_SIZE events from reactor's ring, run each
 * callback in order, then return the whole batch to reactor->event_mempool with
 * one bulk put. The events came from this same pool (see spdk_event_allocate()).
 * _spdk_reactor_run() treats the return value as the number of events run.
 * Must not be called concurrently for the same reactor: the ring is
 * single-consumer (RING_F_SC_DEQ).
 */
181 static inline uint32_t
182 _spdk_event_queue_run_batch(struct spdk_reactor
*reactor
)
185 void *events
[SPDK_EVENT_BATCH_SIZE
];
189 * rte_ring_dequeue_burst() fills events and returns how many entries it wrote,
190 * so we will never actually read uninitialized data from events, but just to be sure
191 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
193 memset(events
, 0, sizeof(events
));
196 count
= rte_ring_sc_dequeue_burst(reactor
->events
, events
, SPDK_EVENT_BATCH_SIZE
);
201 for (i
= 0; i
< count
; i
++) {
202 struct spdk_event
*event
= events
[i
];
204 assert(event
!= NULL
);
205 event
->fn(event
->arg1
, event
->arg2
);
208 spdk_mempool_put_bulk(reactor
->event_mempool
, events
, count
);
/* Public wrapper: run one batch of pending events for the reactor on lcore. */
214 spdk_event_queue_run_batch(uint32_t lcore
)
216 return _spdk_event_queue_run_batch(spdk_reactor_get(lcore
));
221 \brief Set current reactor thread name to "reactor_<cpu #>".
223 This makes the reactor threads distinguishable in top and gdb.
226 static void set_reactor_thread_name(uint32_t lcore
)
/* Names the calling thread "reactor_<lcore>". */
228 char thread_name
[16];
/* snprintf() truncates to fit; Linux PR_SET_NAME also accepts at most 16 bytes including the NUL. */
230 snprintf(thread_name
, sizeof(thread_name
), "reactor_%u", lcore
);
/* Thread naming is platform-specific; unsupported platforms fail at compile time. */
232 #if defined(__linux__)
233 prctl(PR_SET_NAME
, thread_name
, 0, 0, 0);
234 #elif defined(__FreeBSD__)
235 pthread_set_name_np(pthread_self(), thread_name
);
237 #error missing platform support for thread name
/*
 * Set poller->next_run_tick = now + poller->period_ticks and insert poller into
 * reactor->timer_pollers, which is kept sorted by ascending next_run_tick so the
 * head is always the next timer due. The scan runs from the tail, so a poller
 * whose deadline ties an existing one is placed after it.
 * NOTE(review): the scan must stop right after TAILQ_INSERT_AFTER so the head
 * insertion below does not also run; confirm the early return is present.
 */
242 spdk_poller_insert_timer(struct spdk_reactor
*reactor
, struct spdk_poller
*poller
, uint64_t now
)
244 struct spdk_poller
*iter
;
245 uint64_t next_run_tick
;
247 next_run_tick
= now
+ poller
->period_ticks
;
248 poller
->next_run_tick
= next_run_tick
;
251 * Insert poller in the reactor's timer_pollers list in sorted order by next scheduled
254 TAILQ_FOREACH_REVERSE(iter
, &reactor
->timer_pollers
, timer_pollers_head
, tailq
) {
255 if (iter
->next_run_tick
<= next_run_tick
) {
256 TAILQ_INSERT_AFTER(&reactor
->timer_pollers
, iter
, poller
, tailq
);
261 /* No earlier pollers were found, so this poller must be the new head */
262 TAILQ_INSERT_HEAD(&reactor
->timer_pollers
, poller
, tailq
);
/*
 * Finish unregistering poller by sending its optional unregister_complete_event.
 * The comments in _spdk_poller_unregister() say the poller is freed on this path
 * (the free itself is not visible here), so callers must not touch poller after
 * this call.
 */
266 _spdk_poller_unregister_complete(struct spdk_poller
*poller
)
268 if (poller
->unregister_complete_event
) {
269 spdk_event_call(poller
->unregister_complete_event
);
277 \brief This is the main function of the reactor thread.
283 dequeue and run a batch of events
286 run the first poller in the list and move it to the back
288 if (first timer poller has expired)
289 run the first timer poller and reinsert it in the timer list
291 if (idle for at least SPDK_REACTOR_SPIN_TIME_US)
292 sleep until next timer poller is scheduled to expire
297 _spdk_reactor_run(void *arg
)
/*
 * Reactor thread main loop for the struct spdk_reactor passed in arg. The spin
 * threshold and the maximum sleep are converted from microseconds to ticks once,
 * up front. The loop keeps iterating until g_reactor_state is no longer
 * SPDK_REACTOR_STATE_RUNNING.
 */
299 struct spdk_reactor
*reactor
= arg
;
300 struct spdk_poller
*poller
;
301 uint32_t event_count
;
302 uint64_t idle_started
, now
;
303 uint64_t spin_cycles
, sleep_cycles
;
305 uint32_t timer_poll_count
;
307 spdk_allocate_thread();
308 set_reactor_thread_name(reactor
->lcore
);
309 SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor
->lcore
,
312 spin_cycles
= SPDK_REACTOR_SPIN_TIME_US
* spdk_get_ticks_hz() / 1000000ULL;
313 sleep_cycles
= reactor
->max_delay_us
* spdk_get_ticks_hz() / 1000000ULL;
315 timer_poll_count
= 0;
/* took_action records whether this iteration ran any event or poller. */
318 bool took_action
= false;
320 event_count
= _spdk_event_queue_run_batch(reactor
);
321 if (event_count
> 0) {
/*
 * Round-robin: pop the head active poller, run it, and re-append it to the tail
 * unless it unregistered itself during fn.
 * NOTE(review): confirm an empty active_pollers list (TAILQ_FIRST() == NULL) is
 * guarded before the dereference below.
 */
325 poller
= TAILQ_FIRST(&reactor
->active_pollers
);
327 TAILQ_REMOVE(&reactor
->active_pollers
, poller
, tailq
);
328 poller
->state
= SPDK_POLLER_STATE_RUNNING
;
329 poller
->fn(poller
->arg
);
330 if (poller
->state
== SPDK_POLLER_STATE_UNREGISTERED
) {
331 _spdk_poller_unregister_complete(poller
);
333 poller
->state
= SPDK_POLLER_STATE_WAITING
;
334 TAILQ_INSERT_TAIL(&reactor
->active_pollers
, poller
, tailq
);
/*
 * Timers are checked only every SPDK_TIMER_POLL_ITERATIONS iterations, presumably
 * to avoid reading the tick counter on every pass. Only the head (earliest) timer
 * is examined. If it is due, it runs and is re-armed one period after the `now`
 * sampled before it ran.
 * NOTE(review): confirm an empty timer_pollers list is guarded before
 * poller->next_run_tick is read.
 */
339 if (timer_poll_count
>= SPDK_TIMER_POLL_ITERATIONS
) {
340 poller
= TAILQ_FIRST(&reactor
->timer_pollers
);
342 now
= spdk_get_ticks();
344 if (now
>= poller
->next_run_tick
) {
345 TAILQ_REMOVE(&reactor
->timer_pollers
, poller
, tailq
);
346 poller
->state
= SPDK_POLLER_STATE_RUNNING
;
347 poller
->fn(poller
->arg
);
348 if (poller
->state
== SPDK_POLLER_STATE_UNREGISTERED
) {
349 _spdk_poller_unregister_complete(poller
);
351 poller
->state
= SPDK_POLLER_STATE_WAITING
;
352 spdk_poller_insert_timer(reactor
, poller
, now
);
357 timer_poll_count
= 0;
/* Idle tracking: idle_started is the tick at which the current idle stretch began; 0 means "not idle". */
363 /* We were busy this loop iteration. Reset the idle timer. */
365 } else if (idle_started
== 0) {
366 /* We were previously busy, but this loop we took no actions. */
367 idle_started
= spdk_get_ticks();
370 /* Determine if the thread can sleep */
371 if (sleep_cycles
&& idle_started
) {
372 now
= spdk_get_ticks();
373 if (now
>= (idle_started
+ spin_cycles
)) {
374 sleep_us
= reactor
->max_delay_us
;
/* Shorten the sleep so it ends no later than the earliest timer's deadline (converted from ticks to microseconds). */
376 poller
= TAILQ_FIRST(&reactor
->timer_pollers
);
378 /* There are timers registered, so don't sleep beyond
379 * when the next timer should fire */
380 if (poller
->next_run_tick
< (now
+ sleep_cycles
)) {
381 if (poller
->next_run_tick
<= now
) {
384 sleep_us
= ((poller
->next_run_tick
- now
) * 1000000ULL) / spdk_get_ticks_hz();
393 /* After sleeping, always poll for timers */
394 timer_poll_count
= SPDK_TIMER_POLL_ITERATIONS
;
/* Leave the loop once spdk_reactors_stop() (or any other writer) moves g_reactor_state out of RUNNING. */
398 if (g_reactor_state
!= SPDK_REACTOR_STATE_RUNNING
) {
/*
 * Initialize reactor for lcore. Records its socket, empties its poller lists,
 * creates its 65536-entry event ring on that socket, and points it at the
 * socket's shared event mempool. spdk_reactors_init() creates those mempools
 * before calling this.
 */
408 spdk_reactor_construct(struct spdk_reactor
*reactor
, uint32_t lcore
, uint64_t max_delay_us
)
412 reactor
->lcore
= lcore
;
413 reactor
->socket_id
= spdk_env_get_socket_id(lcore
);
/* socket_id indexes g_spdk_event_mempool below; the bound is only checked in debug builds. */
414 assert(reactor
->socket_id
< SPDK_MAX_SOCKET
);
415 reactor
->max_delay_us
= max_delay_us
;
417 TAILQ_INIT(&reactor
->active_pollers
);
418 TAILQ_INIT(&reactor
->timer_pollers
);
/*
 * NOTE(review): snprintf() already reserves room for the NUL, so the "- 1" only
 * shortens the usable name by one byte; sizeof(ring_name) would suffice.
 */
420 snprintf(ring_name
, sizeof(ring_name
) - 1, "spdk_event_queue_%u", lcore
);
/* RING_F_SC_DEQ: single-consumer dequeue; enqueue stays multi-producer. */
422 rte_ring_create(ring_name
, 65536, reactor
->socket_id
, RING_F_SC_DEQ
);
/* NOTE(review): ring-creation failure is caught only by assert(); no other check is visible for NDEBUG builds. */
423 assert(reactor
->events
!= NULL
);
425 reactor
->event_mempool
= g_spdk_event_mempool
[reactor
->socket_id
];
/*
 * Run reactor on its lcore. A non-master lcore has its EAL state checked and is
 * launched with rte_eal_remote_launch(). On the master lcore, _spdk_reactor_run()
 * is called directly in the calling thread.
 */
429 spdk_reactor_start(struct spdk_reactor
*reactor
)
431 if (reactor
->lcore
!= rte_get_master_lcore()) {
432 switch (rte_eal_get_lcore_state(reactor
->lcore
)) {
434 rte_eal_wait_lcore(reactor
->lcore
);
437 rte_eal_remote_launch(_spdk_reactor_run
, (void *)reactor
, reactor
->lcore
);
/* NOTE(review): reactor->lcore is printed with %u elsewhere (SPDK_NOTICELOG in _spdk_reactor_run); %d mismatches an unsigned value. */
440 printf("Something already running on lcore %d\n", reactor
->lcore
);
444 _spdk_reactor_run(reactor
);
/* Number of cores in use, as reported by the env layer. */
449 spdk_app_get_core_count(void)
451 return spdk_env_get_core_count();
/* Core the caller is currently executing on, as reported by the env layer. */
455 spdk_app_get_current_core(void)
457 return spdk_env_get_current_core();
/*
 * Parse mask as a base-16 number into *cpumask, then clear every bit whose lcore
 * DPDK does not report as enabled. Only bits 0-63 are examined, which also keeps
 * the 1ULL << i shifts defined.
 * NOTE(review): the errno test below is meaningful only if errno was set to 0
 * before strtoull(); confirm. An empty string converts nothing, leaves *end at the
 * terminating NUL, and may be accepted as mask 0 (whether errno flags it is
 * implementation-dependent).
 */
461 spdk_app_parse_core_mask(const char *mask
, uint64_t *cpumask
)
466 if (mask
== NULL
|| cpumask
== NULL
) {
471 *cpumask
= strtoull(mask
, &end
, 16);
472 if (*end
!= '\0' || errno
) {
476 for (i
= 0; i
< RTE_MAX_LCORE
&& i
< 64; i
++) {
477 if ((*cpumask
& (1ULL << i
)) && !rte_lcore_is_enabled(i
)) {
478 *cpumask
&= ~(1ULL << i
);
/* Presumably returns a bitmask of the cores visited by SPDK_ENV_FOREACH_CORE; TODO confirm against the full body. */
486 spdk_app_get_core_mask(void)
491 SPDK_ENV_FOREACH_CORE(i
) {
/*
 * Build a mask with bit s set for every socket s that hosts at least one core
 * visited by SPDK_ENV_FOREACH_CORE.
 * NOTE(review): 1ULL << socket_id is undefined for socket_id >= 64. The only
 * visible bound check is an assert in spdk_reactor_construct().
 */
500 spdk_reactor_get_socket_mask(void)
504 uint64_t socket_info
= 0;
506 SPDK_ENV_FOREACH_CORE(i
) {
507 socket_id
= spdk_env_get_socket_id(i
);
508 socket_info
|= (1ULL << socket_id
);
/*
 * Start every reactor. Must be called on the DPDK master lcore (asserted).
 * Sets g_reactor_state to RUNNING, launches the reactors on all other cores, then
 * runs the current core's reactor in this thread. When that returns, waits for
 * all lcores to finish and records SHUTDOWN.
 */
515 spdk_reactors_start(void)
517 struct spdk_reactor
*reactor
;
518 uint32_t i
, current_core
;
520 assert(rte_get_master_lcore() == rte_lcore_id());
522 g_reactor_state
= SPDK_REACTOR_STATE_RUNNING
;
524 current_core
= spdk_env_get_current_core();
525 SPDK_ENV_FOREACH_CORE(i
) {
526 if (i
!= current_core
) {
527 reactor
= spdk_reactor_get(i
);
528 spdk_reactor_start(reactor
);
532 /* Start the master reactor */
533 reactor
= spdk_reactor_get(current_core
);
534 spdk_reactor_start(reactor
);
536 rte_eal_mp_wait_lcore();
538 g_reactor_state
= SPDK_REACTOR_STATE_SHUTDOWN
;
/*
 * Ask all reactors to stop. Each reactor loop exits once it sees that
 * g_reactor_state is no longer RUNNING.
 * NOTE(review): g_reactor_state is a plain global written here and read by other
 * reactor threads. Under C11 that is a data race unless the accesses are atomic.
 */
541 void spdk_reactors_stop(void)
543 g_reactor_state
= SPDK_REACTOR_STATE_EXITING
;
/*
 * Create one event mempool per occupied socket, splitting 262144 events evenly
 * across those sockets, then construct a reactor for every core. If a socket's
 * local allocation fails, the mempool is retried with SPDK_ENV_SOCKET_ID_ANY. If
 * that also fails, the mempools created so far are freed and initialization fails.
 * On success g_reactor_state becomes INITIALIZED.
 * max_delay_us is handed to every reactor as its longest idle sleep.
 */
547 spdk_reactors_init(unsigned int max_delay_us
)
550 struct spdk_reactor
*reactor
;
551 uint64_t socket_mask
= 0x0;
552 uint8_t socket_count
= 0;
553 char mempool_name
[32];
555 socket_mask
= spdk_reactor_get_socket_mask();
/* NOTE(review): %lx assumes uint64_t is unsigned long; PRIx64 from <inttypes.h> is the portable specifier. */
556 printf("Occupied cpu socket mask is 0x%lx\n", socket_mask
);
/* First pass: count the occupied sockets (socket_count) so the event pool can be split among them. */
558 for (i
= 0; i
< SPDK_MAX_SOCKET
; i
++) {
559 if ((1ULL << i
) & socket_mask
) {
563 if (socket_count
== 0) {
564 printf("No sockets occupied (internal error)\n");
568 for (i
= 0; i
< SPDK_MAX_SOCKET
; i
++) {
569 if ((1ULL << i
) & socket_mask
) {
570 snprintf(mempool_name
, sizeof(mempool_name
), "spdk_event_mempool_%d", i
);
571 g_spdk_event_mempool
[i
] = spdk_mempool_create(mempool_name
,
572 (262144 / socket_count
),
573 sizeof(struct spdk_event
), -1, i
);
575 if (g_spdk_event_mempool
[i
] == NULL
) {
576 SPDK_ERRLOG("spdk_event_mempool creation failed on socket %d\n", i
);
579 * Instead of failing the operation directly, try to create
580 * the mempool on any available sockets in the case that
581 * memory is not evenly installed on all sockets. If that still
582 * fails, free all mempools allocated so far and fail.
584 g_spdk_event_mempool
[i
] = spdk_mempool_create(
586 (262144 / socket_count
),
587 sizeof(struct spdk_event
), -1,
588 588 SPDK_ENV_SOCKET_ID_ANY
);
590 if (g_spdk_event_mempool
[i
] == NULL
) {
/*
 * j is presumably unsigned (confirm). The loop walks from i - 1 down to 0 and
 * stops when j-- wraps past zero to a value >= i. For i == 0 it runs zero times.
 */
591 for (j
= i
- 1; j
< i
; j
--) {
592 if (g_spdk_event_mempool
[j
] != NULL
) {
593 spdk_mempool_free(g_spdk_event_mempool
[j
]);
596 SPDK_ERRLOG("spdk_event_mempool creation failed\n");
603 SPDK_ENV_FOREACH_CORE(i
) {
604 reactor
= spdk_reactor_get(i
);
605 spdk_reactor_construct(reactor
, i
, max_delay_us
);
608 g_reactor_state
= SPDK_REACTOR_STATE_INITIALIZED
;
/* Release what spdk_reactors_init() created: each core's event ring, then each occupied socket's event mempool. */
614 spdk_reactors_fini(void)
617 uint64_t socket_mask
;
618 struct spdk_reactor
*reactor
;
620 SPDK_ENV_FOREACH_CORE(i
) {
621 reactor
= spdk_reactor_get(i
);
622 if (reactor
->events
!= NULL
) {
623 rte_ring_free(reactor
->events
);
627 socket_mask
= spdk_reactor_get_socket_mask();
628 for (i
= 0; i
< SPDK_MAX_SOCKET
; i
++) {
629 if ((1ULL << i
) & socket_mask
&& g_spdk_event_mempool
[i
] != NULL
) {
630 spdk_mempool_free(g_spdk_event_mempool
[i
]);
/*
 * Attach poller to reactor. A periodic poller (period_ticks != 0) goes into the
 * sorted timer list, with its first run one period from now. Any other poller is
 * appended to the round-robin active list. The list updates are unsynchronized,
 * so this presumably must run on reactor's own core, which is what
 * spdk_poller_register() arranges.
 */
638 _spdk_poller_register(struct spdk_reactor
*reactor
, struct spdk_poller
*poller
)
640 if (poller
->period_ticks
) {
641 spdk_poller_insert_timer(reactor
, poller
, spdk_get_ticks());
643 TAILQ_INSERT_TAIL(&reactor
->active_pollers
, poller
, tailq
);
/* Event callback run on the target core: arg1 is the reactor, arg2 the poller to register on it. */
648 _spdk_event_add_poller(void *arg1
, void *arg2
)
650 struct spdk_reactor
*reactor
= arg1
;
651 struct spdk_poller
*poller
= arg2
;
653 _spdk_poller_register(reactor
, poller
);
/*
 * Create a poller for fn/arg on lcore and register it there; the reactor later
 * calls poller->fn(poller->arg). A non-zero period_microseconds makes it a timed
 * poller (the period is converted to ticks). Zero means it runs on every reactor
 * iteration. Registration happens immediately when lcore is the calling core;
 * otherwise it is forwarded to lcore as an event.
 * NOTE(review): the *ppoller and lcore checks run after calloc(); confirm their
 * error paths free poller. Also, spdk_event_call() dereferences its argument, so
 * a NULL from spdk_event_allocate() (for example, an exhausted mempool) would
 * crash on the cross-core path.
 */
657 spdk_poller_register(struct spdk_poller
**ppoller
, spdk_poller_fn fn
, void *arg
,
658 uint32_t lcore
, uint64_t period_microseconds
)
660 struct spdk_poller
*poller
;
661 struct spdk_reactor
*reactor
;
663 poller
= calloc(1, sizeof(*poller
));
664 if (poller
== NULL
) {
665 SPDK_ERRLOG("Poller memory allocation failed\n");
669 poller
->lcore
= lcore
;
670 poller
->state
= SPDK_POLLER_STATE_WAITING
;
674 if (period_microseconds
) {
675 poller
->period_ticks
= (spdk_get_ticks_hz() * period_microseconds
) / 1000000ULL;
677 poller
->period_ticks
= 0;
680 if (*ppoller
!= NULL
) {
681 SPDK_ERRLOG("Attempted reuse of poller pointer\n");
685 if (lcore
>= RTE_MAX_LCORE
) {
686 SPDK_ERRLOG("Attempted use lcore %u larger than max lcore %u\n",
687 lcore
, RTE_MAX_LCORE
- 1);
692 reactor
= spdk_reactor_get(lcore
);
694 if (lcore
== spdk_env_get_current_core()) {
696 * The poller is registered to run on the current core, so call the add function
699 _spdk_poller_register(reactor
, poller
);
702 * The poller is registered to run on a different core.
703 * Schedule an event to run on the poller's core that will add the poller.
705 spdk_event_call(spdk_event_allocate(lcore
, _spdk_event_add_poller
, reactor
, poller
));
/*
 * Unregister poller from reactor. Must run on the poller's own core (asserted),
 * and next is stored as the completion event. If the poller is currently running
 * its fn (it is unregistering itself), it is only marked UNREGISTERED; the reactor
 * loop completes the unregistration after fn returns. Otherwise the poller is
 * removed from its list and completed immediately.
 */
710 _spdk_poller_unregister(struct spdk_reactor
*reactor
, struct spdk_poller
*poller
,
711 struct spdk_event
*next
)
713 assert(poller
->lcore
== reactor
->lcore
);
714 assert(poller
->lcore
== spdk_env_get_current_core());
716 poller
->unregister_complete_event
= next
;
718 if (poller
->state
== SPDK_POLLER_STATE_RUNNING
) {
720 * We are being called from the poller_fn, so set the state to unregistered
721 * and let the reactor loop free the poller.
723 poller
->state
= SPDK_POLLER_STATE_UNREGISTERED
;
725 /* Poller is not running currently, so just free it. */
726 if (poller
->period_ticks
) {
727 TAILQ_REMOVE(&reactor
->timer_pollers
, poller
, tailq
);
729 TAILQ_REMOVE(&reactor
->active_pollers
, poller
, tailq
);
732 _spdk_poller_unregister_complete(poller
);
/* Event callback run on the poller's core: arg1 is the poller, arg2 the completion event passed to _spdk_poller_unregister(). */
737 _spdk_event_remove_poller(void *arg1
, void *arg2
)
739 struct spdk_poller
*poller
= arg1
;
740 struct spdk_reactor
*reactor
= spdk_reactor_get(poller
->lcore
);
741 struct spdk_event
*next
= arg2
;
743 _spdk_poller_unregister(reactor
, poller
, next
);
747 spdk_poller_unregister(struct spdk_poller
**ppoller
,
748 struct spdk_event
*complete
)
750 struct spdk_poller
*poller
;
757 if (poller
== NULL
) {
759 spdk_event_call(complete
);
764 lcore
= poller
->lcore
;
766 if (lcore
== spdk_env_get_current_core()) {
768 * The poller is registered on the current core, so call the remove function
771 _spdk_poller_unregister(spdk_reactor_get(lcore
), poller
, complete
);
774 * The poller is registered on a different core.
775 * Schedule an event to run on the poller's core that will remove the poller.
777 spdk_event_call(spdk_event_allocate(lcore
, _spdk_event_remove_poller
, poller
, complete
));