/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska
 * Copyright (c) 2013 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * ZFS Transaction Groups
 * ----------------------
 *
 * ZFS transaction groups are, as the name implies, groups of transactions
 * that act on persistent state. ZFS asserts consistency at the granularity of
 * these transaction groups. Each successive transaction group (txg) is
 * assigned a 64-bit consecutive identifier. There are three active
 * transaction group states: open, quiescing, or syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing, or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions — operations that change in-memory structures — are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions — updates to in-memory structures — are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons, such as it hitting a time or size threshold, or the execution of
 * an administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state. While quiescing, transactions can continue their
 * operation without delaying either of the other states. Typically, a txg is
 * in the quiescing state very briefly since the operations are bounded by
 * software latencies rather than, say, slower I/O latencies. After all
 * transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and (to
 * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them; those
 * allocations modify metadata (space maps)... which themselves must be
 * written to stable storage. During the sync state, ZFS iterates, writing out
 * data until it converges and all in-memory changes have been written out.
 * The first such pass is the largest as it encompasses all the modified user
 * data (as opposed to filesystem metadata). Subsequent passes typically have
 * far less data to write as they consist exclusively of filesystem metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier in
 * the syncing state (and subsequently freed). ZFS usually allocates new
 * blocks to optimize for large, continuous writes. For the syncing state to
 * converge, however, it must complete a pass where no new blocks are allocated
 * since each allocation requires a modification of persistent metadata.
 * Further, to hasten convergence, after a prescribed number of passes, ZFS
 * also defers frees, and stops compressing.
 *
 * In addition to writing out user data, we must also execute synctasks during
 * the syncing context. A synctask is the mechanism by which some
 * administrative activities work such as creating and destroying snapshots or
 * datasets. Note that when a synctask is initiated it enters the open txg,
 * and ZFS then pushes that txg as quickly as possible to completion of the
 * syncing state in order to reduce the latency of the administrative
 * activity. To complete the syncing state, ZFS writes out a new uberblock,
 * the root of the tree of blocks that comprise all state stored on the ZFS
 * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 * now transition to the syncing state.
 */
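
/*
 * An illustrative sketch of the path a single transaction takes through
 * these states, using the interfaces defined below.  The DMU normally
 * drives this via dmu_tx_assign()/dmu_tx_commit(); the helper name
 * apply_in_memory_changes() is hypothetical.
 *
 *	txg_handle_t th;
 *	uint64_t txg;
 *
 *	txg = txg_hold_open(dp, &th);	// join the currently open txg
 *	txg_rele_to_quiesce(&th);	// drop the tx_cpu lock; hold remains
 *	apply_in_memory_changes(txg);	// changes tagged with this txg
 *	txg_rele_to_sync(&th);		// release hold; txg may now quiesce
 *	txg_wait_synced(dp, txg);	// optionally wait for it to hit disk
 */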

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

int zfs_txg_timeout = 5;	/* max seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;
	bzero(tx, sizeof (tx_state_t));

	tx->tx_cpu = vmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
			    NULL);
			list_create(&tx->tx_cpu[c].tc_callbacks[i],
			    sizeof (dmu_tx_callback_t),
			    offsetof(dmu_tx_callback_t, dcb_node));
		}
	}

	mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

	cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

	tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	int c;

	ASSERT(tx->tx_threads == 0);

	mutex_destroy(&tx->tx_sync_lock);

	cv_destroy(&tx->tx_sync_more_cv);
	cv_destroy(&tx->tx_sync_done_cv);
	cv_destroy(&tx->tx_quiesce_more_cv);
	cv_destroy(&tx->tx_quiesce_done_cv);
	cv_destroy(&tx->tx_exit_cv);

	for (c = 0; c < max_ncpus; c++) {
		int i;

		mutex_destroy(&tx->tx_cpu[c].tc_lock);
		for (i = 0; i < TXG_SIZE; i++) {
			cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
			list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
		}
	}

	if (tx->tx_commit_cb_taskq != NULL)
		taskq_destroy(tx->tx_commit_cb_taskq);

	vmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

	bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	mutex_enter(&tx->tx_sync_lock);

	dprintf("pool %p\n", dp);

	ASSERT(tx->tx_threads == 0);

	tx->tx_threads = 2;

	tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	/*
	 * The sync thread can need a larger-than-default stack size on
	 * 32-bit x86.  This is due in part to nested pools and
	 * scrub_visitbp() recursion.
	 */
	tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
	    dp, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
	CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
	mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
	ASSERT(*tpp != NULL);
	*tpp = NULL;
	tx->tx_threads--;
	cv_broadcast(&tx->tx_exit_cv);
	CALLB_CPR_EXIT(cpr);		/* drops &tx->tx_sync_lock */
	thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
{
	CALLB_CPR_SAFE_BEGIN(cpr);

	if (time)
		(void) cv_timedwait_interruptible(cv, &tx->tx_sync_lock,
		    ddi_get_lbolt() + time);
	else
		cv_wait_interruptible(cv, &tx->tx_sync_lock);

	CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;

	dprintf("pool %p\n", dp);
	/*
	 * Finish off any work in progress.
	 */
	ASSERT(tx->tx_threads == 2);

	/*
	 * We need to ensure that we've vacated the deferred space_maps.
	 */
	txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

	/*
	 * Wake all sync threads and wait for them to die.
	 */
	mutex_enter(&tx->tx_sync_lock);

	ASSERT(tx->tx_threads == 2);

	tx->tx_exiting = 1;

	cv_broadcast(&tx->tx_quiesce_more_cv);
	cv_broadcast(&tx->tx_quiesce_done_cv);
	cv_broadcast(&tx->tx_sync_more_cv);

	while (tx->tx_threads != 0)
		cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

	tx->tx_exiting = 0;

	mutex_exit(&tx->tx_sync_lock);
}
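
/*
 * A minimal lifecycle sketch, assuming a dsl_pool_t *dp whose other state
 * is already set up.  This mirrors how dsl_pool.c and spa.c drive the txg
 * subsystem, but is not copied from them:
 *
 *	txg_init(dp, initial_txg);	// zero state, allocate tx_cpu array
 *	txg_sync_start(dp);		// spawn quiesce and sync threads
 *	// ... pool is active; txgs open, quiesce, and sync ...
 *	txg_sync_stop(dp);		// sync remaining work, reap threads
 *	txg_fini(dp);			// destroy locks, cvs, and lists
 */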

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
	tx_state_t *tx = &dp->dp_tx;
	tx_cpu_t *tc;
	uint64_t txg;

	/*
	 * The processor id is used only as a "random" index into the
	 * tx_cpu array; there is no other significance to the chosen
	 * tx_cpu, so the current cpu is as good a choice as any.
	 */
	kpreempt_disable();
	tc = &tx->tx_cpu[CPU_SEQID];
	kpreempt_enable();

	mutex_enter(&tc->tc_lock);

	txg = tx->tx_open_txg;
	tc->tc_count[txg & TXG_MASK]++;

	th->th_cpu = tc;
	th->th_txg = txg;

	return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;

	mutex_exit(&tc->tc_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}

static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to
	 * call txg_rele_to_sync().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}

static void
txg_do_callbacks(list_t *cb_list)
{
	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/* No need to lock tx_cpu_t at this point */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    100, minclsyspri, max_ncpus, INT_MAX,
			    TASKQ_THREADS_CPU_PCT | TASKQ_PREPOPULATE);
		}

		cb_list = kmem_alloc(sizeof (list_t), KM_PUSHPAGE);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(cb_list, &tc->tc_callbacks[g]);

		(void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
		    txg_do_callbacks, cb_list, TQ_SLEEP);
	}
}
433
54a179e7
RC
434/*
435 * Wait for pending commit callbacks of already-synced transactions to finish
436 * processing.
437 * Calling this function from within a commit callback will deadlock.
438 */
439void
440txg_wait_callbacks(dsl_pool_t *dp)
441{
442 tx_state_t *tx = &dp->dp_tx;
443
444 if (tx->tx_commit_cb_taskq != NULL)
445 taskq_wait(tx->tx_commit_cb_taskq);
446}
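
/*
 * A sketch of the commit callback flow (callers normally register through
 * dmu_tx_callback_register() in dmu_tx.c rather than calling
 * txg_register_callbacks() directly):
 *
 *	1. Callbacks accumulate on the handle's tx_cpu list via
 *	   txg_register_callbacks() while the txg is open.
 *	2. After the txg syncs, txg_dispatch_callbacks() hands each
 *	   per-cpu list to the "tx_commit_cb" taskq.
 *	3. A caller that must observe completion can drain the taskq:
 *
 *		txg_wait_callbacks(dp);	// never call from a callback
 */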

static void
txg_sync_thread(dsl_pool_t *dp)
{
	spa_t *spa = dp->dp_spa;
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;
	uint64_t start, delta;

#ifdef _KERNEL
	/*
	 * Annotate this process with a flag that indicates that it is
	 * unsafe to use KM_SLEEP during memory allocations due to the
	 * potential for a deadlock.  KM_PUSHPAGE should be used instead.
	 */
	current->flags |= PF_NOFS;
#endif /* _KERNEL */

	txg_thread_enter(tx, &cpr);

	start = delta = 0;
	for (;;) {
		uint64_t timer, timeout;
		uint64_t txg;

		timeout = zfs_txg_timeout * hz;

		/*
		 * We sync when we're scanning, when there's someone waiting
		 * on us, when the quiesce thread has handed off a txg to us,
		 * or when we have reached our timeout.
		 */
		timer = (delta >= timeout ? 0 : timeout - delta);
		while (!dsl_scan_active(dp->dp_scan) &&
		    !tx->tx_exiting && timer > 0 &&
		    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
		    tx->tx_quiesced_txg == 0) {
			dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
			    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
			txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
			delta = ddi_get_lbolt() - start;
			timer = (delta > timeout ? 0 : timeout - delta);
		}

		/*
		 * Wait until the quiesce thread hands off a txg to us,
		 * prompting it to do so if necessary.
		 */
		while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
			if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
				tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
			cv_broadcast(&tx->tx_quiesce_more_cv);
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
		}

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

		/*
		 * Consume the quiesced txg which has been handed off to
		 * us.  This may cause the quiescing thread to now be
		 * able to quiesce another txg, so we must signal it.
		 */
		txg = tx->tx_quiesced_txg;
		tx->tx_quiesced_txg = 0;
		tx->tx_syncing_txg = txg;
		cv_broadcast(&tx->tx_quiesce_more_cv);

		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);

		start = ddi_get_lbolt();
		spa_sync(spa, txg);
		delta = ddi_get_lbolt() - start;

		mutex_enter(&tx->tx_sync_lock);
		tx->tx_synced_txg = txg;
		tx->tx_syncing_txg = 0;
		cv_broadcast(&tx->tx_sync_done_cv);

		/*
		 * Dispatch commit callbacks to worker threads.
		 */
		txg_dispatch_callbacks(dp, txg);
	}
}

static void
txg_quiesce_thread(dsl_pool_t *dp)
{
	tx_state_t *tx = &dp->dp_tx;
	callb_cpr_t cpr;

	txg_thread_enter(tx, &cpr);

	for (;;) {
		uint64_t txg;

		/*
		 * We quiesce when there's someone waiting on us.
		 * However, we can only have one txg in "quiescing" or
		 * "quiesced, waiting to sync" state.  So we wait until
		 * the "quiesced, waiting to sync" txg has been consumed
		 * by the sync thread.
		 */
		while (!tx->tx_exiting &&
		    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
		    tx->tx_quiesced_txg != 0))
			txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

		if (tx->tx_exiting)
			txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

		txg = tx->tx_open_txg;
		dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
		    txg, tx->tx_quiesce_txg_waiting,
		    tx->tx_sync_txg_waiting);
		mutex_exit(&tx->tx_sync_lock);
		txg_quiesce(dp, txg);
		mutex_enter(&tx->tx_sync_lock);

		/*
		 * Hand this txg off to the sync thread.
		 */
		dprintf("quiesce done, handing off txg %llu\n", txg);
		tx->tx_quiesced_txg = txg;
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_broadcast(&tx->tx_quiesce_done_cv);
	}
}

/*
 * Delay this thread by 'ticks' if we are still in the open transaction
 * group and there is already a waiting txg quiescing or quiesced.  Abort
 * the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
{
	tx_state_t *tx = &dp->dp_tx;
	clock_t timeout = ddi_get_lbolt() + ticks;

	/* don't delay if this txg could transition to quiescing immediately */
	if (tx->tx_open_txg > txg ||
	    tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
		return;

	mutex_enter(&tx->tx_sync_lock);
	if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
		mutex_exit(&tx->tx_sync_lock);
		return;
	}

	while (ddi_get_lbolt() < timeout &&
	    tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
		(void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
		    timeout);

	DMU_TX_STAT_BUMP(dmu_tx_delay);

	mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + TXG_DEFER_SIZE;
	if (tx->tx_sync_txg_waiting < txg)
		tx->tx_sync_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_synced_txg < txg) {
		dprintf("broadcasting sync more "
		    "tx_synced=%llu waiting=%llu dp=%p\n",
		    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
		cv_broadcast(&tx->tx_sync_more_cv);
		cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
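
/*
 * Example: a caller that needs everything dirtied so far to be on stable
 * storage can pass txg == 0, which (per the code above) waits for
 * TXG_DEFER_SIZE txgs past the currently open one, covering deferred
 * frees as well:
 *
 *	txg_wait_synced(dp, 0);
 */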

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;

	ASSERT(!dsl_pool_config_held(dp));

	mutex_enter(&tx->tx_sync_lock);
	ASSERT(tx->tx_threads == 2);
	if (txg == 0)
		txg = tx->tx_open_txg + 1;
	if (tx->tx_quiesce_txg_waiting < txg)
		tx->tx_quiesce_txg_waiting = txg;
	dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
	    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
	while (tx->tx_open_txg < txg) {
		cv_broadcast(&tx->tx_quiesce_more_cv);
		cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
	}
	mutex_exit(&tx->tx_sync_lock);
}
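
/*
 * A sketch of typical use, roughly how dmu_tx_wait()-style throttling
 * retries an assignment: wait for a txg later than the one we last tried
 * before trying again.  Passing txg == 0 simply waits for the next txg
 * to open:
 *
 *	txg_wait_open(dp, 0);
 */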
657
b128c09f 658boolean_t
34dc7c2f
BB
659txg_stalled(dsl_pool_t *dp)
660{
661 tx_state_t *tx = &dp->dp_tx;
662 return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
663}
664
b128c09f
BB
665boolean_t
666txg_sync_waiting(dsl_pool_t *dp)
667{
668 tx_state_t *tx = &dp->dp_tx;
669
670 return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
671 tx->tx_quiesced_txg != 0);
672}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
	int t;

	mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

	tl->tl_offset = offset;

	for (t = 0; t < TXG_SIZE; t++)
		tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
	int t;

	for (t = 0; t < TXG_SIZE; t++)
		ASSERT(txg_list_empty(tl, t));

	mutex_destroy(&tl->tl_lock);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
	return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		tn->tn_member[t] = 1;
		tn->tn_next[t] = tl->tl_head[t];
		tl->tl_head[t] = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}

/*
 * Add an entry to the end of the list, unless it's already on the list.
 * (walks list to find end)
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
	int t = txg & TXG_MASK;
	txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
	boolean_t add;

	mutex_enter(&tl->tl_lock);
	add = (tn->tn_member[t] == 0);
	if (add) {
		txg_node_t **tp;

		for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
			continue;

		tn->tn_member[t] = 1;
		tn->tn_next[t] = NULL;
		*tp = tn;
	}
	mutex_exit(&tl->tl_lock);

	return (add);
}
758
34dc7c2f
BB
759/*
760 * Remove the head of the list and return it.
761 */
762void *
763txg_list_remove(txg_list_t *tl, uint64_t txg)
764{
765 int t = txg & TXG_MASK;
766 txg_node_t *tn;
767 void *p = NULL;
768
769 mutex_enter(&tl->tl_lock);
770 if ((tn = tl->tl_head[t]) != NULL) {
771 p = (char *)tn - tl->tl_offset;
772 tl->tl_head[t] = tn->tn_next[t];
773 tn->tn_next[t] = NULL;
774 tn->tn_member[t] = 0;
775 }
776 mutex_exit(&tl->tl_lock);
777
778 return (p);
779}
780
781/*
782 * Remove a specific item from the list and return it.
783 */
784void *
785txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
786{
787 int t = txg & TXG_MASK;
788 txg_node_t *tn, **tp;
789
790 mutex_enter(&tl->tl_lock);
791
792 for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
793 if ((char *)tn - tl->tl_offset == p) {
794 *tp = tn->tn_next[t];
795 tn->tn_next[t] = NULL;
796 tn->tn_member[t] = 0;
797 mutex_exit(&tl->tl_lock);
798 return (p);
799 }
800 }
801
802 mutex_exit(&tl->tl_lock);
803
804 return (NULL);
805}
806
13fe0198 807boolean_t
34dc7c2f
BB
808txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
809{
810 int t = txg & TXG_MASK;
811 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
812
13fe0198 813 return (tn->tn_member[t] != 0);
34dc7c2f
BB
814}
815
816/*
817 * Walk a txg list -- only safe if you know it's not changing.
818 */
819void *
820txg_list_head(txg_list_t *tl, uint64_t txg)
821{
822 int t = txg & TXG_MASK;
823 txg_node_t *tn = tl->tl_head[t];
824
825 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
826}
827
828void *
829txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
830{
831 int t = txg & TXG_MASK;
832 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
833
834 tn = tn->tn_next[t];
835
836 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
837}
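
/*
 * Usage sketch for the per-txg lists.  A client structure embeds a
 * txg_node_t and is linked onto the list for the txg in which it was
 * dirtied; the sync thread later drains that txg's list.  The names
 * my_obj_t, mo_txg_node, and sync_one() are hypothetical:
 *
 *	typedef struct my_obj {
 *		txg_node_t	mo_txg_node;
 *		// ... object state ...
 *	} my_obj_t;
 *
 *	txg_list_t tl;
 *	my_obj_t *obj;
 *
 *	txg_list_create(&tl, offsetof(my_obj_t, mo_txg_node));
 *	(void) txg_list_add(&tl, obj, txg);		// while txg is open
 *
 *	while ((obj = txg_list_remove(&tl, txg)) != NULL)
 *		sync_one(obj, txg);			// in syncing context
 */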

#if defined(_KERNEL) && defined(HAVE_SPL)
EXPORT_SYMBOL(txg_init);
EXPORT_SYMBOL(txg_fini);
EXPORT_SYMBOL(txg_sync_start);
EXPORT_SYMBOL(txg_sync_stop);
EXPORT_SYMBOL(txg_hold_open);
EXPORT_SYMBOL(txg_rele_to_quiesce);
EXPORT_SYMBOL(txg_rele_to_sync);
EXPORT_SYMBOL(txg_register_callbacks);
EXPORT_SYMBOL(txg_delay);
EXPORT_SYMBOL(txg_wait_synced);
EXPORT_SYMBOL(txg_wait_open);
EXPORT_SYMBOL(txg_wait_callbacks);
EXPORT_SYMBOL(txg_stalled);
EXPORT_SYMBOL(txg_sync_waiting);

module_param(zfs_txg_timeout, int, 0644);
MODULE_PARM_DESC(zfs_txg_timeout, "Max seconds worth of delta per txg");
#endif