]> git.proxmox.com Git - ceph.git/blame - ceph/src/spdk/lib/thread/thread.c
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / spdk / lib / thread / thread.c
CommitLineData
11fdf7f2
TL
1/*-
2 * BSD LICENSE
3 *
4 * Copyright (c) Intel Corporation.
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 */
33
34#include "spdk/stdinc.h"
35
9f95a23c 36#include "spdk/env.h"
f67539c2 37#include "spdk/likely.h"
9f95a23c 38#include "spdk/queue.h"
11fdf7f2
TL
39#include "spdk/string.h"
40#include "spdk/thread.h"
9f95a23c 41#include "spdk/util.h"
11fdf7f2
TL
42
43#include "spdk_internal/log.h"
9f95a23c 44#include "spdk_internal/thread.h"
11fdf7f2 45
/* Number of messages drained from a thread's ring per batch. */
#define SPDK_MSG_BATCH_SIZE		8

#define SPDK_MAX_DEVICE_NAME_LEN	256
/* Grace period before an EXITING thread is forced to EXITED (see thread_exit()). */
#define SPDK_THREAD_EXIT_TIMEOUT_SEC	5

/* Protects g_threads, g_thread_count, g_thread_id and g_io_devices. */
static pthread_mutex_t g_devlist_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Legacy single-callback hook, set by spdk_thread_lib_init(). Mutually
 * exclusive with the op-based hooks below (set by spdk_thread_lib_init_ext()).
 */
static spdk_new_thread_fn g_new_thread_fn = NULL;
static spdk_thread_op_fn g_thread_op_fn = NULL;
static spdk_thread_op_supported_fn g_thread_op_supported_fn;
/* Size of the per-thread user context appended to struct spdk_thread. */
static size_t g_ctx_sz = 0;
/* Monotonic increasing ID is set to each created thread beginning at 1. Once the
 * ID exceeds UINT64_MAX, further thread creation is not allowed and restarting
 * SPDK application is required.
 */
static uint64_t g_thread_id = 1;

struct io_device {
	void				*io_device;
	char				name[SPDK_MAX_DEVICE_NAME_LEN + 1];
	spdk_io_channel_create_cb	create_cb;
	spdk_io_channel_destroy_cb	destroy_cb;
	spdk_io_device_unregister_cb	unregister_cb;
	struct spdk_thread		*unregister_thread;
	uint32_t			ctx_size;
	uint32_t			for_each_count;
	TAILQ_ENTRY(io_device)		tailq;

	uint32_t			refcnt;

	bool				unregistered;
};

static TAILQ_HEAD(, io_device) g_io_devices = TAILQ_HEAD_INITIALIZER(g_io_devices);

/* A single cross-thread message: fn(arg) executed on the target thread. */
struct spdk_msg {
	spdk_msg_fn		fn;
	void			*arg;

	SLIST_ENTRY(spdk_msg)	link;
};

/* Max number of spdk_msg objects cached locally on each thread. */
#define SPDK_MSG_MEMPOOL_CACHE_SIZE	1024
static struct spdk_mempool *g_spdk_msg_mempool = NULL;

static TAILQ_HEAD(, spdk_thread) g_threads = TAILQ_HEAD_INITIALIZER(g_threads);
static uint32_t g_thread_count = 0;

/* The SPDK thread currently bound to this OS thread, if any. */
static __thread struct spdk_thread *tls_thread = NULL;
94
/* Return the SPDK thread bound to the calling OS thread (NULL if none). */
static inline struct spdk_thread *
_get_thread(void)
{
	return tls_thread;
}
11fdf7f2 100
f67539c2
TL
101static int
102_thread_lib_init(size_t ctx_sz)
9f95a23c
TL
103{
104 char mempool_name[SPDK_MAX_MEMZONE_NAME_LEN];
11fdf7f2 105
9f95a23c 106 g_ctx_sz = ctx_sz;
11fdf7f2 107
9f95a23c
TL
108 snprintf(mempool_name, sizeof(mempool_name), "msgpool_%d", getpid());
109 g_spdk_msg_mempool = spdk_mempool_create(mempool_name,
110 262144 - 1, /* Power of 2 minus 1 is optimal for memory consumption */
111 sizeof(struct spdk_msg),
112 0, /* No cache. We do our own. */
113 SPDK_ENV_SOCKET_ID_ANY);
114
115 if (!g_spdk_msg_mempool) {
116 return -1;
117 }
11fdf7f2 118
11fdf7f2
TL
119 return 0;
120}
121
f67539c2
TL
122int
123spdk_thread_lib_init(spdk_new_thread_fn new_thread_fn, size_t ctx_sz)
124{
125 assert(g_new_thread_fn == NULL);
126 assert(g_thread_op_fn == NULL);
127
128 if (new_thread_fn == NULL) {
129 SPDK_INFOLOG(SPDK_LOG_THREAD, "new_thread_fn was not specified at spdk_thread_lib_init\n");
130 } else {
131 g_new_thread_fn = new_thread_fn;
132 }
133
134 return _thread_lib_init(ctx_sz);
135}
136
137int
138spdk_thread_lib_init_ext(spdk_thread_op_fn thread_op_fn,
139 spdk_thread_op_supported_fn thread_op_supported_fn,
140 size_t ctx_sz)
141{
142 assert(g_new_thread_fn == NULL);
143 assert(g_thread_op_fn == NULL);
144 assert(g_thread_op_supported_fn == NULL);
145
146 if ((thread_op_fn != NULL) != (thread_op_supported_fn != NULL)) {
147 SPDK_ERRLOG("Both must be defined or undefined together.\n");
148 return -EINVAL;
149 }
150
151 if (thread_op_fn == NULL && thread_op_supported_fn == NULL) {
152 SPDK_INFOLOG(SPDK_LOG_THREAD, "thread_op_fn and thread_op_supported_fn were not specified\n");
153 } else {
154 g_thread_op_fn = thread_op_fn;
155 g_thread_op_supported_fn = thread_op_supported_fn;
156 }
157
158 return _thread_lib_init(ctx_sz);
159}
160
11fdf7f2
TL
161void
162spdk_thread_lib_fini(void)
163{
9f95a23c
TL
164 struct io_device *dev;
165
166 TAILQ_FOREACH(dev, &g_io_devices, tailq) {
167 SPDK_ERRLOG("io_device %s not unregistered\n", dev->name);
168 }
169
170 if (g_spdk_msg_mempool) {
171 spdk_mempool_free(g_spdk_msg_mempool);
172 g_spdk_msg_mempool = NULL;
173 }
174
175 g_new_thread_fn = NULL;
f67539c2
TL
176 g_thread_op_fn = NULL;
177 g_thread_op_supported_fn = NULL;
9f95a23c 178 g_ctx_sz = 0;
11fdf7f2
TL
179}
180
9f95a23c
TL
/* Release all resources owned by a thread and remove it from g_threads.
 * Warns about channels and pollers that are still registered, frees every
 * poller, drains the local message cache back to the global pool, and frees
 * the message ring and the thread itself.
 * NOTE(review): assumes the thread is currently linked into g_threads
 * (TAILQ_REMOVE below requires it).
 */
static void
_free_thread(struct spdk_thread *thread)
{
	struct spdk_io_channel *ch;
	struct spdk_msg *msg;
	struct spdk_poller *poller, *ptmp;

	/* Leaked channels are only reported; the dev owns their memory. */
	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_ERRLOG("thread %s still has channel for io_device %s\n",
			    thread->name, ch->dev->name);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->active_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		free(poller);
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, ptmp) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_WARNLOG("poller %s still registered at thread exit\n",
				     poller->name);
		}
		TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		free(poller);
	}

	/* Paused pollers can never reach UNREGISTERED state, so always warn. */
	TAILQ_FOREACH_SAFE(poller, &thread->paused_pollers, tailq, ptmp) {
		SPDK_WARNLOG("poller %s still registered at thread exit\n", poller->name);
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		free(poller);
	}

	pthread_mutex_lock(&g_devlist_mutex);
	assert(g_thread_count > 0);
	g_thread_count--;
	TAILQ_REMOVE(&g_threads, thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Return locally cached messages to the global mempool. */
	msg = SLIST_FIRST(&thread->msg_cache);
	while (msg != NULL) {
		SLIST_REMOVE_HEAD(&thread->msg_cache, link);

		assert(thread->msg_cache_count > 0);
		thread->msg_cache_count--;
		spdk_mempool_put(g_spdk_msg_mempool, msg);

		msg = SLIST_FIRST(&thread->msg_cache);
	}

	assert(thread->msg_cache_count == 0);

	spdk_ring_free(thread->messages);
	free(thread);
}
239
240struct spdk_thread *
241spdk_thread_create(const char *name, struct spdk_cpuset *cpumask)
242{
243 struct spdk_thread *thread;
244 struct spdk_msg *msgs[SPDK_MSG_MEMPOOL_CACHE_SIZE];
f67539c2 245 int rc = 0, i;
9f95a23c
TL
246
247 thread = calloc(1, sizeof(*thread) + g_ctx_sz);
11fdf7f2
TL
248 if (!thread) {
249 SPDK_ERRLOG("Unable to allocate memory for thread\n");
11fdf7f2
TL
250 return NULL;
251 }
252
9f95a23c 253 if (cpumask) {
f67539c2 254 spdk_cpuset_copy(&thread->cpumask, cpumask);
9f95a23c 255 } else {
f67539c2 256 spdk_cpuset_negate(&thread->cpumask);
9f95a23c
TL
257 }
258
11fdf7f2 259 TAILQ_INIT(&thread->io_channels);
9f95a23c 260 TAILQ_INIT(&thread->active_pollers);
f67539c2
TL
261 TAILQ_INIT(&thread->timed_pollers);
262 TAILQ_INIT(&thread->paused_pollers);
9f95a23c
TL
263 SLIST_INIT(&thread->msg_cache);
264 thread->msg_cache_count = 0;
265
266 thread->tsc_last = spdk_get_ticks();
267
268 thread->messages = spdk_ring_create(SPDK_RING_TYPE_MP_SC, 65536, SPDK_ENV_SOCKET_ID_ANY);
269 if (!thread->messages) {
270 SPDK_ERRLOG("Unable to allocate memory for message ring\n");
9f95a23c
TL
271 free(thread);
272 return NULL;
273 }
274
275 /* Fill the local message pool cache. */
276 rc = spdk_mempool_get_bulk(g_spdk_msg_mempool, (void **)msgs, SPDK_MSG_MEMPOOL_CACHE_SIZE);
277 if (rc == 0) {
278 /* If we can't populate the cache it's ok. The cache will get filled
279 * up organically as messages are passed to the thread. */
280 for (i = 0; i < SPDK_MSG_MEMPOOL_CACHE_SIZE; i++) {
281 SLIST_INSERT_HEAD(&thread->msg_cache, msgs[i], link);
282 thread->msg_cache_count++;
283 }
284 }
285
11fdf7f2 286 if (name) {
f67539c2 287 snprintf(thread->name, sizeof(thread->name), "%s", name);
11fdf7f2 288 } else {
f67539c2 289 snprintf(thread->name, sizeof(thread->name), "%p", thread);
11fdf7f2
TL
290 }
291
9f95a23c 292 pthread_mutex_lock(&g_devlist_mutex);
f67539c2
TL
293 if (g_thread_id == 0) {
294 SPDK_ERRLOG("Thread ID rolled over. Further thread creation is not allowed.\n");
295 pthread_mutex_unlock(&g_devlist_mutex);
296 _free_thread(thread);
297 return NULL;
298 }
299 thread->id = g_thread_id++;
9f95a23c
TL
300 TAILQ_INSERT_TAIL(&g_threads, thread, tailq);
301 g_thread_count++;
11fdf7f2
TL
302 pthread_mutex_unlock(&g_devlist_mutex);
303
f67539c2
TL
304 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Allocating new thread (%" PRIu64 ", %s)\n",
305 thread->id, thread->name);
306
9f95a23c
TL
307 if (g_new_thread_fn) {
308 rc = g_new_thread_fn(thread);
f67539c2
TL
309 } else if (g_thread_op_supported_fn && g_thread_op_supported_fn(SPDK_THREAD_OP_NEW)) {
310 rc = g_thread_op_fn(thread, SPDK_THREAD_OP_NEW);
9f95a23c
TL
311 }
312
f67539c2
TL
313 if (rc != 0) {
314 _free_thread(thread);
315 return NULL;
316 }
317
318 thread->state = SPDK_THREAD_STATE_RUNNING;
319
11fdf7f2
TL
320 return thread;
321}
322
/* Bind (or unbind, with NULL) an SPDK thread to the calling OS thread. */
void
spdk_set_thread(struct spdk_thread *thread)
{
	tls_thread = thread;
}
11fdf7f2 328
f67539c2
TL
/* Try to move an EXITING thread to EXITED. The transition happens only when
 * the thread has no remaining pollers or channels - or unconditionally once
 * the exit timeout (set in spdk_thread_exit()) has elapsed.
 */
static void
thread_exit(struct spdk_thread *thread, uint64_t now)
{
	struct spdk_poller *poller;
	struct spdk_io_channel *ch;

	if (now >= thread->exit_timeout_tsc) {
		SPDK_ERRLOG("thread %s got timeout, and move it to the exited state forcefully\n",
			    thread->name);
		goto exited;
	}

	/* Any still-registered resource defers the transition to a later poll. */
	TAILQ_FOREACH(poller, &thread->active_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(SPDK_LOG_THREAD,
				     "thread %s still has active poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->timed_pollers, tailq) {
		if (poller->state != SPDK_POLLER_STATE_UNREGISTERED) {
			SPDK_INFOLOG(SPDK_LOG_THREAD,
				     "thread %s still has active timed poller %s\n",
				     thread->name, poller->name);
			return;
		}
	}

	TAILQ_FOREACH(poller, &thread->paused_pollers, tailq) {
		SPDK_INFOLOG(SPDK_LOG_THREAD,
			     "thread %s still has paused poller %s\n",
			     thread->name, poller->name);
		return;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		SPDK_INFOLOG(SPDK_LOG_THREAD,
			     "thread %s still has channel for io_device %s\n",
			     thread->name, ch->dev->name);
		return;
	}

exited:
	thread->state = SPDK_THREAD_STATE_EXITED;
}
376
377int
9f95a23c
TL
378spdk_thread_exit(struct spdk_thread *thread)
379{
380 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Exit thread %s\n", thread->name);
11fdf7f2 381
9f95a23c
TL
382 assert(tls_thread == thread);
383
f67539c2
TL
384 if (thread->state >= SPDK_THREAD_STATE_EXITING) {
385 SPDK_INFOLOG(SPDK_LOG_THREAD,
386 "thread %s is already exiting\n",
387 thread->name);
388 return 0;
389 }
390
391 thread->exit_timeout_tsc = spdk_get_ticks() + (spdk_get_ticks_hz() *
392 SPDK_THREAD_EXIT_TIMEOUT_SEC);
393 thread->state = SPDK_THREAD_STATE_EXITING;
394 return 0;
395}
396
397bool
398spdk_thread_is_exited(struct spdk_thread *thread)
399{
400 return thread->state == SPDK_THREAD_STATE_EXITED;
9f95a23c
TL
401}
402
403void
404spdk_thread_destroy(struct spdk_thread *thread)
405{
406 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Destroy thread %s\n", thread->name);
407
f67539c2 408 assert(thread->state == SPDK_THREAD_STATE_EXITED);
9f95a23c
TL
409
410 if (tls_thread == thread) {
411 tls_thread = NULL;
11fdf7f2
TL
412 }
413
9f95a23c
TL
414 _free_thread(thread);
415}
11fdf7f2 416
9f95a23c
TL
417void *
418spdk_thread_get_ctx(struct spdk_thread *thread)
419{
420 if (g_ctx_sz > 0) {
421 return thread->ctx;
422 }
11fdf7f2 423
9f95a23c
TL
424 return NULL;
425}
426
/* Return a pointer to the thread's CPU affinity mask (owned by the thread). */
struct spdk_cpuset *
spdk_thread_get_cpumask(struct spdk_thread *thread)
{
	return &thread->cpumask;
}
432
/* Update the calling thread's CPU mask and ask the framework to reschedule it.
 * Requires the op-based hooks with SPDK_THREAD_OP_RESCHED support; must be
 * called from an SPDK thread. Returns 0, -ENOTSUP, or -EINVAL.
 */
int
spdk_thread_set_cpumask(struct spdk_cpuset *cpumask)
{
	struct spdk_thread *thread;

	if (!g_thread_op_supported_fn || !g_thread_op_supported_fn(SPDK_THREAD_OP_RESCHED)) {
		SPDK_ERRLOG("Framework does not support reschedule operation.\n");
		assert(false);
		return -ENOTSUP;
	}

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("Called from non-SPDK thread\n");
		assert(false);
		return -EINVAL;
	}

	spdk_cpuset_copy(&thread->cpumask, cpumask);

	/* Invoke framework's reschedule operation. If this function is called multiple times
	 * in a single spdk_thread_poll() context, the last cpumask will be used in the
	 * reschedule operation.
	 */
	g_thread_op_fn(thread, SPDK_THREAD_OP_RESCHED);

	return 0;
}
461
462struct spdk_thread *
463spdk_thread_get_from_ctx(void *ctx)
464{
465 if (ctx == NULL) {
466 assert(false);
467 return NULL;
468 }
469
470 assert(g_ctx_sz > 0);
471
472 return SPDK_CONTAINEROF(ctx, struct spdk_thread, ctx);
473}
474
/* Drain up to max_msgs (0 means the default SPDK_MSG_BATCH_SIZE) messages
 * from the thread's ring and execute them. Executed message objects are
 * recycled into the thread-local cache when there is room, otherwise returned
 * to the global mempool. Returns the number of messages processed.
 */
static inline uint32_t
msg_queue_run_batch(struct spdk_thread *thread, uint32_t max_msgs)
{
	unsigned count, i;
	void *messages[SPDK_MSG_BATCH_SIZE];

#ifdef DEBUG
	/*
	 * spdk_ring_dequeue() fills messages and returns how many entries it wrote,
	 * so we will never actually read uninitialized data from events, but just to be sure
	 * (and to silence a static analyzer false positive), initialize the array to NULL pointers.
	 */
	memset(messages, 0, sizeof(messages));
#endif

	if (max_msgs > 0) {
		max_msgs = spdk_min(max_msgs, SPDK_MSG_BATCH_SIZE);
	} else {
		max_msgs = SPDK_MSG_BATCH_SIZE;
	}

	count = spdk_ring_dequeue(thread->messages, messages, max_msgs);
	if (count == 0) {
		return 0;
	}

	for (i = 0; i < count; i++) {
		struct spdk_msg *msg = messages[i];

		assert(msg != NULL);
		msg->fn(msg->arg);

		if (thread->msg_cache_count < SPDK_MSG_MEMPOOL_CACHE_SIZE) {
			/* Insert the messages at the head. We want to re-use the hot
			 * ones. */
			SLIST_INSERT_HEAD(&thread->msg_cache, msg, link);
			thread->msg_cache_count++;
		} else {
			spdk_mempool_put(g_spdk_msg_mempool, msg);
		}
	}

	return count;
}
519
/* Schedule a timed poller: compute its next run tick from 'now' and insert it
 * into the thread's timed_pollers list, which is kept sorted by
 * next_run_tick (earliest first).
 */
static void
poller_insert_timer(struct spdk_thread *thread, struct spdk_poller *poller, uint64_t now)
{
	struct spdk_poller *iter;

	poller->next_run_tick = now + poller->period_ticks;

	/*
	 * Insert poller in the thread's timed_pollers list in sorted order by next scheduled
	 * run time.
	 */
	TAILQ_FOREACH_REVERSE(iter, &thread->timed_pollers, timed_pollers_head, tailq) {
		if (iter->next_run_tick <= poller->next_run_tick) {
			/* Reverse walk: first iter running no later than us is
			 * the insertion point. */
			TAILQ_INSERT_AFTER(&thread->timed_pollers, iter, poller, tailq);
			return;
		}
	}

	/* No earlier pollers were found, so this poller must be the new head */
	TAILQ_INSERT_HEAD(&thread->timed_pollers, poller, tailq);
}
541
f67539c2
TL
542static void
543thread_insert_poller(struct spdk_thread *thread, struct spdk_poller *poller)
544{
545 if (poller->period_ticks) {
546 poller_insert_timer(thread, poller, spdk_get_ticks());
547 } else {
548 TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
549 }
550}
551
552static inline void
553thread_update_stats(struct spdk_thread *thread, uint64_t end,
554 uint64_t start, int rc)
555{
556 if (rc == 0) {
557 /* Poller status idle */
558 thread->stats.idle_tsc += end - start;
559 } else if (rc > 0) {
560 /* Poller status busy */
561 thread->stats.busy_tsc += end - start;
562 }
563 /* Store end time to use it as start time of the next spdk_thread_poll(). */
564 thread->tsc_last = end;
565}
566
/* One poll iteration for a thread: run the pending critical message (if any),
 * drain a batch of messages, then run active pollers and due timed pollers,
 * applying deferred state changes (unregister/pause) made during callbacks.
 * Returns the highest poller rc seen, or 1 if only messages were processed,
 * 0 if completely idle.
 */
static int
thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	uint32_t msg_count;
	struct spdk_poller *poller, *tmp;
	spdk_msg_fn critical_msg;
	int rc = 0;

	critical_msg = thread->critical_msg;
	if (spdk_unlikely(critical_msg != NULL)) {
		critical_msg(NULL);
		thread->critical_msg = NULL;
	}

	msg_count = msg_queue_run_batch(thread, max_msgs);
	if (msg_count) {
		rc = 1;
	}

	/* Reverse iteration so that pollers registered from within a poller
	 * callback are not run until the next poll. */
	TAILQ_FOREACH_REVERSE_SAFE(poller, &thread->active_pollers,
				   active_pollers_head, tailq, tmp) {
		int poller_rc;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			/* Deferred unregister from a previous iteration. */
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		poller_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (poller_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (poller_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Poller %s returned -1\n", poller->name);
		}
#endif

		/* The callback may have unregistered or paused the poller. */
		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
		}

		if (poller_rc > rc) {
			rc = poller_rc;
		}
	}

	TAILQ_FOREACH_SAFE(poller, &thread->timed_pollers, tailq, tmp) {
		int timer_rc = 0;

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
			continue;
		} else if (poller->state == SPDK_POLLER_STATE_PAUSING) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
			poller->state = SPDK_POLLER_STATE_PAUSED;
			continue;
		}

		/* List is sorted by next_run_tick; nothing later is due yet. */
		if (now < poller->next_run_tick) {
			break;
		}

		poller->state = SPDK_POLLER_STATE_RUNNING;
		timer_rc = poller->fn(poller->arg);

		poller->run_count++;
		if (timer_rc > 0) {
			poller->busy_count++;
		}

#ifdef DEBUG
		if (timer_rc == -1) {
			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Timed poller %s returned -1\n", poller->name);
		}
#endif

		if (poller->state == SPDK_POLLER_STATE_UNREGISTERED) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			free(poller);
		} else if (poller->state != SPDK_POLLER_STATE_PAUSED) {
			poller->state = SPDK_POLLER_STATE_WAITING;
			/* Re-insert in sorted position for its next expiration. */
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
			poller_insert_timer(thread, poller, now);
		}

		if (timer_rc > rc) {
			rc = timer_rc;
		}
	}

	return rc;
}
675
/* Public poll entry point: temporarily binds 'thread' to the calling OS
 * thread, runs one poll iteration, advances the exit state machine when the
 * thread is EXITING, updates busy/idle stats, and restores the previous TLS
 * binding. 'now' of 0 means "sample the TSC here".
 */
int
spdk_thread_poll(struct spdk_thread *thread, uint32_t max_msgs, uint64_t now)
{
	struct spdk_thread *orig_thread;
	int rc;

	orig_thread = _get_thread();
	tls_thread = thread;

	if (now == 0) {
		now = spdk_get_ticks();
	}

	rc = thread_poll(thread, max_msgs, now);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITING)) {
		thread_exit(thread, now);
	}

	thread_update_stats(thread, spdk_get_ticks(), now, rc);

	tls_thread = orig_thread;

	return rc;
}
701
702uint64_t
703spdk_thread_next_poller_expiration(struct spdk_thread *thread)
704{
705 struct spdk_poller *poller;
706
f67539c2 707 poller = TAILQ_FIRST(&thread->timed_pollers);
9f95a23c
TL
708 if (poller) {
709 return poller->next_run_tick;
710 }
711
712 return 0;
713}
714
/* Non-zero when the thread has at least one (non-timed) active poller. */
int
spdk_thread_has_active_pollers(struct spdk_thread *thread)
{
	return !TAILQ_EMPTY(&thread->active_pollers);
}
720
f67539c2
TL
721static bool
722thread_has_unpaused_pollers(struct spdk_thread *thread)
723{
724 if (TAILQ_EMPTY(&thread->active_pollers) &&
725 TAILQ_EMPTY(&thread->timed_pollers)) {
726 return false;
727 }
728
729 return true;
730}
731
9f95a23c
TL
732bool
733spdk_thread_has_pollers(struct spdk_thread *thread)
734{
f67539c2
TL
735 if (!thread_has_unpaused_pollers(thread) &&
736 TAILQ_EMPTY(&thread->paused_pollers)) {
9f95a23c
TL
737 return false;
738 }
739
740 return true;
741}
742
743bool
744spdk_thread_is_idle(struct spdk_thread *thread)
745{
746 if (spdk_ring_count(thread->messages) ||
f67539c2
TL
747 thread_has_unpaused_pollers(thread) ||
748 thread->critical_msg != NULL) {
9f95a23c
TL
749 return false;
750 }
751
752 return true;
11fdf7f2
TL
753}
754
/* Number of currently existing SPDK threads (snapshot; may be stale). */
uint32_t
spdk_thread_get_count(void)
{
	/*
	 * Return cached value of the current thread count. We could acquire the
	 * lock and iterate through the TAILQ of threads to count them, but that
	 * count could still be invalidated after we release the lock.
	 */
	return g_thread_count;
}
765
/* Return the SPDK thread bound to the calling OS thread (NULL if none). */
struct spdk_thread *
spdk_get_thread(void)
{
	return _get_thread();
}
771
/* Return the thread's name (set at creation; never NULL). */
const char *
spdk_thread_get_name(const struct spdk_thread *thread)
{
	return thread->name;
}
777
f67539c2
TL
/* Return the thread's unique, monotonically assigned ID (starts at 1). */
uint64_t
spdk_thread_get_id(const struct spdk_thread *thread)
{
	return thread->id;
}
783
784struct spdk_thread *
785spdk_thread_get_by_id(uint64_t id)
786{
787 struct spdk_thread *thread;
788
789 pthread_mutex_lock(&g_devlist_mutex);
790 TAILQ_FOREACH(thread, &g_threads, tailq) {
791 if (thread->id == id) {
792 pthread_mutex_unlock(&g_devlist_mutex);
793
794 return thread;
795 }
796 }
797 pthread_mutex_unlock(&g_devlist_mutex);
798
799 return NULL;
800}
801
9f95a23c
TL
802int
803spdk_thread_get_stats(struct spdk_thread_stats *stats)
11fdf7f2 804{
9f95a23c
TL
805 struct spdk_thread *thread;
806
807 thread = _get_thread();
808 if (!thread) {
809 SPDK_ERRLOG("No thread allocated\n");
810 return -EINVAL;
811 }
812
813 if (stats == NULL) {
814 return -EINVAL;
815 }
816
817 *stats = thread->stats;
818
819 return 0;
11fdf7f2
TL
820}
821
f67539c2
TL
/* TSC recorded at the end of the thread's most recent spdk_thread_poll(). */
uint64_t
spdk_thread_get_last_tsc(struct spdk_thread *thread)
{
	return thread->tsc_last;
}
827
/* Queue fn(ctx) for execution on 'thread'. The message object comes from the
 * sender's local cache when available, otherwise from the global mempool, and
 * is handed to the target via its MP/SC ring. Returns 0, or -EIO if the
 * target has exited / the ring enqueue fails, or -ENOMEM.
 */
int
spdk_thread_send_msg(const struct spdk_thread *thread, spdk_msg_fn fn, void *ctx)
{
	struct spdk_thread *local_thread;
	struct spdk_msg *msg;
	int rc;

	assert(thread != NULL);

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited.\n", thread->name);
		return -EIO;
	}

	local_thread = _get_thread();

	/* Prefer a hot message object from the sender's local cache. */
	msg = NULL;
	if (local_thread != NULL) {
		if (local_thread->msg_cache_count > 0) {
			msg = SLIST_FIRST(&local_thread->msg_cache);
			assert(msg != NULL);
			SLIST_REMOVE_HEAD(&local_thread->msg_cache, link);
			local_thread->msg_cache_count--;
		}
	}

	if (msg == NULL) {
		msg = spdk_mempool_get(g_spdk_msg_mempool);
		if (!msg) {
			SPDK_ERRLOG("msg could not be allocated\n");
			return -ENOMEM;
		}
	}

	msg->fn = fn;
	msg->arg = ctx;

	rc = spdk_ring_enqueue(thread->messages, (void **)&msg, 1, NULL);
	if (rc != 1) {
		SPDK_ERRLOG("msg could not be enqueued\n");
		/* Returned to the global pool even if taken from the local cache. */
		spdk_mempool_put(g_spdk_msg_mempool, msg);
		return -EIO;
	}

	return 0;
}
11fdf7f2 874
f67539c2
TL
875int
876spdk_thread_send_critical_msg(struct spdk_thread *thread, spdk_msg_fn fn)
877{
878 spdk_msg_fn expected = NULL;
879
880 if (__atomic_compare_exchange_n(&thread->critical_msg, &expected, fn, false, __ATOMIC_SEQ_CST,
881 __ATOMIC_SEQ_CST)) {
882 return 0;
883 }
884
885 return -EIO;
886}
887
/* Common implementation for spdk_poller_register[_named](): allocate a poller
 * on the calling SPDK thread, convert the period from microseconds to ticks
 * (0 means run every poll), and insert it on the appropriate list.
 * Returns NULL on allocation failure, when called off an SPDK thread, or when
 * the thread has already exited.
 */
static struct spdk_poller *
poller_register(spdk_poller_fn fn,
		void *arg,
		uint64_t period_microseconds,
		const char *name)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;
	uint64_t quotient, remainder, ticks;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return NULL;
	}

	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("thread %s is marked as exited\n", thread->name);
		return NULL;
	}

	poller = calloc(1, sizeof(*poller));
	if (poller == NULL) {
		SPDK_ERRLOG("Poller memory allocation failed\n");
		return NULL;
	}

	if (name) {
		snprintf(poller->name, sizeof(poller->name), "%s", name);
	} else {
		snprintf(poller->name, sizeof(poller->name), "%p", fn);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
	poller->fn = fn;
	poller->arg = arg;
	poller->thread = thread;

	if (period_microseconds) {
		/* Split the conversion to avoid overflowing ticks * period. */
		quotient = period_microseconds / SPDK_SEC_TO_USEC;
		remainder = period_microseconds % SPDK_SEC_TO_USEC;
		ticks = spdk_get_ticks_hz();

		poller->period_ticks = ticks * quotient + (ticks * remainder) / SPDK_SEC_TO_USEC;
	} else {
		poller->period_ticks = 0;
	}

	thread_insert_poller(thread, poller);

	return poller;
}
940
f67539c2
TL
/* Register an unnamed poller on the calling SPDK thread.
 * period_microseconds of 0 means "run on every poll iteration".
 */
struct spdk_poller *
spdk_poller_register(spdk_poller_fn fn,
		     void *arg,
		     uint64_t period_microseconds)
{
	return poller_register(fn, arg, period_microseconds, NULL);
}
948
/* Register a poller with an explicit name (used in logs and stats). */
struct spdk_poller *
spdk_poller_register_named(spdk_poller_fn fn,
			   void *arg,
			   uint64_t period_microseconds,
			   const char *name)
{
	return poller_register(fn, arg, period_microseconds, name);
}
957
11fdf7f2
TL
/* Unregister a poller and NULL out the caller's handle. The poller is not
 * freed here: it is marked UNREGISTERED and reaped by a later
 * spdk_thread_poll() so that iteration in thread_poll() stays valid.
 * Must be called from the same thread that registered the poller.
 */
void
spdk_poller_unregister(struct spdk_poller **ppoller)
{
	struct spdk_thread *thread;
	struct spdk_poller *poller;

	poller = *ppoller;
	if (poller == NULL) {
		return;
	}

	*ppoller = NULL;

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	if (poller->thread != thread) {
		SPDK_ERRLOG("different from the thread that called spdk_poller_register()\n");
		assert(false);
		return;
	}

	/* If the poller was paused, put it on the active_pollers list so that
	 * its unregistration can be processed by spdk_thread_poll().
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		TAILQ_INSERT_TAIL(&thread->active_pollers, poller, tailq);
		poller->period_ticks = 0;
	}

	/* Simply set the state to unregistered. The poller will get cleaned up
	 * in a subsequent call to spdk_thread_poll().
	 */
	poller->state = SPDK_POLLER_STATE_UNREGISTERED;
}
997
f67539c2
TL
/* Pause a poller on the calling thread. If the poller is currently running
 * (pausing itself), it is moved to the paused list immediately; otherwise it
 * is marked PAUSING and moved by the next spdk_thread_poll() iteration.
 */
void
spdk_poller_pause(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state == SPDK_POLLER_STATE_PAUSED ||
	    poller->state == SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused from within itself, we can immediately move it
	 * on the paused_pollers list. Otherwise we just set its state to
	 * SPDK_POLLER_STATE_PAUSING and let spdk_thread_poll() move it. It
	 * allows a poller to be paused from another one's context without
	 * breaking the TAILQ_FOREACH_REVERSE_SAFE iteration.
	 */
	if (poller->state != SPDK_POLLER_STATE_RUNNING) {
		poller->state = SPDK_POLLER_STATE_PAUSING;
	} else {
		if (poller->period_ticks > 0) {
			TAILQ_REMOVE(&thread->timed_pollers, poller, tailq);
		} else {
			TAILQ_REMOVE(&thread->active_pollers, poller, tailq);
		}

		TAILQ_INSERT_TAIL(&thread->paused_pollers, poller, tailq);
		poller->state = SPDK_POLLER_STATE_PAUSED;
	}
}
1033
/* Resume a paused (or still-pausing) poller on the calling thread, putting it
 * back on the active or timer list as appropriate.
 */
void
spdk_poller_resume(struct spdk_poller *poller)
{
	struct spdk_thread *thread;

	if (poller->state != SPDK_POLLER_STATE_PAUSED &&
	    poller->state != SPDK_POLLER_STATE_PAUSING) {
		return;
	}

	thread = spdk_get_thread();
	if (!thread) {
		assert(false);
		return;
	}

	/* If a poller is paused it has to be removed from the paused pollers
	 * list and put on the active / timer list depending on its
	 * period_ticks. If a poller is still in the process of being paused,
	 * we just need to flip its state back to waiting, as it's already on
	 * the appropriate list.
	 */
	if (poller->state == SPDK_POLLER_STATE_PAUSED) {
		TAILQ_REMOVE(&thread->paused_pollers, poller, tailq);
		thread_insert_poller(thread, poller);
	}

	poller->state = SPDK_POLLER_STATE_WAITING;
}
1063
1064const char *
1065spdk_poller_state_str(enum spdk_poller_state state)
1066{
1067 switch (state) {
1068 case SPDK_POLLER_STATE_WAITING:
1069 return "waiting";
1070 case SPDK_POLLER_STATE_RUNNING:
1071 return "running";
1072 case SPDK_POLLER_STATE_UNREGISTERED:
1073 return "unregistered";
1074 case SPDK_POLLER_STATE_PAUSING:
1075 return "pausing";
1076 case SPDK_POLLER_STATE_PAUSED:
1077 return "paused";
1078 default:
1079 return NULL;
1080 }
1081}
1082
11fdf7f2
TL
/* Iteration context for spdk_for_each_thread(): fn(ctx) is run on each thread
 * in turn (cur_thread tracks progress), then cpl(ctx) is run back on the
 * thread that started the iteration.
 */
struct call_thread {
	struct spdk_thread *cur_thread;
	spdk_msg_fn fn;
	void *ctx;

	struct spdk_thread *orig_thread;
	spdk_msg_fn cpl;
};
1091
/* Per-thread step of spdk_for_each_thread(): run the user's fn on the current
 * thread, then forward the context to the next thread in g_threads, or send
 * the completion back to the originating thread when the list is exhausted.
 */
static void
_on_thread(void *ctx)
{
	struct call_thread *ct = ctx;
	int rc __attribute__((unused));

	ct->fn(ct->ctx);

	pthread_mutex_lock(&g_devlist_mutex);
	ct->cur_thread = TAILQ_NEXT(ct->cur_thread, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	if (!ct->cur_thread) {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Completed thread iteration\n");

		rc = spdk_thread_send_msg(ct->orig_thread, ct->cpl, ct->ctx);
		free(ctx);
	} else {
		SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Continuing thread iteration to %s\n",
			      ct->cur_thread->name);

		rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ctx);
	}
	/* Send failures are not recoverable mid-iteration; assert in debug. */
	assert(rc == 0);
}
1117
1118void
9f95a23c 1119spdk_for_each_thread(spdk_msg_fn fn, void *ctx, spdk_msg_fn cpl)
11fdf7f2
TL
1120{
1121 struct call_thread *ct;
9f95a23c 1122 struct spdk_thread *thread;
f67539c2 1123 int rc __attribute__((unused));
11fdf7f2
TL
1124
1125 ct = calloc(1, sizeof(*ct));
1126 if (!ct) {
1127 SPDK_ERRLOG("Unable to perform thread iteration\n");
1128 cpl(ctx);
1129 return;
1130 }
1131
1132 ct->fn = fn;
1133 ct->ctx = ctx;
1134 ct->cpl = cpl;
1135
9f95a23c
TL
1136 thread = _get_thread();
1137 if (!thread) {
1138 SPDK_ERRLOG("No thread allocated\n");
1139 free(ct);
1140 cpl(ctx);
1141 return;
1142 }
1143 ct->orig_thread = thread;
1144
11fdf7f2 1145 pthread_mutex_lock(&g_devlist_mutex);
11fdf7f2
TL
1146 ct->cur_thread = TAILQ_FIRST(&g_threads);
1147 pthread_mutex_unlock(&g_devlist_mutex);
1148
1149 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Starting thread iteration from %s\n",
1150 ct->orig_thread->name);
1151
f67539c2
TL
1152 rc = spdk_thread_send_msg(ct->cur_thread, _on_thread, ct);
1153 assert(rc == 0);
11fdf7f2
TL
1154}
1155
/* Register io_device as an I/O-channel provider.
 *
 * create_cb/destroy_cb are invoked by spdk_get_io_channel() /
 * spdk_put_io_channel() with a per-channel context area of ctx_size bytes.
 * Must be called from an SPDK thread.  If io_device is already registered,
 * or memory cannot be allocated, the call logs an error and does nothing.
 */
void
spdk_io_device_register(void *io_device, spdk_io_channel_create_cb create_cb,
			spdk_io_channel_destroy_cb destroy_cb, uint32_t ctx_size,
			const char *name)
{
	struct io_device *dev, *tmp;
	struct spdk_thread *thread;

	assert(io_device != NULL);
	assert(create_cb != NULL);
	assert(destroy_cb != NULL);

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	/* Allocate and fill in the descriptor before taking the list lock so
	 * the critical section below stays short.
	 */
	dev = calloc(1, sizeof(struct io_device));
	if (dev == NULL) {
		SPDK_ERRLOG("could not allocate io_device\n");
		return;
	}

	dev->io_device = io_device;
	if (name) {
		snprintf(dev->name, sizeof(dev->name), "%s", name);
	} else {
		/* No name supplied - use the descriptor's address as a label. */
		snprintf(dev->name, sizeof(dev->name), "%p", dev);
	}
	dev->create_cb = create_cb;
	dev->destroy_cb = destroy_cb;
	dev->unregister_cb = NULL;
	dev->ctx_size = ctx_size;
	dev->for_each_count = 0;
	dev->unregistered = false;
	dev->refcnt = 0;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Registering io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, thread->name);

	/* Check for an existing registration and insert atomically under the
	 * global device-list lock.
	 */
	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(tmp, &g_io_devices, tailq) {
		if (tmp->io_device == io_device) {
			SPDK_ERRLOG("io_device %p already registered (old:%s new:%s)\n",
				    io_device, tmp->name, dev->name);
			free(dev);
			pthread_mutex_unlock(&g_devlist_mutex);
			return;
		}
	}
	TAILQ_INSERT_TAIL(&g_io_devices, dev, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);
}
1211
/* Message handler run on the thread that called spdk_io_device_unregister():
 * invokes the user's unregister callback and releases the descriptor.
 */
static void
_finish_unregister(void *arg)
{
	struct io_device *dev = arg;

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Finishing unregistration of io_device %s (%p) on thread %s\n",
		      dev->name, dev->io_device, dev->unregister_thread->name);

	dev->unregister_cb(dev->io_device);
	free(dev);
}
1223
1224static void
f67539c2 1225io_device_free(struct io_device *dev)
11fdf7f2 1226{
f67539c2
TL
1227 int rc __attribute__((unused));
1228
11fdf7f2 1229 if (dev->unregister_cb == NULL) {
11fdf7f2
TL
1230 free(dev);
1231 } else {
1232 assert(dev->unregister_thread != NULL);
1233 SPDK_DEBUGLOG(SPDK_LOG_THREAD, "io_device %s (%p) needs to unregister from thread %s\n",
1234 dev->name, dev->io_device, dev->unregister_thread->name);
f67539c2
TL
1235 rc = spdk_thread_send_msg(dev->unregister_thread, _finish_unregister, dev);
1236 assert(rc == 0);
11fdf7f2
TL
1237 }
1238}
1239
/* Unregister io_device, optionally invoking unregister_cb (on the calling
 * thread) once the last channel reference is released.
 *
 * Must be called from an SPDK thread.  If for_each traversals are still
 * outstanding on the device, the unregister is refused (logged, no callback).
 * If channels still hold references, deletion is deferred: the final
 * put_io_channel() will call io_device_free().
 */
void
spdk_io_device_unregister(void *io_device, spdk_io_device_unregister_cb unregister_cb)
{
	struct io_device *dev;
	uint32_t refcnt;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}

	/* dev is NULL here iff the loop ran off the end of the list. */
	if (!dev) {
		SPDK_ERRLOG("io_device %p not found\n", io_device);
		assert(false);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	if (dev->for_each_count > 0) {
		SPDK_ERRLOG("io_device %s (%p) has %u for_each calls outstanding\n",
			    dev->name, io_device, dev->for_each_count);
		pthread_mutex_unlock(&g_devlist_mutex);
		return;
	}

	/* Mark the device unregistered and pull it off the global list while
	 * still holding the lock; snapshot refcnt so the deferred-deletion
	 * decision below is race-free.
	 */
	dev->unregister_cb = unregister_cb;
	dev->unregistered = true;
	TAILQ_REMOVE(&g_io_devices, dev, tailq);
	refcnt = dev->refcnt;
	dev->unregister_thread = thread;
	pthread_mutex_unlock(&g_devlist_mutex);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Unregistering io_device %s (%p) from thread %s\n",
		      dev->name, dev->io_device, thread->name);

	if (refcnt > 0) {
		/* defer deletion */
		return;
	}

	io_device_free(dev);
}
1292
/* Return the name assigned at spdk_io_device_register() time (or the
 * "%p"-formatted fallback when no name was supplied).
 */
const char *
spdk_io_device_get_name(struct io_device *dev)
{
	return dev->name;
}
1298
/* Get (or lazily create) the calling thread's I/O channel for io_device.
 *
 * Channels are per-(device, thread): a repeat call on the same thread
 * returns the existing channel with its refcount bumped.  On first use the
 * channel plus its ctx_size context area are allocated in one block and the
 * device's create_cb is invoked (outside the lock) to initialize the
 * context.  Returns NULL if the device is unknown, the caller is not an
 * SPDK thread, the thread has exited, allocation fails, or create_cb fails.
 */
struct spdk_io_channel *
spdk_get_io_channel(void *io_device)
{
	struct spdk_io_channel *ch;
	struct spdk_thread *thread;
	struct io_device *dev;
	int rc;

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_FOREACH(dev, &g_io_devices, tailq) {
		if (dev->io_device == io_device) {
			break;
		}
	}
	if (dev == NULL) {
		SPDK_ERRLOG("could not find io_device %p\n", io_device);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	thread = _get_thread();
	if (!thread) {
		SPDK_ERRLOG("No thread allocated\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	/* An exited thread may not acquire new channels. */
	if (spdk_unlikely(thread->state == SPDK_THREAD_STATE_EXITED)) {
		SPDK_ERRLOG("Thread %s is marked as exited\n", thread->name);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
		if (ch->dev == dev) {
			ch->ref++;

			SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
				      ch, dev->name, dev->io_device, thread->name, ch->ref);

			/*
			 * An I/O channel already exists for this device on this
			 * thread, so return it.
			 */
			pthread_mutex_unlock(&g_devlist_mutex);
			return ch;
		}
	}

	/* Channel header and user context live in one allocation; the context
	 * starts immediately after the struct (see spdk_io_channel_from_ctx).
	 */
	ch = calloc(1, sizeof(*ch) + dev->ctx_size);
	if (ch == NULL) {
		SPDK_ERRLOG("could not calloc spdk_io_channel\n");
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	ch->dev = dev;
	ch->destroy_cb = dev->destroy_cb;
	ch->thread = thread;
	ch->ref = 1;
	ch->destroy_ref = 0;
	TAILQ_INSERT_TAIL(&thread->io_channels, ch, tailq);

	SPDK_DEBUGLOG(SPDK_LOG_THREAD, "Get io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
		      ch, dev->name, dev->io_device, thread->name, ch->ref);

	dev->refcnt++;

	pthread_mutex_unlock(&g_devlist_mutex);

	/* Run the user's create callback without holding the devlist lock. */
	rc = dev->create_cb(io_device, (uint8_t *)ch + sizeof(*ch));
	if (rc != 0) {
		/* Roll back the insertion and device reference on failure. */
		pthread_mutex_lock(&g_devlist_mutex);
		TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
		dev->refcnt--;
		free(ch);
		pthread_mutex_unlock(&g_devlist_mutex);
		return NULL;
	}

	return ch;
}
1381
/* Deferred destructor for an I/O channel, run as a message on the channel's
 * owning thread after spdk_put_io_channel() dropped the last reference.
 *
 * Because new references may have been taken between the send and this
 * execution, it re-checks ref/destroy_ref before tearing anything down.
 * If this channel held the last reference to an already-unregistered
 * device, the device is freed here as well.
 */
static void
put_io_channel(void *arg)
{
	struct spdk_io_channel *ch = arg;
	bool do_remove_dev = true;
	struct spdk_thread *thread;

	thread = spdk_get_thread();
	if (!thread) {
		SPDK_ERRLOG("called from non-SPDK thread\n");
		assert(false);
		return;
	}

	SPDK_DEBUGLOG(SPDK_LOG_THREAD,
		      "Releasing io_channel %p for io_device %s (%p) on thread %s\n",
		      ch, ch->dev->name, ch->dev->io_device, thread->name);

	/* Channels may only be destroyed on their owning thread. */
	assert(ch->thread == thread);

	ch->destroy_ref--;

	if (ch->ref > 0 || ch->destroy_ref > 0) {
		/*
		 * Another reference to the associated io_device was requested
		 * after this message was sent but before it had a chance to
		 * execute.
		 */
		return;
	}

	pthread_mutex_lock(&g_devlist_mutex);
	TAILQ_REMOVE(&ch->thread->io_channels, ch, tailq);
	pthread_mutex_unlock(&g_devlist_mutex);

	/* Don't hold the devlist mutex while the destroy_cb is called. */
	ch->destroy_cb(ch->dev->io_device, spdk_io_channel_get_ctx(ch));

	pthread_mutex_lock(&g_devlist_mutex);
	ch->dev->refcnt--;

	/* Only free the device if it was unregistered and this was its last
	 * remaining channel reference.
	 */
	if (!ch->dev->unregistered) {
		do_remove_dev = false;
	}

	if (ch->dev->refcnt > 0) {
		do_remove_dev = false;
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	if (do_remove_dev) {
		io_device_free(ch->dev);
	}
	free(ch);
}
1438
1439void
1440spdk_put_io_channel(struct spdk_io_channel *ch)
1441{
f67539c2
TL
1442 struct spdk_thread *thread;
1443 int rc __attribute__((unused));
1444
1445 thread = spdk_get_thread();
1446 if (!thread) {
1447 SPDK_ERRLOG("called from non-SPDK thread\n");
1448 assert(false);
1449 return;
1450 }
1451
1452 if (ch->thread != thread) {
1453 SPDK_ERRLOG("different from the thread that called get_io_channel()\n");
1454 assert(false);
1455 return;
1456 }
1457
11fdf7f2
TL
1458 SPDK_DEBUGLOG(SPDK_LOG_THREAD,
1459 "Putting io_channel %p for io_device %s (%p) on thread %s refcnt %u\n",
f67539c2 1460 ch, ch->dev->name, ch->dev->io_device, thread->name, ch->ref);
11fdf7f2
TL
1461
1462 ch->ref--;
1463
1464 if (ch->ref == 0) {
1465 ch->destroy_ref++;
f67539c2
TL
1466 rc = spdk_thread_send_msg(thread, put_io_channel, ch);
1467 assert(rc == 0);
11fdf7f2
TL
1468 }
1469}
1470
1471struct spdk_io_channel *
1472spdk_io_channel_from_ctx(void *ctx)
1473{
1474 return (struct spdk_io_channel *)((uint8_t *)ctx - sizeof(struct spdk_io_channel));
1475}
1476
/* Return the thread the channel was created on; spdk_put_io_channel()
 * asserts it is called from this same thread.
 */
struct spdk_thread *
spdk_io_channel_get_thread(struct spdk_io_channel *ch)
{
	return ch->thread;
}
1482
/* State for one spdk_for_each_channel() traversal.  The iterator hops from
 * thread to thread via spdk_thread_send_msg(), so exactly one thread works
 * on it at a time.
 */
struct spdk_io_channel_iter {
	/* Registration key; channels are matched by ch->dev->io_device. */
	void *io_device;
	struct io_device *dev;
	/* Per-channel callback; expected to end each step by calling
	 * spdk_for_each_channel_continue(). */
	spdk_channel_msg fn;
	/* Last status passed to spdk_for_each_channel_continue();
	 * nonzero aborts the traversal. */
	int status;
	void *ctx;
	/* Channel on the thread currently being visited (NULL once done). */
	struct spdk_io_channel *ch;

	struct spdk_thread *cur_thread;

	/* Thread that started the iteration; cpl is sent back to it. */
	struct spdk_thread *orig_thread;
	spdk_channel_for_each_cpl cpl;
};
1496
/* Accessor: the io_device being traversed by this iterator. */
void *
spdk_io_channel_iter_get_io_device(struct spdk_io_channel_iter *i)
{
	return i->io_device;
}
1502
/* Accessor: the channel on the thread currently being visited. */
struct spdk_io_channel *
spdk_io_channel_iter_get_channel(struct spdk_io_channel_iter *i)
{
	return i->ch;
}
1508
/* Accessor: the user context passed to spdk_for_each_channel(). */
void *
spdk_io_channel_iter_get_ctx(struct spdk_io_channel_iter *i)
{
	return i->ctx;
}
1514
1515static void
1516_call_completion(void *ctx)
1517{
1518 struct spdk_io_channel_iter *i = ctx;
1519
1520 if (i->cpl != NULL) {
1521 i->cpl(i, i->status);
1522 }
1523 free(i);
1524}
1525
1526static void
1527_call_channel(void *ctx)
1528{
1529 struct spdk_io_channel_iter *i = ctx;
1530 struct spdk_io_channel *ch;
1531
1532 /*
1533 * It is possible that the channel was deleted before this
1534 * message had a chance to execute. If so, skip calling
1535 * the fn() on this thread.
1536 */
1537 pthread_mutex_lock(&g_devlist_mutex);
1538 TAILQ_FOREACH(ch, &i->cur_thread->io_channels, tailq) {
1539 if (ch->dev->io_device == i->io_device) {
1540 break;
1541 }
1542 }
1543 pthread_mutex_unlock(&g_devlist_mutex);
1544
1545 if (ch) {
1546 i->fn(i);
1547 } else {
1548 spdk_for_each_channel_continue(i, 0);
1549 }
1550}
1551
/* Invoke fn once, on its owning thread, for every channel of io_device,
 * then invoke cpl on the calling thread.
 *
 * Only the first matching channel is dispatched here; fn must call
 * spdk_for_each_channel_continue() to advance to the next thread.
 * The device's for_each_count is bumped to block unregistration while
 * the traversal is in flight.
 *
 * NOTE(review): if the iterator allocation fails, cpl is never invoked -
 * callers waiting on the completion would stall.  Logged only.
 */
void
spdk_for_each_channel(void *io_device, spdk_channel_msg fn, void *ctx,
		      spdk_channel_for_each_cpl cpl)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	struct spdk_io_channel_iter *i;
	int rc __attribute__((unused));

	i = calloc(1, sizeof(*i));
	if (!i) {
		SPDK_ERRLOG("Unable to allocate iterator\n");
		return;
	}

	i->io_device = io_device;
	i->fn = fn;
	i->ctx = ctx;
	i->cpl = cpl;

	pthread_mutex_lock(&g_devlist_mutex);
	i->orig_thread = _get_thread();

	/* Find the first thread owning a channel for this device and hand the
	 * iterator to it; unlock before sending to respect lock ordering.
	 */
	TAILQ_FOREACH(thread, &g_threads, tailq) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == io_device) {
				ch->dev->for_each_count++;
				i->dev = ch->dev;
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
	}

	pthread_mutex_unlock(&g_devlist_mutex);

	/* No channels exist for this device - complete immediately. */
	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1595
/* Advance a spdk_for_each_channel() traversal after fn finished on the
 * current thread.
 *
 * Must be called on the thread currently visited by the iterator.  A
 * nonzero status aborts the traversal; otherwise the next thread owning a
 * channel for the device is messaged.  When no threads remain (or on
 * abort), the device's for_each_count is dropped and the completion is sent
 * back to the originating thread.
 */
void
spdk_for_each_channel_continue(struct spdk_io_channel_iter *i, int status)
{
	struct spdk_thread *thread;
	struct spdk_io_channel *ch;
	int rc __attribute__((unused));

	assert(i->cur_thread == spdk_get_thread());

	i->status = status;

	pthread_mutex_lock(&g_devlist_mutex);
	if (status) {
		/* Error reported by fn - stop visiting further threads. */
		goto end;
	}
	/* Scan the remaining threads for the next channel of this device. */
	thread = TAILQ_NEXT(i->cur_thread, tailq);
	while (thread) {
		TAILQ_FOREACH(ch, &thread->io_channels, tailq) {
			if (ch->dev->io_device == i->io_device) {
				i->cur_thread = thread;
				i->ch = ch;
				pthread_mutex_unlock(&g_devlist_mutex);
				rc = spdk_thread_send_msg(thread, _call_channel, i);
				assert(rc == 0);
				return;
			}
		}
		thread = TAILQ_NEXT(thread, tailq);
	}

end:
	/* Traversal finished: allow the device to unregister again and hand
	 * the iterator back to the originating thread for completion.
	 */
	i->dev->for_each_count--;
	i->ch = NULL;
	pthread_mutex_unlock(&g_devlist_mutex);

	rc = spdk_thread_send_msg(i->orig_thread, _call_completion, i);
	assert(rc == 0);
}
1634
1635
1636SPDK_LOG_REGISTER_COMPONENT("thread", SPDK_LOG_THREAD)