]> git.proxmox.com Git - ceph.git/blame - ceph/src/spdk/lib/thread/thread.c
import 15.2.0 Octopus source
[ceph.git] / ceph / src / spdk / lib / thread / thread.c
CommitLineData
11fdf7f2
TL
1/*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34#include "spdk/stdinc.h"
35
9f95a23c
TL
36#include "spdk/env.h"
37#include "spdk/queue.h"
11fdf7f2
TL
38#include "spdk/string.h"
39#include "spdk/thread.h"
9f95a23c 40#include "spdk/util.h"
11fdf7f2
TL
41
42#include "spdk_internal/log.h"
9f95a23c 43#include "spdk_internal/thread.h"
11fdf7f2 44
9f95a23c 45#define SPDK_MSG_BATCH_SIZE 8
11fdf7f2
TL
46
47static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;
48
9f95a23c
TL
49static spdk_new_thread_fn g_new_thread_fn = NULL;
50static size_t g_ctx_sz = 0;
51
11fdf7f2
TL
/* Internal bookkeeping for one registered I/O device.
 * The caller-provided io_device pointer is the unique lookup key. */
struct io_device {
	void *io_device;			/* unique key supplied at registration */
	char *name;				/* for log messages; heap-allocated */
	spdk_io_channel_create_cb create_cb;
	spdk_io_channel_destroy_cb destroy_cb;
	spdk_io_device_unregister_cb unregister_cb;
	struct spdk_thread *unregister_thread;	/* thread that called unregister */
	uint32_t ctx_size;			/* per-channel context allocated past spdk_io_channel */
	uint32_t for_each_count;		/* outstanding spdk_for_each_channel() iterations */
	TAILQ_ENTRY(io_device) tailq;		/* link in g_io_devices */

	uint32_t refcnt;			/* number of channels open on this device */

	bool unregistered;			/* set once unregister has started */
};
67
68static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);
69
9f95a23c
TL
/* One cross-thread message: a function and its argument,
 * passed through a thread's message ring. */
struct spdk_msg {
	spdk_msg_fn fn;
	void *arg;

	SLIST_ENTRY(spdk_msg) link;	/* link in a thread's local msg_cache */
};
76
77#define SPDK_MSG_MEMPOOL_CACHE_SIZE 1024
78static struct spdk_mempool *g_spdk_msg_mempool = NULL;
79
/* Lifecycle states of a poller; transitions happen only on the owning thread. */
enum spdk_poller_state {
	/* The poller is registered with a thread but not currently executing its fn. */
	SPDK_POLLER_STATE_WAITING,

	/* The poller is currently running its fn. */
	SPDK_POLLER_STATE_RUNNING,

	/* The poller was unregistered during the execution of its fn. */
	SPDK_POLLER_STATE_UNREGISTERED,
};
90
/* A registered poller: callback invoked from spdk_thread_poll(),
 * either every iteration (period_ticks == 0) or on a timer. */
struct spdk_poller {
	TAILQ_ENTRY(spdk_poller) tailq;	/* link in active_pollers or timer_pollers */

	/* Current state of the poller; should only be accessed from the poller's thread. */
	enum spdk_poller_state state;

	uint64_t period_ticks;		/* 0 for active pollers; TSC ticks between runs otherwise */
	uint64_t next_run_tick;		/* absolute TSC tick of the next scheduled run */
	spdk_poller_fn fn;
	void *arg;
};
102
11fdf7f2 103struct spdk_thread {
11fdf7f2
TL
104 TAILQ_HEAD(, spdk_io_channel) io_channels;
105 TAILQ_ENTRY(spdk_thread) tailq;
106 char *name;
9f95a23c
TL
107
108 bool exit;
109
110 struct spdk_cpuset *cpumask;
111
112 uint64_t tsc_last;
113 struct spdk_thread_stats stats;
114
115 /*
116 * Contains pollers actively running on this thread. Pollers
117 * are run round-robin. The thread takes one poller from the head
118 * of the ring, executes it, then puts it back at the tail of
119 * the ring.
120 */
121 TAILQ_HEAD(active_pollers_head, spdk_poller) active_pollers;
122
123 /**
124 * Contains pollers running on this thread with a periodic timer.
125 */
126 TAILQ_HEAD(timer_pollers_head, spdk_poller) timer_pollers;
127
128 struct spdk_ring *messages;
129
130 SLIST_HEAD(, spdk_msg) msg_cache;
131 size_t msg_cache_count;
132
133 /* User context allocated at the end */
134 uint8_t ctx[0];
11fdf7f2
TL
135};
136
137static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
138static uint32_t g_thread_count = 0;
139
9f95a23c
TL
140static __thread struct spdk_thread *tls_thread = NULL;
141
/* Return the spdk_thread bound to the calling OS thread, or NULL. */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
11fdf7f2 147
9f95a23c
TL
148int
149spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
150{
151 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
11fdf7f2 152
9f95a23c
TL
153 assert(g_new_thread_fn == NULL);
154 g_new_thread_fn = new_thread_fn;
11fdf7f2 155
9f95a23c 156 g_ctx_sz = ctx_sz;
11fdf7f2 157
9f95a23c
TL
158 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
159 g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
160 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
161 sizeof(struct spdk_msg),
162 0, /* No cache. We do our own. */
163 SPDK_ENV_SOCKET_ID_ANY);
164
165 if (!g_spdk_msg_mempool) {
166 return -1;
167 }
11fdf7f2 168
11fdf7f2
TL
169 return 0;
170}
171
/* Tear down library-global state. Logs (but does not free) any io_devices
 * that were never unregistered, releases the shared message mempool, and
 * clears the hooks set by spdk_thread_lib_init(). */
void
spdk_thread_lib_fini(void)
{
	struct io_device *dev;

	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
	}

	if (g_spdk_msg_mempool) {
		spdk_mempool_free(g_spdk_msg_mempool);
		g_spdk_msg_mempool = NULL;
	}

	g_new_thread_fn = NULL;
	g_ctx_sz = 0;
}
189
9f95a23c
TL
/* Release all resources owned by a thread and unlink it from g_threads.
 * Channels are only reported, not freed — they belong to their io_device
 * users. Pollers still in WAITING state indicate a caller bug (they were
 * never unregistered) and are warned about before being freed. */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}


	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, ptmp) {
		if (poller->state == SPDK_POLLER_STATE_WAITING) {
			SPDK_WARNLOG("poller %p still registered at thread exit\n",
				     poller);
		}

		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		free(poller);
	}

	/* Unlink under the global lock; the rest of the teardown touches
	 * only thread-local state. */
	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_cpuset_free(thread->cpumask);
	free(thread->name);

	/* Return locally cached messages to the shared mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}
248
249struct spdk_thread *
250spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
251{
252 struct spdk_thread *thread;
253 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
254 int rc, i;
255
256 thread = calloc(1, sizeof(*thread) + g_ctx_sz);
11fdf7f2
TL
257 if (!thread) {
258 SPDK_ERRLOG("Unable to allocate memory for thread\n");
11fdf7f2
TL
259 return NULL;
260 }
261
9f95a23c
TL
262 thread->cpumask = spdk_cpuset_alloc();
263 if (!thread->cpumask) {
264 free(thread);
265 SPDK_ERRLOG("Unable to allocate memory for CPU mask\n");
266 return NULL;
267 }
268
269 if (cpumask) {
270 spdk_cpuset_copy(thread->cpumask, cpumask);
271 } else {
272 spdk_cpuset_negate(thread->cpumask);
273 }
274
11fdf7f2 275 TAILQ_INIT(&thread->io_channels);
9f95a23c
TL
276 TAILQ_INIT(&thread->active_pollers);
277 TAILQ_INIT(&thread->timer_pollers);
278 SLIST_INIT(&thread->msg_cache);
279 thread->msg_cache_count = 0;
280
281 thread->tsc_last = spdk_get_ticks();
282
283 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
284 if (!thread->messages) {
285 SPDK_ERRLOG("Unable to allocate memory for message ring\n");
286 spdk_cpuset_free(thread->cpumask);
287 free(thread);
288 return NULL;
289 }
290
291 /* Fill the local message pool cache. */
292 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
293 if (rc == 0) {
294 /* If we can't populate the cache it's ok. The cache will get filled
295 * up organically as messages are passed to the thread. */
296 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
297 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
298 thread->msg_cache_count++;
299 }
300 }
301
11fdf7f2 302 if (name) {
11fdf7f2
TL
303 thread->name = strdup(name);
304 } else {
305 thread->name = spdk_sprintf_alloc("%p", thread);
306 }
307
308 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread %s\n", thread->name);
309
9f95a23c
TL
310 pthread_mutex_lock(&g_devlist_mutex);
311 TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
312 g_thread_count++;
11fdf7f2
TL
313 pthread_mutex_unlock(&g_devlist_mutex);
314
9f95a23c
TL
315 if (g_new_thread_fn) {
316 rc = g_new_thread_fn(thread);
317 if (rc != 0) {
318 _free_thread(thread);
319 return NULL;
320 }
321 }
322
11fdf7f2
TL
323 return thread;
324}
325
/* Bind (or unbind, with NULL) an spdk_thread to the calling OS thread. */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
11fdf7f2 331
9f95a23c
TL
/* Mark the thread as exiting. Must be called from the thread itself
 * (asserted). Poll loops observe the flag and stop early; the actual
 * teardown happens later in spdk_thread_destroy(). */
void
spdk_thread_exit(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);

	assert(tls_thread == thread);

	thread->exit = true;
}
341
/* Free a thread that has already been marked exited via spdk_thread_exit()
 * (asserted). Clears the TLS binding if it points at this thread. */
void
spdk_thread_destroy(struct spdk_thread *thread)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);

	assert(thread->exit == true);

	if (tls_thread == thread) {
		tls_thread = NULL;
	}

	_free_thread(thread);
}
11fdf7f2 355
9f95a23c
TL
356void *
357spdk_thread_get_ctx(struct spdk_thread *thread)
358{
359 if (g_ctx_sz > 0) {
360 return thread->ctx;
361 }
11fdf7f2 362
9f95a23c
TL
363 return NULL;
364}
365
/* Return the thread's CPU affinity mask (owned by the thread). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return thread->cpumask;
}
371
/* Map a user context pointer (as returned by spdk_thread_get_ctx()) back
 * to its owning spdk_thread. Valid only when g_ctx_sz > 0 (asserted). */
struct spdk_thread *
spdk_thread_get_from_ctx(void *ctx)
{
	if (ctx == NULL) {
		assert(false);
		return NULL;
	}

	assert(g_ctx_sz > 0);

	/* ctx is the flexible array at the tail of struct spdk_thread. */
	return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
}
384
/* Dequeue and execute up to max_msgs messages (0 means a full batch of
 * SPDK_MSG_BATCH_SIZE). Returns the number of messages executed. Executed
 * msg objects are recycled into the thread-local cache when there is room,
 * otherwise returned to the shared mempool. */
static inline uint32_t
_spdk_msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		/* A message handler may have requested thread exit. */
		if (thread->exit) {
			break;
		}

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
433
/* Schedule a timed poller: compute its next run tick and insert it into the
 * thread's timer list, which is kept sorted ascending by next_run_tick.
 * The reverse scan finds the insertion point quickly for the common case
 * of the newest deadline being the latest. */
static void
_spdk_poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timer_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timer_pollers, timer_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			TAILQ_INSERT_AFTER(&thread->timer_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timer_pollers, poller, tailq);
}
455
/* Run one iteration of a thread: drain up to max_msgs messages, run every
 * active poller once, then run expired timed pollers. `now` of 0 means
 * "sample the TSC here".
 *
 * Returns >0 if any work was done (busy), 0 if idle, or the most positive
 * poller return value. Also accumulates busy/idle tick statistics.
 * Temporarily rebinds TLS to `thread` so callbacks see the right
 * spdk_get_thread(); the previous binding is restored on exit. */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_thread *orig_thread;
	struct spdk_poller *poller, *tmp;
	int rc = 0;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	msg_count = _spdk_msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Reverse iteration so each poller effectively rotates head->tail,
	 * giving round-robin fairness across iterations. */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (thread->exit) {
			break;
		}

		/* Lazily reap pollers unregistered on a previous iteration. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		/* The poller may have unregistered itself from its own fn. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %p returned -1\n", poller);
		}
#endif

		if (poller_rc > rc) {
			rc = poller_rc;
		}

	}

	TAILQ_FOREACH_SAFE(poller, &thread->timer_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (thread->exit) {
			break;
		}

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		/* List is sorted by deadline; stop at the first future one. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
			free(poller);
			continue;
		}

		poller->state = SPDK_POLLER_STATE_WAITING;
		/* Re-insert with a fresh deadline relative to `now`. */
		TAILQ_REMOVE(&thread->timer_pollers, poller, tailq);
		_spdk_poller_insert_timer(thread, poller, now);

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %p returned -1\n", poller);
		}
#endif

		if (timer_rc > rc) {
			rc = timer_rc;

		}
	}

	if (rc == 0) {
		/* Poller status idle */
		thread->stats.idle_tsc += now - thread->tsc_last;
	} else if (rc > 0) {
		/* Poller status busy */
		thread->stats.busy_tsc += now - thread->tsc_last;
	}
	thread->tsc_last = now;

	tls_thread = orig_thread;

	return rc;
}
568
569uint64_t
570spdk_thread_next_poller_expiration(struct spdk_thread *thread)
571{
572 struct spdk_poller *poller;
573
574 poller = TAILQ_FIRST(&thread->timer_pollers);
575 if (poller) {
576 return poller->next_run_tick;
577 }
578
579 return 0;
580}
581
/* Return non-zero if the thread has any non-timed (active) pollers. */
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}
587
588bool
589spdk_thread_has_pollers(struct spdk_thread *thread)
590{
591 if (TAILQ_EMPTY(&thread->active_pollers) &&
592 TAILQ_EMPTY(&thread->timer_pollers)) {
593 return false;
594 }
595
596 return true;
597}
598
599bool
600spdk_thread_is_idle(struct spdk_thread *thread)
601{
602 if (spdk_ring_count(thread->messages) ||
603 spdk_thread_has_pollers(thread)) {
604 return false;
605 }
606
607 return true;
11fdf7f2
TL
608}
609
/* Return the number of spdk_threads currently registered. */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
620
/* Return the calling OS thread's bound spdk_thread, logging an error
 * (and returning NULL) when none is bound. */
struct spdk_thread *
spdk_get_thread(void)
{
	struct spdk_thread *thread;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
	}

	return thread;
}
633
/* Return the thread's name (owned by the thread; do not free). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
639
9f95a23c
TL
640int
641spdk_thread_get_stats(struct spdk_thread_stats *stats)
11fdf7f2 642{
9f95a23c
TL
643 struct spdk_thread *thread;
644
645 thread = _get_thread();
646 if (!thread) {
647 SPDK_ERRLOG("No thread allocated\n");
648 return -EINVAL;
649 }
650
651 if (stats == NULL) {
652 return -EINVAL;
653 }
654
655 *stats = thread->stats;
656
657 return 0;
11fdf7f2
TL
658}
659
9f95a23c
TL
/* Send fn(ctx) to be executed on `thread`. The msg object is taken from the
 * sender's local cache when possible, otherwise from the shared mempool,
 * and is enqueued on the target thread's MP/SC ring. Failures (no msg
 * available, ring full) are asserted and otherwise silently dropped. */
void
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	if (!thread) {
		assert(false);
		return;
	}

	local_thread = _get_thread();

	/* Prefer the sending thread's hot cache over the shared mempool. */
	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			assert(false);
			return;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		/* Ring full: the message is lost; return the object. */
		assert(false);
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return;
	}
}
11fdf7f2
TL
702
703struct spdk_poller *
704spdk_poller_register(spdk_poller_fn fn,
705 void *arg,
706 uint64_t period_microseconds)
707{
708 struct spdk_thread *thread;
709 struct spdk_poller *poller;
9f95a23c 710 uint64_t quotient, remainder, ticks;
11fdf7f2
TL
711
712 thread = spdk_get_thread();
713 if (!thread) {
714 assert(false);
715 return NULL;
716 }
717
9f95a23c
TL
718 poller = calloc(1, sizeof(*poller));
719 if (poller == NULL) {
720 SPDK_ERRLOG("Poller memory allocation failed\n");
11fdf7f2
TL
721 return NULL;
722 }
723
9f95a23c
TL
724 poller->state = SPDK_POLLER_STATE_WAITING;
725 poller->fn = fn;
726 poller->arg = arg;
727
728 if (period_microseconds) {
729 quotient = period_microseconds / SPDK_SEC_TO_USEC;
730 remainder = period_microseconds % SPDK_SEC_TO_USEC;
731 ticks = spdk_get_ticks_hz();
732
733 poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
734 } else {
735 poller->period_ticks = 0;
736 }
737
738 if (poller->period_ticks) {
739 _spdk_poller_insert_timer(thread, poller, spdk_get_ticks());
740 } else {
741 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
11fdf7f2
TL
742 }
743
744 return poller;
745}
746
747void
748spdk_poller_unregister(struct spdk_poller **ppoller)
749{
750 struct spdk_thread *thread;
751 struct spdk_poller *poller;
752
753 poller = *ppoller;
754 if (poller == NULL) {
755 return;
756 }
757
758 *ppoller = NULL;
759
760 thread = spdk_get_thread();
9f95a23c
TL
761 if (!thread) {
762 assert(false);
763 return;
11fdf7f2 764 }
9f95a23c
TL
765
766 /* Simply set the state to unregistered. The poller will get cleaned up
767 * in a subsequent call to spdk_thread_poll().
768 */
769 poller->state = SPDK_POLLER_STATE_UNREGISTERED;
11fdf7f2
TL
770}
771
/* State carried through a spdk_for_each_thread() iteration. */
struct call_thread {
	struct spdk_thread *cur_thread;	/* thread currently being visited */
	spdk_msg_fn fn;			/* callback run on every thread */
	void *ctx;

	struct spdk_thread *orig_thread;	/* where cpl is finally delivered */
	spdk_msg_fn cpl;			/* completion callback */
};
780
/* Per-thread step of spdk_for_each_thread(): run the user callback on the
 * current thread, then hop to the next thread in g_threads, or deliver the
 * completion back on the originating thread when the list is exhausted. */
static void
spdk_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ctx);
	}
}
804
/* Run fn(ctx) once on every registered thread (in g_threads order), then
 * run cpl(ctx) back on the calling thread. On setup failure (no allocation
 * or no bound thread) cpl is invoked synchronously. Must be called from a
 * bound SPDK thread. */
void
spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
{
	struct call_thread *ct;
	struct spdk_thread *thread;

	ct = calloc(1, sizeof(*ct));
	if (!ct) {
		SPDK_ERRLOG("Unable to perform thread iteration\n");
		cpl(ctx);
		return;
	}

	ct->fn = fn;
	ct->ctx = ctx;
	ct->cpl = cpl;

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		free(ct);
		cpl(ctx);
		return;
	}
	ct->orig_thread = thread;

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_FIRST(&g_threads);
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
		      ct->orig_thread->name);

	/* Kick off the chain; spdk_on_thread() forwards itself thread to thread. */
	spdk_thread_send_msg(ct->cur_thread, spdk_on_thread, ct);
}
840
841void
842spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
843 spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
844 const char *name)
845{
846 struct io_device *dev, *tmp;
9f95a23c 847 struct spdk_thread *thread;
11fdf7f2
TL
848
849 assert(io_device != NULL);
850 assert(create_cb != NULL);
851 assert(destroy_cb != NULL);
852
9f95a23c
TL
853 thread = spdk_get_thread();
854 if (!thread) {
855 SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
856 assert(false);
857 return;
858 }
859
11fdf7f2
TL
860 dev = calloc(1, sizeof(struct io_device));
861 if (dev == NULL) {
862 SPDK_ERRLOG("could not allocate io_device\n");
863 return;
864 }
865
866 dev->io_device = io_device;
867 if (name) {
868 dev->name = strdup(name);
869 } else {
870 dev->name = spdk_sprintf_alloc("%p", dev);
871 }
872 dev->create_cb = create_cb;
873 dev->destroy_cb = destroy_cb;
874 dev->unregister_cb = NULL;
875 dev->ctx_size = ctx_size;
876 dev->for_each_count = 0;
877 dev->unregistered = false;
878 dev->refcnt = 0;
879
880 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
9f95a23c 881 dev->name, dev->io_device, thread->name);
11fdf7f2
TL
882
883 pthread_mutex_lock(&g_devlist_mutex);
884 TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
885 if (tmp->io_device == io_device) {
9f95a23c
TL
886 SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
887 io_device, tmp->name, dev->name);
11fdf7f2
TL
888 free(dev->name);
889 free(dev);
890 pthread_mutex_unlock(&g_devlist_mutex);
891 return;
892 }
893 }
894 TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
895 pthread_mutex_unlock(&g_devlist_mutex);
896}
897
/* Final step of a deferred io_device unregister: invoked as a message on
 * the thread that called spdk_io_device_unregister(), so the user's
 * unregister_cb runs on the expected thread before the device is freed. */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev->name);
	free(dev);
}
910
/* Free an unregistered io_device. If the user supplied an unregister_cb it
 * must fire on the unregistering thread, so the free is forwarded there as
 * a message; otherwise the device is released immediately. */
static void
_spdk_io_device_free(struct io_device *dev)
{
	if (dev->unregister_cb == NULL) {
		free(dev->name);
		free(dev);
	} else {
		assert(dev->unregister_thread != NULL);
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
			      dev->name, dev->io_device, dev->unregister_thread->name);
		spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
	}
}
924
/* Begin unregistering an io_device. The device is removed from the global
 * list immediately; if channels are still open (refcnt > 0) the final free
 * is deferred until the last channel is put. Refuses to unregister while a
 * spdk_for_each_channel() iteration is outstanding. Must be called from an
 * SPDK thread; unregister_cb (optional) later runs on that same thread. */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	/* Snapshot refcnt under the lock; channel puts race with us otherwise. */
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	_spdk_io_device_free(dev);
}
977
/* Get (or create) the calling thread's channel for an io_device. An
 * existing channel is returned with its refcount bumped; otherwise a new
 * one (with dev->ctx_size bytes of user context appended) is allocated and
 * initialized via dev->create_cb. The devlist mutex is dropped around
 * create_cb so the callback may itself use channel APIs. Returns NULL when
 * the device is unknown, no thread is bound, or allocation/create fails. */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* User context follows the channel struct in one allocation. */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Run the user's constructor without holding the devlist mutex. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1054
/* Deferred release of a channel, executed as a message on the channel's
 * owning thread. If the channel was re-acquired (or another put is in
 * flight) since the message was sent, it bails out; otherwise it runs the
 * user's destroy_cb and, when this was the device's last channel after an
 * unregister, frees the io_device as well. */
static void
_spdk_put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("%s called from non-SPDK thread\n", __func__);
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p). Channel thread %p. Current thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread, thread->name);

	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		_spdk_io_device_free(ch->dev);
	}
	free(ch);
}
1111
/* Drop one reference to a channel. When the count hits zero the actual
 * teardown is deferred via a message to the channel's owning thread, so
 * destroy_cb always runs there and in-flight work can complete first. */
void
spdk_put_io_channel(struct spdk_io_channel *ch)
{
	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, ch->dev->name, ch->dev->io_device, ch->thread->name, ch->ref);

	ch->ref--;

	if (ch->ref == 0) {
		ch->destroy_ref++;
		spdk_thread_send_msg(ch->thread, _spdk_put_io_channel, ch);
	}
}
1126
/* Recover the spdk_io_channel from its user context pointer; the context
 * is allocated immediately after the channel struct (see spdk_get_io_channel). */
struct spdk_io_channel *
spdk_io_channel_from_ctx(void *ctx)
{
	return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
}
1132
/* Return the thread that owns this channel. */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1138
/* State carried through a spdk_for_each_channel() iteration. */
struct spdk_io_channel_iter {
	void *io_device;		/* registration key being iterated */
	struct io_device *dev;
	spdk_channel_msg fn;		/* callback run once per channel */
	int status;			/* first non-zero status aborts the walk */
	void *ctx;
	struct spdk_io_channel *ch;	/* channel currently being visited */

	struct spdk_thread *cur_thread;

	struct spdk_thread *orig_thread;	/* where cpl is finally delivered */
	spdk_channel_for_each_cpl cpl;
};
1152
/* Accessor: the io_device key the iteration was started with. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
1158
/* Accessor: the channel currently being visited. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
1164
/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1170
/* Final step of a channel iteration: run the completion (if any) on the
 * originating thread and free the iterator. */
static void
_call_completion(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;

	if (i->cpl != NULL) {
		i->cpl(i, i->status);
	}
	free(i);
}
1181
/* Per-thread step of a channel iteration: re-verify the channel still
 * exists on this thread (it may have been destroyed while the message was
 * in flight) and either run the user callback or skip ahead. */
static void
_call_channel(void *ctx)
{
	struct spdk_io_channel_iter *i = ctx;
	struct spdk_io_channel *ch;

	/*
	 * It is possible that the channel was deleted before this
	 * message had a chance to execute. If so, skip calling
	 * the fn() on this thread.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
		if (ch->dev->io_device == i->io_device) {
			break;
		}
	}
	pthread_mutex_unlock(&g_devlist_mutex);

	if (ch) {
		i->fn(i);
	} else {
		/* Channel vanished: advance the iteration ourselves. */
		spdk_for_each_channel_continue(i, 0);
	}
}
1207
/* Visit every open channel of io_device, one thread at a time. fn runs on
 * each channel's owning thread and must call
 * spdk_for_each_channel_continue() to advance; cpl runs on the calling
 * thread when the walk finishes (or immediately when no channels exist).
 * NOTE(review): if the iterator allocation fails, cpl is never invoked and
 * the caller is not notified — confirm whether callers tolerate that. */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread holding a channel for this device. */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				/* Block unregister until the walk completes. */
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels at all: deliver the completion directly. */
	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1248
/* Advance a channel iteration. Called (from fn, on the current channel's
 * thread — asserted) with the status of the step just completed. A
 * non-zero status aborts the walk; otherwise the next thread holding a
 * channel for the device is messaged. When the walk ends, the device's
 * for_each_count is released and the completion is sent to the origin. */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		goto end;
	}
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				spdk_thread_send_msg(thread, _call_channel, i);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	spdk_thread_send_msg(i->orig_thread, _call_completion, i);
}
1284
1285
1286SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)