4 * Copyright (c) Intel Corporation.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 #include "spdk/stdinc.h"
35 #include "spdk/likely.h"
37 #include "spdk_internal/event.h"
38 #include "spdk_internal/log.h"
41 #include "spdk/thread.h"
43 #include "spdk/util.h"
/*
 * Number of slots in the per-socket g_spdk_event_mempool array. A reactor's
 * socket ID is asserted to be below this value when the reactor is constructed.
 */
45 #define SPDK_MAX_SOCKET 64
/*
 * Capacity of the local event array in _spdk_event_queue_run_batch(), and the
 * maximum number of events requested from the reactor's event ring per call.
 */
47 #define SPDK_EVENT_BATCH_SIZE 8
49 enum spdk_poller_state
{
50 /* The poller is registered with a reactor but not currently executing its fn. */
51 SPDK_POLLER_STATE_WAITING
,
53 /* The poller is currently running its fn. */
54 SPDK_POLLER_STATE_RUNNING
,
56 /* The poller was unregistered during the execution of its fn. */
57 SPDK_POLLER_STATE_UNREGISTERED
,
61 TAILQ_ENTRY(spdk_poller
) tailq
;
64 /* Current state of the poller; should only be accessed from the poller's thread. */
65 enum spdk_poller_state state
;
67 uint64_t period_ticks
;
68 uint64_t next_run_tick
;
73 enum spdk_reactor_state
{
74 SPDK_REACTOR_STATE_INVALID
= 0,
75 SPDK_REACTOR_STATE_INITIALIZED
= 1,
76 SPDK_REACTOR_STATE_RUNNING
= 2,
77 SPDK_REACTOR_STATE_EXITING
= 3,
78 SPDK_REACTOR_STATE_SHUTDOWN
= 4,
82 /* Logical core number for this reactor. */
85 /* Socket ID for this reactor. */
88 /* Poller that collects the rusage values for this reactor. */
89 struct spdk_poller
*rusage_poller
;
91 /* Reactor tsc stats */
92 struct spdk_reactor_tsc_stats tsc_stats
;
96 /* The last known rusage values */
100 * Contains pollers actively running on this reactor. Pollers
101 * are run round-robin. The reactor takes one poller from the head
102 * of the list, executes it, then puts it back at the tail of
105 TAILQ_HEAD(, spdk_poller
) active_pollers
;
108 * Contains pollers running on this reactor with a periodic timer.
110 TAILQ_HEAD(timer_pollers_head
, spdk_poller
) timer_pollers
;
112 struct spdk_ring
*events
;
114 /* Pointer to the per-socket g_spdk_event_mempool for this reactor. */
115 struct spdk_mempool
*event_mempool
;
117 uint64_t max_delay_us
;
118 } __attribute__((aligned(64)));
/*
 * Array of reactors indexed by lcore (see spdk_reactor_get()). Allocated and
 * zeroed in spdk_reactors_init() with (last core + 1) entries.
 */
120 static struct spdk_reactor
*g_reactors
;
/*
 * Global reactor lifecycle state. spdk_reactors_init() sets INITIALIZED,
 * spdk_reactors_start() sets RUNNING and later SHUTDOWN, and
 * spdk_reactors_stop() sets EXITING.
 */
122 static enum spdk_reactor_state g_reactor_state
= SPDK_REACTOR_STATE_INVALID
;
/*
 * Whether the context switch monitor is enabled (on by default). Changed by
 * spdk_reactor_enable_context_switch_monitor() and checked by
 * _spdk_reactor_run() before it starts the monitor.
 */
124 static bool g_context_switch_monitor_enabled
= true;
/* Forward declaration; the definition appears later in this file. */
126 static void spdk_reactor_construct(struct spdk_reactor
*w
, uint32_t lcore
,
127 uint64_t max_delay_us
);
/*
 * Event mempools indexed by socket ID. Created in spdk_reactors_init() and
 * released in spdk_reactors_fini(); each reactor takes the entry for its own
 * socket in spdk_reactor_construct().
 */
129 static struct spdk_mempool
*g_spdk_event_mempool
[SPDK_MAX_SOCKET
];
/*
 * CPU set allocated by spdk_reactors_start(), which sets a bit for cores it
 * iterates over, then frees the set and resets this pointer to NULL before
 * returning. Exposed through spdk_app_get_core_mask().
 */
131 static struct spdk_cpuset
*g_spdk_app_core_mask
;
133 static struct spdk_reactor
*
134 spdk_reactor_get(uint32_t lcore
)
136 struct spdk_reactor
*reactor
;
137 reactor
= spdk_likely(g_reactors
) ? &g_reactors
[lcore
] : NULL
;
142 spdk_event_allocate(uint32_t lcore
, spdk_event_fn fn
, void *arg1
, void *arg2
)
144 struct spdk_event
*event
= NULL
;
145 struct spdk_reactor
*reactor
= spdk_reactor_get(lcore
);
152 event
= spdk_mempool_get(reactor
->event_mempool
);
158 event
->lcore
= lcore
;
167 spdk_event_call(struct spdk_event
*event
)
170 struct spdk_reactor
*reactor
;
172 reactor
= spdk_reactor_get(event
->lcore
);
174 assert(reactor
->events
!= NULL
);
175 rc
= spdk_ring_enqueue(reactor
->events
, (void **)&event
, 1);
181 static inline uint32_t
182 _spdk_event_queue_run_batch(struct spdk_reactor
*reactor
)
185 void *events
[SPDK_EVENT_BATCH_SIZE
];
189 * spdk_ring_dequeue() fills events and returns how many entries it wrote,
190 * so we will never actually read uninitialized data from events, but just to be sure
191 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
193 memset(events
, 0, sizeof(events
));
196 count
= spdk_ring_dequeue(reactor
->events
, events
, SPDK_EVENT_BATCH_SIZE
);
201 for (i
= 0; i
< count
; i
++) {
202 struct spdk_event
*event
= events
[i
];
204 assert(event
!= NULL
);
205 event
->fn(event
->arg1
, event
->arg2
);
208 spdk_mempool_put_bulk(reactor
->event_mempool
, events
, count
);
214 _spdk_reactor_msg_passed(void *arg1
, void *arg2
)
216 spdk_thread_fn fn
= arg1
;
222 _spdk_reactor_send_msg(spdk_thread_fn fn
, void *ctx
, void *thread_ctx
)
224 struct spdk_event
*event
;
225 struct spdk_reactor
*reactor
;
227 reactor
= thread_ctx
;
229 event
= spdk_event_allocate(reactor
->lcore
, _spdk_reactor_msg_passed
, fn
, ctx
);
231 spdk_event_call(event
);
235 _spdk_poller_insert_timer(struct spdk_reactor
*reactor
, struct spdk_poller
*poller
, uint64_t now
)
237 struct spdk_poller
*iter
;
238 uint64_t next_run_tick
;
240 next_run_tick
= now
+ poller
->period_ticks
;
241 poller
->next_run_tick
= next_run_tick
;
244 * Insert poller in the reactor's timer_pollers list in sorted order by next scheduled
247 TAILQ_FOREACH_REVERSE(iter
, &reactor
->timer_pollers
, timer_pollers_head
, tailq
) {
248 if (iter
->next_run_tick
<= next_run_tick
) {
249 TAILQ_INSERT_AFTER(&reactor
->timer_pollers
, iter
, poller
, tailq
);
254 /* No earlier pollers were found, so this poller must be the new head */
255 TAILQ_INSERT_HEAD(&reactor
->timer_pollers
, poller
, tailq
);
258 static struct spdk_poller
*
259 _spdk_reactor_start_poller(void *thread_ctx
,
262 uint64_t period_microseconds
)
264 struct spdk_poller
*poller
;
265 struct spdk_reactor
*reactor
;
266 uint64_t quotient
, remainder
, ticks
;
268 reactor
= thread_ctx
;
270 poller
= calloc(1, sizeof(*poller
));
271 if (poller
== NULL
) {
272 SPDK_ERRLOG("Poller memory allocation failed\n");
276 poller
->lcore
= reactor
->lcore
;
277 poller
->state
= SPDK_POLLER_STATE_WAITING
;
281 if (period_microseconds
) {
282 quotient
= period_microseconds
/ SPDK_SEC_TO_USEC
;
283 remainder
= period_microseconds
% SPDK_SEC_TO_USEC
;
284 ticks
= spdk_get_ticks_hz();
286 poller
->period_ticks
= ticks
* quotient
+ (ticks
* remainder
) / SPDK_SEC_TO_USEC
;
288 poller
->period_ticks
= 0;
291 if (poller
->period_ticks
) {
292 _spdk_poller_insert_timer(reactor
, poller
, spdk_get_ticks());
294 TAILQ_INSERT_TAIL(&reactor
->active_pollers
, poller
, tailq
);
301 _spdk_reactor_stop_poller(struct spdk_poller
*poller
, void *thread_ctx
)
303 struct spdk_reactor
*reactor
;
305 reactor
= thread_ctx
;
307 assert(poller
->lcore
== spdk_env_get_current_core());
309 if (poller
->state
== SPDK_POLLER_STATE_RUNNING
) {
311 * We are being called from the poller_fn, so set the state to unregistered
312 * and let the reactor loop free the poller.
314 poller
->state
= SPDK_POLLER_STATE_UNREGISTERED
;
316 /* Poller is not running currently, so just free it. */
317 if (poller
->period_ticks
) {
318 TAILQ_REMOVE(&reactor
->timer_pollers
, poller
, tailq
);
320 TAILQ_REMOVE(&reactor
->active_pollers
, poller
, tailq
);
328 get_rusage(void *arg
)
330 struct spdk_reactor
*reactor
= arg
;
331 struct rusage rusage
;
333 if (getrusage(RUSAGE_THREAD
, &rusage
) != 0) {
337 if (rusage
.ru_nvcsw
!= reactor
->rusage
.ru_nvcsw
|| rusage
.ru_nivcsw
!= reactor
->rusage
.ru_nivcsw
) {
338 SPDK_INFOLOG(SPDK_LOG_REACTOR
,
339 "Reactor %d: %ld voluntary context switches and %ld involuntary context switches in the last second.\n",
340 reactor
->lcore
, rusage
.ru_nvcsw
- reactor
->rusage
.ru_nvcsw
,
341 rusage
.ru_nivcsw
- reactor
->rusage
.ru_nivcsw
);
343 reactor
->rusage
= rusage
;
349 _spdk_reactor_context_switch_monitor_start(void *arg1
, void *arg2
)
351 struct spdk_reactor
*reactor
= arg1
;
353 if (reactor
->rusage_poller
== NULL
) {
354 getrusage(RUSAGE_THREAD
, &reactor
->rusage
);
355 reactor
->rusage_poller
= spdk_poller_register(get_rusage
, reactor
, 1000000);
360 _spdk_reactor_context_switch_monitor_stop(void *arg1
, void *arg2
)
362 struct spdk_reactor
*reactor
= arg1
;
364 if (reactor
->rusage_poller
!= NULL
) {
365 spdk_poller_unregister(&reactor
->rusage_poller
);
370 _spdk_reactor_get_max_event_cnt(uint8_t socket_count
)
374 /* Try to make event ring fill at most 2MB of memory,
375 * as some ring implementations may require physical address
376 * contiguity. We don't want to introduce a requirement of
377 * at least 2 physically contiguous 2MB hugepages.
379 cnt
= spdk_min(262144 / socket_count
, 262144 / 2);
380 /* Take into account one extra element required by
381 * some ring implementations.
388 spdk_reactor_enable_context_switch_monitor(bool enable
)
390 struct spdk_reactor
*reactor
;
394 if (enable
!= g_context_switch_monitor_enabled
) {
395 g_context_switch_monitor_enabled
= enable
;
397 fn
= _spdk_reactor_context_switch_monitor_start
;
399 fn
= _spdk_reactor_context_switch_monitor_stop
;
401 SPDK_ENV_FOREACH_CORE(core
) {
402 reactor
= spdk_reactor_get(core
);
403 spdk_event_call(spdk_event_allocate(core
, fn
, reactor
, NULL
));
409 spdk_reactor_context_switch_monitor_enabled(void)
411 return g_context_switch_monitor_enabled
;
415 spdk_reactor_add_tsc_stats(void *arg
, int rc
, uint64_t now
)
417 struct spdk_reactor
*reactor
= arg
;
418 struct spdk_reactor_tsc_stats
*tsc_stats
= &reactor
->tsc_stats
;
421 /* Poller status idle */
422 tsc_stats
->idle_tsc
+= now
- reactor
->tsc_last
;
424 /* Poller status busy */
425 tsc_stats
->busy_tsc
+= now
- reactor
->tsc_last
;
427 /* Poller status unknown */
428 tsc_stats
->unknown_tsc
+= now
- reactor
->tsc_last
;
431 reactor
->tsc_last
= now
;
435 spdk_reactor_get_tsc_stats(struct spdk_reactor_tsc_stats
*tsc_stats
, uint32_t core
)
437 struct spdk_reactor
*reactor
;
439 if (!spdk_cpuset_get_cpu(g_spdk_app_core_mask
, core
)) {
443 reactor
= spdk_reactor_get(core
);
444 *tsc_stats
= reactor
->tsc_stats
;
451 * \brief This is the main function of the reactor thread.
457 * dequeue and run a batch of events
459 * if (active pollers)
460 * run the first poller in the list and move it to the back
462 * if (first timer poller has expired)
463 * run the first timer poller and reinsert it in the timer list
465 * if (no action taken and sleep enabled)
466 * sleep until next timer poller is scheduled to expire
471 _spdk_reactor_run(void *arg
)
473 struct spdk_reactor
*reactor
= arg
;
474 struct spdk_poller
*poller
;
475 uint32_t event_count
;
477 uint64_t sleep_cycles
;
480 char thread_name
[32];
482 snprintf(thread_name
, sizeof(thread_name
), "reactor_%u", reactor
->lcore
);
483 if (spdk_allocate_thread(_spdk_reactor_send_msg
,
484 _spdk_reactor_start_poller
,
485 _spdk_reactor_stop_poller
,
486 reactor
, thread_name
) == NULL
) {
489 SPDK_NOTICELOG("Reactor started on core %u on socket %u\n", reactor
->lcore
,
492 sleep_cycles
= reactor
->max_delay_us
* spdk_get_ticks_hz() / SPDK_SEC_TO_USEC
;
493 if (g_context_switch_monitor_enabled
) {
494 _spdk_reactor_context_switch_monitor_start(reactor
, NULL
);
496 now
= spdk_get_ticks();
497 reactor
->tsc_last
= now
;
500 bool took_action
= false;
502 event_count
= _spdk_event_queue_run_batch(reactor
);
503 if (event_count
> 0) {
505 now
= spdk_get_ticks();
506 spdk_reactor_add_tsc_stats(reactor
, rc
, now
);
510 poller
= TAILQ_FIRST(&reactor
->active_pollers
);
512 TAILQ_REMOVE(&reactor
->active_pollers
, poller
, tailq
);
513 poller
->state
= SPDK_POLLER_STATE_RUNNING
;
514 rc
= poller
->fn(poller
->arg
);
515 now
= spdk_get_ticks();
516 spdk_reactor_add_tsc_stats(reactor
, rc
, now
);
517 if (poller
->state
== SPDK_POLLER_STATE_UNREGISTERED
) {
520 poller
->state
= SPDK_POLLER_STATE_WAITING
;
521 TAILQ_INSERT_TAIL(&reactor
->active_pollers
, poller
, tailq
);
526 poller
= TAILQ_FIRST(&reactor
->timer_pollers
);
528 if (took_action
== false) {
529 now
= spdk_get_ticks();
532 if (now
>= poller
->next_run_tick
) {
533 uint64_t tmp_timer_tsc
;
535 TAILQ_REMOVE(&reactor
->timer_pollers
, poller
, tailq
);
536 poller
->state
= SPDK_POLLER_STATE_RUNNING
;
537 rc
= poller
->fn(poller
->arg
);
538 /* Save the tsc value from before poller->fn was executed. We want to
539 * use the current time for idle/busy tsc value accounting, but want to
540 * use the older time to reinsert to the timer poller below. */
542 now
= spdk_get_ticks();
543 spdk_reactor_add_tsc_stats(reactor
, rc
, now
);
544 if (poller
->state
== SPDK_POLLER_STATE_UNREGISTERED
) {
547 poller
->state
= SPDK_POLLER_STATE_WAITING
;
548 _spdk_poller_insert_timer(reactor
, poller
, tmp_timer_tsc
);
554 /* Determine if the thread can sleep */
555 if (sleep_cycles
&& !took_action
) {
556 now
= spdk_get_ticks();
557 sleep_us
= reactor
->max_delay_us
;
559 poller
= TAILQ_FIRST(&reactor
->timer_pollers
);
561 /* There are timers registered, so don't sleep beyond
562 * when the next timer should fire */
563 if (poller
->next_run_tick
< (now
+ sleep_cycles
)) {
564 if (poller
->next_run_tick
<= now
) {
567 sleep_us
= ((poller
->next_run_tick
- now
) *
568 SPDK_SEC_TO_USEC
) / spdk_get_ticks_hz();
578 if (g_reactor_state
!= SPDK_REACTOR_STATE_RUNNING
) {
583 _spdk_reactor_context_switch_monitor_stop(reactor
, NULL
);
589 spdk_reactor_construct(struct spdk_reactor
*reactor
, uint32_t lcore
, uint64_t max_delay_us
)
591 reactor
->lcore
= lcore
;
592 reactor
->socket_id
= spdk_env_get_socket_id(lcore
);
593 assert(reactor
->socket_id
< SPDK_MAX_SOCKET
);
594 reactor
->max_delay_us
= max_delay_us
;
596 TAILQ_INIT(&reactor
->active_pollers
);
597 TAILQ_INIT(&reactor
->timer_pollers
);
599 reactor
->events
= spdk_ring_create(SPDK_RING_TYPE_MP_SC
, 65536, reactor
->socket_id
);
600 if (!reactor
->events
) {
601 SPDK_NOTICELOG("Ring creation failed on preferred socket %d. Try other sockets.\n",
604 reactor
->events
= spdk_ring_create(SPDK_RING_TYPE_MP_SC
, 65536,
605 SPDK_ENV_SOCKET_ID_ANY
);
607 assert(reactor
->events
!= NULL
);
609 reactor
->event_mempool
= g_spdk_event_mempool
[reactor
->socket_id
];
613 spdk_app_parse_core_mask(const char *mask
, struct spdk_cpuset
*cpumask
)
616 struct spdk_cpuset
*validmask
;
618 ret
= spdk_cpuset_parse(cpumask
, mask
);
623 validmask
= spdk_app_get_core_mask();
624 spdk_cpuset_and(cpumask
, validmask
);
630 spdk_app_get_core_mask(void)
632 return g_spdk_app_core_mask
;
637 spdk_reactor_get_socket_mask(void)
641 uint64_t socket_info
= 0;
643 SPDK_ENV_FOREACH_CORE(i
) {
644 socket_id
= spdk_env_get_socket_id(i
);
645 socket_info
|= (1ULL << socket_id
);
652 spdk_reactors_start(void)
654 struct spdk_reactor
*reactor
;
655 uint32_t i
, current_core
;
658 g_reactor_state
= SPDK_REACTOR_STATE_RUNNING
;
659 g_spdk_app_core_mask
= spdk_cpuset_alloc();
661 current_core
= spdk_env_get_current_core();
662 SPDK_ENV_FOREACH_CORE(i
) {
663 if (i
!= current_core
) {
664 reactor
= spdk_reactor_get(i
);
665 rc
= spdk_env_thread_launch_pinned(reactor
->lcore
, _spdk_reactor_run
, reactor
);
667 SPDK_ERRLOG("Unable to start reactor thread on core %u\n", reactor
->lcore
);
672 spdk_cpuset_set_cpu(g_spdk_app_core_mask
, i
, true);
675 /* Start the master reactor */
676 reactor
= spdk_reactor_get(current_core
);
677 _spdk_reactor_run(reactor
);
679 spdk_env_thread_wait_all();
681 g_reactor_state
= SPDK_REACTOR_STATE_SHUTDOWN
;
682 spdk_cpuset_free(g_spdk_app_core_mask
);
683 g_spdk_app_core_mask
= NULL
;
687 spdk_reactors_stop(void *arg1
, void *arg2
)
689 g_reactor_state
= SPDK_REACTOR_STATE_EXITING
;
693 spdk_reactors_init(unsigned int max_delay_us
)
696 uint32_t i
, j
, last_core
;
697 struct spdk_reactor
*reactor
;
698 uint64_t socket_mask
= 0x0;
699 uint8_t socket_count
= 0;
700 char mempool_name
[32];
702 socket_mask
= spdk_reactor_get_socket_mask();
703 SPDK_NOTICELOG("Occupied cpu socket mask is 0x%lx\n", socket_mask
);
705 for (i
= 0; i
< SPDK_MAX_SOCKET
; i
++) {
706 if ((1ULL << i
) & socket_mask
) {
710 if (socket_count
== 0) {
711 SPDK_ERRLOG("No sockets occupied (internal error)\n");
715 for (i
= 0; i
< SPDK_MAX_SOCKET
; i
++) {
716 if ((1ULL << i
) & socket_mask
) {
717 snprintf(mempool_name
, sizeof(mempool_name
), "evtpool%d_%d", i
, getpid());
718 g_spdk_event_mempool
[i
] = spdk_mempool_create(mempool_name
,
719 _spdk_reactor_get_max_event_cnt(socket_count
),
720 sizeof(struct spdk_event
),
721 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE
, i
);
723 if (g_spdk_event_mempool
[i
] == NULL
) {
724 SPDK_NOTICELOG("Event_mempool creation failed on preferred socket %d.\n", i
);
727 * Instead of failing the operation directly, try to create
728 * the mempool on any available sockets in the case that
729 * memory is not evenly installed on all sockets. If that still
730 * fails, free all allocated memory and exit.
732 g_spdk_event_mempool
[i
] = spdk_mempool_create(
734 _spdk_reactor_get_max_event_cnt(socket_count
),
735 sizeof(struct spdk_event
),
736 SPDK_MEMPOOL_DEFAULT_CACHE_SIZE
,
737 SPDK_ENV_SOCKET_ID_ANY
);
739 if (g_spdk_event_mempool
[i
] == NULL
) {
740 for (j
= i
- 1; j
< i
; j
--) {
741 if (g_spdk_event_mempool
[j
] != NULL
) {
742 spdk_mempool_free(g_spdk_event_mempool
[j
]);
745 SPDK_ERRLOG("spdk_event_mempool creation failed\n");
750 g_spdk_event_mempool
[i
] = NULL
;
754 /* struct spdk_reactor must be aligned on 64 byte boundary */
755 last_core
= spdk_env_get_last_core();
756 rc
= posix_memalign((void **)&g_reactors
, 64,
757 (last_core
+ 1) * sizeof(struct spdk_reactor
));
759 SPDK_ERRLOG("Could not allocate array size=%u for g_reactors\n",
761 for (i
= 0; i
< SPDK_MAX_SOCKET
; i
++) {
762 if (g_spdk_event_mempool
[i
] != NULL
) {
763 spdk_mempool_free(g_spdk_event_mempool
[i
]);
769 memset(g_reactors
, 0, (last_core
+ 1) * sizeof(struct spdk_reactor
));
771 SPDK_ENV_FOREACH_CORE(i
) {
772 reactor
= spdk_reactor_get(i
);
773 spdk_reactor_construct(reactor
, i
, max_delay_us
);
776 g_reactor_state
= SPDK_REACTOR_STATE_INITIALIZED
;
782 spdk_reactors_fini(void)
785 struct spdk_reactor
*reactor
;
787 SPDK_ENV_FOREACH_CORE(i
) {
788 reactor
= spdk_reactor_get(i
);
789 if (spdk_likely(reactor
!= NULL
) && reactor
->events
!= NULL
) {
790 spdk_ring_free(reactor
->events
);
794 for (i
= 0; i
< SPDK_MAX_SOCKET
; i
++) {
795 if (g_spdk_event_mempool
[i
] != NULL
) {
796 spdk_mempool_free(g_spdk_event_mempool
[i
]);
/*
 * Associates the "reactor" name with SPDK_LOG_REACTOR, the flag this file
 * passes to SPDK_INFOLOG in get_rusage().
 * NOTE(review): the exact registration semantics are defined by the SPDK log
 * macros, which are not visible here - confirm against spdk_internal/log.h.
 */
804 SPDK_LOG_REGISTER_COMPONENT("reactor", SPDK_LOG_REACTOR
)