module/zfs/dmu_send.c
1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
24 * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
25 * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26 * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
27 * Copyright (c) 2016 Actifio, Inc. All rights reserved.
28 */
29
30 #include <sys/dmu.h>
31 #include <sys/dmu_impl.h>
32 #include <sys/dmu_tx.h>
33 #include <sys/dbuf.h>
34 #include <sys/dnode.h>
35 #include <sys/zfs_context.h>
36 #include <sys/dmu_objset.h>
37 #include <sys/dmu_traverse.h>
38 #include <sys/dsl_dataset.h>
39 #include <sys/dsl_dir.h>
40 #include <sys/dsl_prop.h>
41 #include <sys/dsl_pool.h>
42 #include <sys/dsl_synctask.h>
43 #include <sys/spa_impl.h>
44 #include <sys/zfs_ioctl.h>
45 #include <sys/zap.h>
46 #include <sys/zio_checksum.h>
47 #include <sys/zfs_znode.h>
48 #include <zfs_fletcher.h>
49 #include <sys/avl.h>
50 #include <sys/ddt.h>
51 #include <sys/zfs_onexit.h>
52 #include <sys/dmu_send.h>
53 #include <sys/dsl_destroy.h>
54 #include <sys/blkptr.h>
55 #include <sys/dsl_bookmark.h>
56 #include <sys/zfeature.h>
57 #include <sys/bqueue.h>
58 #include <sys/zvol.h>
59 #include <sys/policy.h>
60
61 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
62 int zfs_send_corrupt_data = B_FALSE;
63 int zfs_send_queue_length = 16 * 1024 * 1024;
64 int zfs_recv_queue_length = 16 * 1024 * 1024;
65
66 static char *dmu_recv_tag = "dmu_recv_tag";
67 static const char *recv_clone_name = "%recv";
68
69 #define BP_SPAN(datablkszsec, indblkshift, level) \
70 (((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
71 (level) * (indblkshift - SPA_BLKPTRSHIFT)))
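/*
 * A rough worked example, assuming the usual constants SPA_MINBLOCKSHIFT = 9
 * and SPA_BLKPTRSHIFT = 7 (128-byte block pointers): with 128K data blocks
 * (datablkszsec = 256) and 128K indirect blocks (indblkshift = 17),
 *
 *     level 0: 256 << 9                    = 128K  (one data block)
 *     level 1: 256 << (9 + 1 * (17 - 7))   = 128M  (1024 blkptrs * 128K)
 *     level 2: 256 << (9 + 2 * (17 - 7))   = 128G
 *
 * i.e. each indirect level multiplies the span by the number of block
 * pointers that fit in one indirect block.
 */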
72
73 struct send_thread_arg {
74 bqueue_t q;
75 dsl_dataset_t *ds; /* Dataset to traverse */
76 uint64_t fromtxg; /* Traverse from this txg */
77 int flags; /* flags to pass to traverse_dataset */
78 int error_code;
79 boolean_t cancel;
80 };
81
82 struct send_block_record {
83 boolean_t eos_marker; /* Marks the end of the stream */
84 blkptr_t bp;
85 zbookmark_phys_t zb;
86 uint8_t indblkshift;
87 uint16_t datablkszsec;
88 bqueue_node_t ln;
89 };
90
91 typedef struct dump_bytes_io {
92 dmu_sendarg_t *dbi_dsp;
93 void *dbi_buf;
94 int dbi_len;
95 } dump_bytes_io_t;
96
97 static void
98 dump_bytes_cb(void *arg)
99 {
100 dump_bytes_io_t *dbi = (dump_bytes_io_t *)arg;
101 dmu_sendarg_t *dsp = dbi->dbi_dsp;
102 dsl_dataset_t *ds = dsp->dsa_os->os_dsl_dataset;
103 ssize_t resid; /* have to get resid to get detailed errno */
104 ASSERT0(dbi->dbi_len % 8);
105
106 dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
107 (caddr_t)dbi->dbi_buf, dbi->dbi_len,
108 0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
109
110 mutex_enter(&ds->ds_sendstream_lock);
111 *dsp->dsa_off += dbi->dbi_len;
112 mutex_exit(&ds->ds_sendstream_lock);
113 }
114
115 static int
116 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
117 {
118 dump_bytes_io_t dbi;
119
120 dbi.dbi_dsp = dsp;
121 dbi.dbi_buf = buf;
122 dbi.dbi_len = len;
123
124 #if defined(HAVE_LARGE_STACKS)
125 dump_bytes_cb(&dbi);
126 #else
127 /*
128 * The vn_rdwr() call is performed in a taskq to ensure that there is
129 * always enough stack space to write safely to the target filesystem.
130 * The ZIO_TYPE_FREE threads are used because there can be a lot of
131 * them and they are used in vdev_file.c for a similar purpose.
132 */
133 spa_taskq_dispatch_sync(dmu_objset_spa(dsp->dsa_os), ZIO_TYPE_FREE,
134 ZIO_TASKQ_ISSUE, dump_bytes_cb, &dbi, TQ_SLEEP);
135 #endif /* HAVE_LARGE_STACKS */
136
137 return (dsp->dsa_err);
138 }
139
140 /*
141 * For all record types except BEGIN, fill in the checksum (overlaid in
142 * drr_u.drr_checksum.drr_checksum). The checksum verifies everything
143 * up to the start of the checksum itself.
144 */
145 static int
146 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
147 {
148 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
149 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
150 fletcher_4_incremental_native(dsp->dsa_drr,
151 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
152 &dsp->dsa_zc);
153 if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
154 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
155 drr_checksum.drr_checksum));
156 dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
157 }
158 fletcher_4_incremental_native(&dsp->dsa_drr->
159 drr_u.drr_checksum.drr_checksum,
160 sizeof (zio_cksum_t), &dsp->dsa_zc);
161 if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
162 return (SET_ERROR(EINTR));
163 if (payload_len != 0) {
164 fletcher_4_incremental_native(payload, payload_len,
165 &dsp->dsa_zc);
166 if (dump_bytes(dsp, payload, payload_len) != 0)
167 return (SET_ERROR(EINTR));
168 }
169 return (0);
170 }
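/*
 * Sketch of the framing that results from dump_record(): for records
 * R1, R2, ... the running fletcher-4 checksum in dsa_zc is fed, in order,
 * R1's header up to (but not including) its checksum field, then the
 * checksum field itself (left zero for BEGIN, otherwise filled with the
 * running value just computed), then R1's payload, then R2's header, and
 * so on.  The checksum stored in each non-BEGIN record therefore covers
 * every stream byte that precedes that record's own checksum field, which
 * is what lets the receiver verify the stream incrementally.
 */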
171
172 static int
173 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
174 uint64_t length)
175 {
176 struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
177
178 /*
179 * When we receive a free record, dbuf_free_range() assumes
180 * that the receiving system doesn't have any dbufs in the range
181 * being freed. This is always true because there is a one-record
182 * constraint: we only send one WRITE record for any given
183 * object+offset. We know that the one-record constraint is
184 * true because we always send data in increasing order by
185 * object,offset.
186 *
187 * If the increasing-order constraint ever changes, we should find
188 * another way to assert that the one-record constraint is still
189 * satisfied.
190 */
191 ASSERT(object > dsp->dsa_last_data_object ||
192 (object == dsp->dsa_last_data_object &&
193 offset > dsp->dsa_last_data_offset));
194
195 /*
196 * If we are doing a non-incremental send, then there can't
197 * be any data in the dataset we're receiving into. Therefore
198 * a free record would simply be a no-op. Save space by not
199 * sending it to begin with.
200 */
201 if (!dsp->dsa_incremental)
202 return (0);
203
204 if (length != -1ULL && offset + length < offset)
205 length = -1ULL;
206
207 /*
208 * If there is a pending op, but it's not PENDING_FREE, push it out,
209 * since free block aggregation can only be done for blocks of the
210 * same type (i.e., DRR_FREE records can only be aggregated with
211 * other DRR_FREE records; DRR_FREEOBJECTS records can only be
212 * aggregated with other DRR_FREEOBJECTS records).
213 */
214 if (dsp->dsa_pending_op != PENDING_NONE &&
215 dsp->dsa_pending_op != PENDING_FREE) {
216 if (dump_record(dsp, NULL, 0) != 0)
217 return (SET_ERROR(EINTR));
218 dsp->dsa_pending_op = PENDING_NONE;
219 }
220
221 if (dsp->dsa_pending_op == PENDING_FREE) {
222 /*
223 * There should never be a PENDING_FREE if length is -1
224 * (because dump_dnode is the only place where this
225 * function is called with a -1, and only after flushing
226 * any pending record).
227 */
228 ASSERT(length != -1ULL);
229 /*
230 * Check to see whether this free block can be aggregated
231 * with the pending one.
232 */
233 if (drrf->drr_object == object && drrf->drr_offset +
234 drrf->drr_length == offset) {
235 drrf->drr_length += length;
236 return (0);
237 } else {
238 /* not a continuation. Push out pending record */
239 if (dump_record(dsp, NULL, 0) != 0)
240 return (SET_ERROR(EINTR));
241 dsp->dsa_pending_op = PENDING_NONE;
242 }
243 }
244 /* create a FREE record and make it pending */
245 bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
246 dsp->dsa_drr->drr_type = DRR_FREE;
247 drrf->drr_object = object;
248 drrf->drr_offset = offset;
249 drrf->drr_length = length;
250 drrf->drr_toguid = dsp->dsa_toguid;
251 if (length == -1ULL) {
252 if (dump_record(dsp, NULL, 0) != 0)
253 return (SET_ERROR(EINTR));
254 } else {
255 dsp->dsa_pending_op = PENDING_FREE;
256 }
257
258 return (0);
259 }
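/*
 * Illustration of the aggregation above: three back-to-back calls
 *
 *     dump_free(dsp, 5,    0, 128K);
 *     dump_free(dsp, 5, 128K, 128K);
 *     dump_free(dsp, 5, 256K, 128K);
 *
 * leave a single pending DRR_FREE for (object 5, offset 0, length 384K).
 * The record is only pushed to the stream once something that cannot be
 * aggregated with it needs to go out, or when dmu_send_impl() flushes the
 * final pending op at the end of the stream.
 */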
260
261 static int
262 dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type,
263 uint64_t object, uint64_t offset, int blksz, const blkptr_t *bp, void *data)
264 {
265 struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
266
267 /*
268 * We send data in increasing object, offset order.
269 * See comment in dump_free() for details.
270 */
271 ASSERT(object > dsp->dsa_last_data_object ||
272 (object == dsp->dsa_last_data_object &&
273 offset > dsp->dsa_last_data_offset));
274 dsp->dsa_last_data_object = object;
275 dsp->dsa_last_data_offset = offset + blksz - 1;
276
277 /*
278 * If there is any kind of pending aggregation (currently either
279 * a grouping of free objects or free blocks), push it out to
280 * the stream, since aggregation can't be done across operations
281 * of different types.
282 */
283 if (dsp->dsa_pending_op != PENDING_NONE) {
284 if (dump_record(dsp, NULL, 0) != 0)
285 return (SET_ERROR(EINTR));
286 dsp->dsa_pending_op = PENDING_NONE;
287 }
288 /* write a WRITE record */
289 bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
290 dsp->dsa_drr->drr_type = DRR_WRITE;
291 drrw->drr_object = object;
292 drrw->drr_type = type;
293 drrw->drr_offset = offset;
294 drrw->drr_length = blksz;
295 drrw->drr_toguid = dsp->dsa_toguid;
296 if (bp == NULL || BP_IS_EMBEDDED(bp)) {
297 /*
298 * There's no pre-computed checksum for partial-block
299 * writes or embedded BP's, so (like
300 * fletcher4-checksummed blocks) userland will have to
301 * compute a dedup-capable checksum itself.
302 */
303 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
304 } else {
305 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
306 if (zio_checksum_table[drrw->drr_checksumtype].ci_dedup)
307 drrw->drr_checksumflags |= DRR_CHECKSUM_DEDUP;
308 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
309 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
310 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
311 drrw->drr_key.ddk_cksum = bp->blk_cksum;
312 }
313
314 if (dump_record(dsp, data, blksz) != 0)
315 return (SET_ERROR(EINTR));
316 return (0);
317 }
318
319 static int
320 dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
321 int blksz, const blkptr_t *bp)
322 {
323 char buf[BPE_PAYLOAD_SIZE];
324 struct drr_write_embedded *drrw =
325 &(dsp->dsa_drr->drr_u.drr_write_embedded);
326
327 if (dsp->dsa_pending_op != PENDING_NONE) {
328 if (dump_record(dsp, NULL, 0) != 0)
329 return (EINTR);
330 dsp->dsa_pending_op = PENDING_NONE;
331 }
332
333 ASSERT(BP_IS_EMBEDDED(bp));
334
335 bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
336 dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
337 drrw->drr_object = object;
338 drrw->drr_offset = offset;
339 drrw->drr_length = blksz;
340 drrw->drr_toguid = dsp->dsa_toguid;
341 drrw->drr_compression = BP_GET_COMPRESS(bp);
342 drrw->drr_etype = BPE_GET_ETYPE(bp);
343 drrw->drr_lsize = BPE_GET_LSIZE(bp);
344 drrw->drr_psize = BPE_GET_PSIZE(bp);
345
346 decode_embedded_bp_compressed(bp, buf);
347
348 if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
349 return (EINTR);
350 return (0);
351 }
352
353 static int
354 dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)
355 {
356 struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
357
358 if (dsp->dsa_pending_op != PENDING_NONE) {
359 if (dump_record(dsp, NULL, 0) != 0)
360 return (SET_ERROR(EINTR));
361 dsp->dsa_pending_op = PENDING_NONE;
362 }
363
364 /* write a SPILL record */
365 bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
366 dsp->dsa_drr->drr_type = DRR_SPILL;
367 drrs->drr_object = object;
368 drrs->drr_length = blksz;
369 drrs->drr_toguid = dsp->dsa_toguid;
370
371 if (dump_record(dsp, data, blksz) != 0)
372 return (SET_ERROR(EINTR));
373 return (0);
374 }
375
376 static int
377 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
378 {
379 struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
380
381 /* See comment in dump_free(). */
382 if (!dsp->dsa_incremental)
383 return (0);
384
385 /*
386 * If there is a pending op, but it's not PENDING_FREEOBJECTS,
387 * push it out, since free block aggregation can only be done for
388 * blocks of the same type (i.e., DRR_FREE records can only be
389 * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
390 * can only be aggregated with other DRR_FREEOBJECTS records).
391 */
392 if (dsp->dsa_pending_op != PENDING_NONE &&
393 dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
394 if (dump_record(dsp, NULL, 0) != 0)
395 return (SET_ERROR(EINTR));
396 dsp->dsa_pending_op = PENDING_NONE;
397 }
398 if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
399 /*
400 * See whether this free object array can be aggregated
401 * with the pending one
402 */
403 if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
404 drrfo->drr_numobjs += numobjs;
405 return (0);
406 } else {
407 /* can't be aggregated. Push out pending record */
408 if (dump_record(dsp, NULL, 0) != 0)
409 return (SET_ERROR(EINTR));
410 dsp->dsa_pending_op = PENDING_NONE;
411 }
412 }
413
414 /* write a FREEOBJECTS record */
415 bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
416 dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
417 drrfo->drr_firstobj = firstobj;
418 drrfo->drr_numobjs = numobjs;
419 drrfo->drr_toguid = dsp->dsa_toguid;
420
421 dsp->dsa_pending_op = PENDING_FREEOBJECTS;
422
423 return (0);
424 }
425
426 static int
427 dump_dnode(dmu_sendarg_t *dsp, uint64_t object, dnode_phys_t *dnp)
428 {
429 struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
430
431 if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
432 return (dump_freeobjects(dsp, object, 1));
433
434 if (dsp->dsa_pending_op != PENDING_NONE) {
435 if (dump_record(dsp, NULL, 0) != 0)
436 return (SET_ERROR(EINTR));
437 dsp->dsa_pending_op = PENDING_NONE;
438 }
439
440 /* write an OBJECT record */
441 bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
442 dsp->dsa_drr->drr_type = DRR_OBJECT;
443 drro->drr_object = object;
444 drro->drr_type = dnp->dn_type;
445 drro->drr_bonustype = dnp->dn_bonustype;
446 drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
447 drro->drr_bonuslen = dnp->dn_bonuslen;
448 drro->drr_checksumtype = dnp->dn_checksum;
449 drro->drr_compress = dnp->dn_compress;
450 drro->drr_toguid = dsp->dsa_toguid;
451
452 if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
453 drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
454 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
455
456 if (dump_record(dsp, DN_BONUS(dnp),
457 P2ROUNDUP(dnp->dn_bonuslen, 8)) != 0) {
458 return (SET_ERROR(EINTR));
459 }
460
461 /* Free anything past the end of the file. */
462 if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
463 (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), -1ULL) != 0)
464 return (SET_ERROR(EINTR));
465 if (dsp->dsa_err != 0)
466 return (SET_ERROR(EINTR));
467 return (0);
468 }
469
470 static boolean_t
471 backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
472 {
473 if (!BP_IS_EMBEDDED(bp))
474 return (B_FALSE);
475
476 /*
477 * Compression function must be legacy, or explicitly enabled.
478 */
479 if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
480 !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4)))
481 return (B_FALSE);
482
483 /*
484 * Embed type must be explicitly enabled.
485 */
486 switch (BPE_GET_ETYPE(bp)) {
487 case BP_EMBEDDED_TYPE_DATA:
488 if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
489 return (B_TRUE);
490 break;
491 default:
492 return (B_FALSE);
493 }
494 return (B_FALSE);
495 }
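/*
 * In practice a WRITE_EMBEDDED record is therefore only emitted when the
 * source block is already stored as an embedded blkptr (embedded_data
 * feature: a small compressed payload, roughly a hundred bytes, held
 * directly in the block pointer), the sender asked for it via embedok
 * (zfs send -e), and, if the block uses a non-legacy compression function
 * such as LZ4, the stream has also advertised that support.
 */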
496
497 /*
498 * This is the callback function to traverse_dataset that acts as the worker
499 * thread for dmu_send_impl.
500 */
501 /*ARGSUSED*/
502 static int
503 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
504 const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
505 {
506 struct send_thread_arg *sta = arg;
507 struct send_block_record *record;
508 uint64_t record_size;
509 int err = 0;
510
511 if (sta->cancel)
512 return (SET_ERROR(EINTR));
513
514 if (bp == NULL) {
515 ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
516 return (0);
517 } else if (zb->zb_level < 0) {
518 return (0);
519 }
520
521 record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
522 record->eos_marker = B_FALSE;
523 record->bp = *bp;
524 record->zb = *zb;
525 record->indblkshift = dnp->dn_indblkshift;
526 record->datablkszsec = dnp->dn_datablkszsec;
527 record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
528 bqueue_enqueue(&sta->q, record, record_size);
529
530 return (err);
531 }
532
533 /*
534 * This function kicks off the traverse_dataset call. It also handles setting
535 * the error code of the thread in case something goes wrong, and pushes the End
536 * of Stream record when the traverse_dataset call has finished. If there is no
537 * dataset to traverse, the thread immediately pushes the End of Stream marker.
538 */
539 static void
540 send_traverse_thread(void *arg)
541 {
542 struct send_thread_arg *st_arg = arg;
543 int err;
544 struct send_block_record *data;
545
546 if (st_arg->ds != NULL) {
547 err = traverse_dataset(st_arg->ds, st_arg->fromtxg,
548 st_arg->flags, send_cb, arg);
549 if (err != EINTR)
550 st_arg->error_code = err;
551 }
552 data = kmem_zalloc(sizeof (*data), KM_SLEEP);
553 data->eos_marker = B_TRUE;
554 bqueue_enqueue(&st_arg->q, data, 1);
555 }
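/*
 * send_cb() and send_traverse_thread() form the producer half of a simple
 * pipeline: the traversal thread walks the dataset (with prefetch enabled)
 * and enqueues one send_block_record per block pointer on to_arg.q, while
 * the original thread, back in dmu_send_impl(), dequeues records and turns
 * each one into stream records via do_dump().  The eos_marker record is
 * what tells the consumer that the producer is finished.
 */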
556
557 /*
558 * This function actually handles figuring out what kind of record needs to be
559 * dumped, reading the data (which has hopefully been prefetched), and calling
560 * the appropriate helper function.
561 */
562 static int
563 do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
564 {
565 dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
566 const blkptr_t *bp = &data->bp;
567 const zbookmark_phys_t *zb = &data->zb;
568 uint8_t indblkshift = data->indblkshift;
569 uint16_t dblkszsec = data->datablkszsec;
570 spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
571 dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
572 int err = 0;
573 dnode_phys_t *blk;
574 uint64_t dnobj;
575
576 ASSERT3U(zb->zb_level, >=, 0);
577
578 if (zb->zb_object != DMU_META_DNODE_OBJECT &&
579 DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
580 return (0);
581 } else if (BP_IS_HOLE(bp) &&
582 zb->zb_object == DMU_META_DNODE_OBJECT) {
583 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
584 uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
585 err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
586 } else if (BP_IS_HOLE(bp)) {
587 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
588 uint64_t offset = zb->zb_blkid * span;
589 err = dump_free(dsa, zb->zb_object, offset, span);
590 } else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
591 return (0);
592 } else if (type == DMU_OT_DNODE) {
593 int blksz = BP_GET_LSIZE(bp);
594 arc_flags_t aflags = ARC_FLAG_WAIT;
595 arc_buf_t *abuf;
596 int i;
597
598 ASSERT0(zb->zb_level);
599
600 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
601 ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
602 &aflags, zb) != 0)
603 return (SET_ERROR(EIO));
604
605 blk = abuf->b_data;
606 dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
607 for (i = 0; i < blksz >> DNODE_SHIFT; i++) {
608 err = dump_dnode(dsa, dnobj + i, blk + i);
609 if (err != 0)
610 break;
611 }
612 (void) arc_buf_remove_ref(abuf, &abuf);
613 } else if (type == DMU_OT_SA) {
614 arc_flags_t aflags = ARC_FLAG_WAIT;
615 arc_buf_t *abuf;
616 int blksz = BP_GET_LSIZE(bp);
617
618 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
619 ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
620 &aflags, zb) != 0)
621 return (SET_ERROR(EIO));
622
623 err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
624 (void) arc_buf_remove_ref(abuf, &abuf);
625 } else if (backup_do_embed(dsa, bp)) {
626 /* it's an embedded level-0 block of a regular object */
627 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
628 ASSERT0(zb->zb_level);
629 err = dump_write_embedded(dsa, zb->zb_object,
630 zb->zb_blkid * blksz, blksz, bp);
631 } else {
632 /* it's a level-0 block of a regular object */
633 arc_flags_t aflags = ARC_FLAG_WAIT;
634 arc_buf_t *abuf;
635 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
636 uint64_t offset;
637
638 ASSERT0(zb->zb_level);
639 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
640 ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
641 &aflags, zb) != 0) {
642 if (zfs_send_corrupt_data) {
643 uint64_t *ptr;
644 /* Send a block filled with 0x"zfs badd bloc" */
645 abuf = arc_buf_alloc(spa, blksz, &abuf,
646 ARC_BUFC_DATA);
647 for (ptr = abuf->b_data;
648 (char *)ptr < (char *)abuf->b_data + blksz;
649 ptr++)
650 *ptr = 0x2f5baddb10cULL;
651 } else {
652 return (SET_ERROR(EIO));
653 }
654 }
655
656 offset = zb->zb_blkid * blksz;
657
658 if (!(dsa->dsa_featureflags &
659 DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
660 blksz > SPA_OLD_MAXBLOCKSIZE) {
661 char *buf = abuf->b_data;
662 while (blksz > 0 && err == 0) {
663 int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
664 err = dump_write(dsa, type, zb->zb_object,
665 offset, n, NULL, buf);
666 offset += n;
667 buf += n;
668 blksz -= n;
669 }
670 } else {
671 err = dump_write(dsa, type, zb->zb_object,
672 offset, blksz, bp, abuf->b_data);
673 }
674 (void) arc_buf_remove_ref(abuf, &abuf);
675 }
676
677 ASSERT(err == 0 || err == EINTR);
678 return (err);
679 }
680
681 /*
682 * Pop the new data off the queue, and free the old data.
683 */
684 static struct send_block_record *
685 get_next_record(bqueue_t *bq, struct send_block_record *data)
686 {
687 struct send_block_record *tmp = bqueue_dequeue(bq);
688 kmem_free(data, sizeof (*data));
689 return (tmp);
690 }
691
692 /*
693 * Actually do the bulk of the work in a zfs send.
694 *
695 * Note: Releases dp using the specified tag.
696 */
697 static int
698 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
699 zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone, boolean_t embedok,
700 boolean_t large_block_ok, int outfd, vnode_t *vp, offset_t *off)
701 {
702 objset_t *os;
703 dmu_replay_record_t *drr;
704 dmu_sendarg_t *dsp;
705 int err;
706 uint64_t fromtxg = 0;
707 uint64_t featureflags = 0;
708 struct send_thread_arg to_arg;
709 struct send_block_record *to_data;
710
711 err = dmu_objset_from_ds(to_ds, &os);
712 if (err != 0) {
713 dsl_pool_rele(dp, tag);
714 return (err);
715 }
716
717 drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
718 drr->drr_type = DRR_BEGIN;
719 drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
720 DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
721 DMU_SUBSTREAM);
722
723 #ifdef _KERNEL
724 if (dmu_objset_type(os) == DMU_OST_ZFS) {
725 uint64_t version;
726 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
727 kmem_free(drr, sizeof (dmu_replay_record_t));
728 dsl_pool_rele(dp, tag);
729 return (SET_ERROR(EINVAL));
730 }
731 if (version >= ZPL_VERSION_SA) {
732 featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
733 }
734 }
735 #endif
736
737 if (large_block_ok && to_ds->ds_feature_inuse[SPA_FEATURE_LARGE_BLOCKS])
738 featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
739 if (embedok &&
740 spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
741 featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
742 if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
743 featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA_LZ4;
744 }
745
746 DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
747 featureflags);
748
749 drr->drr_u.drr_begin.drr_creation_time =
750 dsl_dataset_phys(to_ds)->ds_creation_time;
751 drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
752 if (is_clone)
753 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
754 drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
755 if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
756 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
757
758 if (ancestor_zb != NULL) {
759 drr->drr_u.drr_begin.drr_fromguid =
760 ancestor_zb->zbm_guid;
761 fromtxg = ancestor_zb->zbm_creation_txg;
762 }
763 dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
764 if (!to_ds->ds_is_snapshot) {
765 (void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
766 sizeof (drr->drr_u.drr_begin.drr_toname));
767 }
768
769 dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
770
771 dsp->dsa_drr = drr;
772 dsp->dsa_vp = vp;
773 dsp->dsa_outfd = outfd;
774 dsp->dsa_proc = curproc;
775 dsp->dsa_os = os;
776 dsp->dsa_off = off;
777 dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
778 dsp->dsa_pending_op = PENDING_NONE;
779 dsp->dsa_incremental = (ancestor_zb != NULL);
780 dsp->dsa_featureflags = featureflags;
781
782 mutex_enter(&to_ds->ds_sendstream_lock);
783 list_insert_head(&to_ds->ds_sendstreams, dsp);
784 mutex_exit(&to_ds->ds_sendstream_lock);
785
786 dsl_dataset_long_hold(to_ds, FTAG);
787 dsl_pool_rele(dp, tag);
788
789 if (dump_record(dsp, NULL, 0) != 0) {
790 err = dsp->dsa_err;
791 goto out;
792 }
793
794 err = bqueue_init(&to_arg.q, zfs_send_queue_length,
795 offsetof(struct send_block_record, ln));
796 to_arg.error_code = 0;
797 to_arg.cancel = B_FALSE;
798 to_arg.ds = to_ds;
799 to_arg.fromtxg = fromtxg;
800 to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
801 (void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
802 TS_RUN, minclsyspri);
803
804 to_data = bqueue_dequeue(&to_arg.q);
805
806 while (!to_data->eos_marker && err == 0) {
807 err = do_dump(dsp, to_data);
808 to_data = get_next_record(&to_arg.q, to_data);
809 if (issig(JUSTLOOKING) && issig(FORREAL))
810 err = EINTR;
811 }
812
813 if (err != 0) {
814 to_arg.cancel = B_TRUE;
815 while (!to_data->eos_marker) {
816 to_data = get_next_record(&to_arg.q, to_data);
817 }
818 }
819 kmem_free(to_data, sizeof (*to_data));
820
821 bqueue_destroy(&to_arg.q);
822
823 if (err == 0 && to_arg.error_code != 0)
824 err = to_arg.error_code;
825
826 if (err != 0)
827 goto out;
828
829 if (dsp->dsa_pending_op != PENDING_NONE)
830 if (dump_record(dsp, NULL, 0) != 0)
831 err = SET_ERROR(EINTR);
832
833 if (err != 0) {
834 if (err == EINTR && dsp->dsa_err != 0)
835 err = dsp->dsa_err;
836 goto out;
837 }
838
839 bzero(drr, sizeof (dmu_replay_record_t));
840 drr->drr_type = DRR_END;
841 drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
842 drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
843
844 if (dump_record(dsp, NULL, 0) != 0)
845 err = dsp->dsa_err;
846
847 out:
848 mutex_enter(&to_ds->ds_sendstream_lock);
849 list_remove(&to_ds->ds_sendstreams, dsp);
850 mutex_exit(&to_ds->ds_sendstream_lock);
851
852 kmem_free(drr, sizeof (dmu_replay_record_t));
853 kmem_free(dsp, sizeof (dmu_sendarg_t));
854
855 dsl_dataset_long_rele(to_ds, FTAG);
856
857 return (err);
858 }
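/*
 * The stream produced above is therefore laid out as:
 *
 *     DRR_BEGIN   (magic, feature flags, creation time, to/from GUIDs)
 *     DRR_OBJECT / DRR_FREEOBJECTS / DRR_WRITE / DRR_WRITE_EMBEDDED /
 *         DRR_SPILL / DRR_FREE records, in increasing (object, offset) order
 *     DRR_END     (cumulative fletcher-4 checksum and the to-GUID)
 */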
859
860 int
861 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
862 boolean_t embedok, boolean_t large_block_ok,
863 int outfd, vnode_t *vp, offset_t *off)
864 {
865 dsl_pool_t *dp;
866 dsl_dataset_t *ds;
867 dsl_dataset_t *fromds = NULL;
868 int err;
869
870 err = dsl_pool_hold(pool, FTAG, &dp);
871 if (err != 0)
872 return (err);
873
874 err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
875 if (err != 0) {
876 dsl_pool_rele(dp, FTAG);
877 return (err);
878 }
879
880 if (fromsnap != 0) {
881 zfs_bookmark_phys_t zb;
882 boolean_t is_clone;
883
884 err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
885 if (err != 0) {
886 dsl_dataset_rele(ds, FTAG);
887 dsl_pool_rele(dp, FTAG);
888 return (err);
889 }
890 if (!dsl_dataset_is_before(ds, fromds, 0))
891 err = SET_ERROR(EXDEV);
892 zb.zbm_creation_time =
893 dsl_dataset_phys(fromds)->ds_creation_time;
894 zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
895 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
896 is_clone = (fromds->ds_dir != ds->ds_dir);
897 dsl_dataset_rele(fromds, FTAG);
898 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
899 embedok, large_block_ok, outfd, vp, off);
900 } else {
901 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
902 embedok, large_block_ok, outfd, vp, off);
903 }
904 dsl_dataset_rele(ds, FTAG);
905 return (err);
906 }
907
908 int
909 dmu_send(const char *tosnap, const char *fromsnap,
910 boolean_t embedok, boolean_t large_block_ok,
911 int outfd, vnode_t *vp, offset_t *off)
912 {
913 dsl_pool_t *dp;
914 dsl_dataset_t *ds;
915 int err;
916 boolean_t owned = B_FALSE;
917
918 if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
919 return (SET_ERROR(EINVAL));
920
921 err = dsl_pool_hold(tosnap, FTAG, &dp);
922 if (err != 0)
923 return (err);
924
925 if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
926 /*
927 * We are sending a filesystem or volume. Ensure
928 * that it doesn't change by owning the dataset.
929 */
930 err = dsl_dataset_own(dp, tosnap, FTAG, &ds);
931 owned = B_TRUE;
932 } else {
933 err = dsl_dataset_hold(dp, tosnap, FTAG, &ds);
934 }
935 if (err != 0) {
936 dsl_pool_rele(dp, FTAG);
937 return (err);
938 }
939
940 if (fromsnap != NULL) {
941 zfs_bookmark_phys_t zb;
942 boolean_t is_clone = B_FALSE;
943 int fsnamelen = strchr(tosnap, '@') - tosnap;
944
945 /*
946 * If the fromsnap is in a different filesystem, then
947 * mark the send stream as a clone.
948 */
949 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
950 (fromsnap[fsnamelen] != '@' &&
951 fromsnap[fsnamelen] != '#')) {
952 is_clone = B_TRUE;
953 }
954
955 if (strchr(fromsnap, '@')) {
956 dsl_dataset_t *fromds;
957 err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
958 if (err == 0) {
959 if (!dsl_dataset_is_before(ds, fromds, 0))
960 err = SET_ERROR(EXDEV);
961 zb.zbm_creation_time =
962 dsl_dataset_phys(fromds)->ds_creation_time;
963 zb.zbm_creation_txg =
964 dsl_dataset_phys(fromds)->ds_creation_txg;
965 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
966 is_clone = (ds->ds_dir != fromds->ds_dir);
967 dsl_dataset_rele(fromds, FTAG);
968 }
969 } else {
970 err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
971 }
972 if (err != 0) {
973 dsl_dataset_rele(ds, FTAG);
974 dsl_pool_rele(dp, FTAG);
975 return (err);
976 }
977 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
978 embedok, large_block_ok, outfd, vp, off);
979 } else {
980 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
981 embedok, large_block_ok, outfd, vp, off);
982 }
983 if (owned)
984 dsl_dataset_disown(ds, FTAG);
985 else
986 dsl_dataset_rele(ds, FTAG);
987 return (err);
988 }
989
990 static int
991 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t size,
992 uint64_t *sizep)
993 {
994 int err;
995 /*
996 * Assume that space (both on-disk and in-stream) is dominated by
997 * data. We will adjust for indirect blocks and the copies property,
998 * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
999 */
1000
1001 /*
1002 * Subtract out approximate space used by indirect blocks.
1003 * Assume most space is used by data blocks (non-indirect, non-dnode).
1004 * Assume all blocks are recordsize. Assume ditto blocks and
1005 * internal fragmentation cancel out compression.
1006 *
1007 * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
1008 * block, which we observe in practice.
1009 */
1010 uint64_t recordsize;
1011 err = dsl_prop_get_int_ds(ds, "recordsize", &recordsize);
1012 if (err != 0)
1013 return (err);
1014 size -= size / recordsize * sizeof (blkptr_t);
1015
1016 /* Add in the space for the record associated with each block. */
1017 size += size / recordsize * sizeof (dmu_replay_record_t);
1018
1019 *sizep = size;
1020
1021 return (0);
1022 }
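/*
 * A rough worked example, assuming a 128K recordsize and that
 * sizeof (dmu_replay_record_t) is a few hundred bytes: 1G of changed data
 * is about 8192 recordsize blocks, so roughly 8192 * sizeof (blkptr_t) = 1M
 * is subtracted for indirect blocks and a couple of MB is added back for
 * the per-block stream headers, leaving the estimate close to (but not
 * exactly) the uncompressed data size.
 */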
1023
1024 int
1025 dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds, uint64_t *sizep)
1026 {
1027 int err;
1028 uint64_t size;
1029
1030 ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
1031
1032 /* tosnap must be a snapshot */
1033 if (!ds->ds_is_snapshot)
1034 return (SET_ERROR(EINVAL));
1035
1036 /* fromsnap, if provided, must be a snapshot */
1037 if (fromds != NULL && !fromds->ds_is_snapshot)
1038 return (SET_ERROR(EINVAL));
1039
1040 /*
1041 * fromsnap must be an earlier snapshot from the same fs as tosnap,
1042 * or the origin's fs.
1043 */
1044 if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1045 return (SET_ERROR(EXDEV));
1046
1047 /* Get uncompressed size estimate of changed data. */
1048 if (fromds == NULL) {
1049 size = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1050 } else {
1051 uint64_t used, comp;
1052 err = dsl_dataset_space_written(fromds, ds,
1053 &used, &comp, &size);
1054 if (err != 0)
1055 return (err);
1056 }
1057
1058 err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1059 return (err);
1060 }
1061
1062 /*
1063 * Simple callback used to traverse the blocks of a snapshot and sum their
1064 * uncompressed size
1065 */
1066 /* ARGSUSED */
1067 static int
1068 dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1069 const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1070 {
1071 uint64_t *spaceptr = arg;
1072 if (bp != NULL && !BP_IS_HOLE(bp)) {
1073 *spaceptr += BP_GET_UCSIZE(bp);
1074 }
1075 return (0);
1076 }
1077
1078 /*
1079 * Given a destination snapshot and a TXG, calculate the approximate size of a
1080 * send stream sent from that TXG. from_txg may be zero, indicating that the
1081 * whole snapshot will be sent.
1082 */
1083 int
1084 dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1085 uint64_t *sizep)
1086 {
1087 int err;
1088 uint64_t size = 0;
1089
1090 ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
1091
1092 /* tosnap must be a snapshot */
1093 if (!dsl_dataset_is_snapshot(ds))
1094 return (SET_ERROR(EINVAL));
1095
1096 /* verify that from_txg is before the provided snapshot was taken */
1097 if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1098 return (SET_ERROR(EXDEV));
1099 }
1100 /*
1101 * traverse the blocks of the snapshot with birth times after
1102 * from_txg, summing their uncompressed size
1103 */
1104 err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1105 dmu_calculate_send_traversal, &size);
1106 if (err)
1107 return (err);
1108
1109 err = dmu_adjust_send_estimate_for_indirects(ds, size, sizep);
1110 return (err);
1111 }
1112
1113 typedef struct dmu_recv_begin_arg {
1114 const char *drba_origin;
1115 dmu_recv_cookie_t *drba_cookie;
1116 cred_t *drba_cred;
1117 uint64_t drba_snapobj;
1118 } dmu_recv_begin_arg_t;
1119
1120 static int
1121 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
1122 uint64_t fromguid)
1123 {
1124 uint64_t val;
1125 int error;
1126 dsl_pool_t *dp = ds->ds_dir->dd_pool;
1127
1128 /* temporary clone name must not exist */
1129 error = zap_lookup(dp->dp_meta_objset,
1130 dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
1131 8, 1, &val);
1132 if (error != ENOENT)
1133 return (error == 0 ? EBUSY : error);
1134
1135 /* new snapshot name must not exist */
1136 error = zap_lookup(dp->dp_meta_objset,
1137 dsl_dataset_phys(ds)->ds_snapnames_zapobj,
1138 drba->drba_cookie->drc_tosnap, 8, 1, &val);
1139 if (error != ENOENT)
1140 return (error == 0 ? EEXIST : error);
1141
1142 /*
1143 * Check snapshot limit before receiving. We'll recheck at the
1144 * end, but might as well abort before receiving if we're already over
1145 * the limit.
1146 *
1147 * Note that we do not check the file system limit with
1148 * dsl_dir_fscount_check because the temporary %clones don't count
1149 * against that limit.
1150 */
1151 error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
1152 NULL, drba->drba_cred);
1153 if (error != 0)
1154 return (error);
1155
1156 if (fromguid != 0) {
1157 dsl_dataset_t *snap;
1158 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
1159
1160 /* Find snapshot in this dir that matches fromguid. */
1161 while (obj != 0) {
1162 error = dsl_dataset_hold_obj(dp, obj, FTAG,
1163 &snap);
1164 if (error != 0)
1165 return (SET_ERROR(ENODEV));
1166 if (snap->ds_dir != ds->ds_dir) {
1167 dsl_dataset_rele(snap, FTAG);
1168 return (SET_ERROR(ENODEV));
1169 }
1170 if (dsl_dataset_phys(snap)->ds_guid == fromguid)
1171 break;
1172 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
1173 dsl_dataset_rele(snap, FTAG);
1174 }
1175 if (obj == 0)
1176 return (SET_ERROR(ENODEV));
1177
1178 if (drba->drba_cookie->drc_force) {
1179 drba->drba_snapobj = obj;
1180 } else {
1181 /*
1182 * If we are not forcing, there must be no
1183 * changes since fromsnap.
1184 */
1185 if (dsl_dataset_modified_since_snap(ds, snap)) {
1186 dsl_dataset_rele(snap, FTAG);
1187 return (SET_ERROR(ETXTBSY));
1188 }
1189 drba->drba_snapobj = ds->ds_prev->ds_object;
1190 }
1191
1192 dsl_dataset_rele(snap, FTAG);
1193 } else {
1194 /* if full, then must be forced */
1195 if (!drba->drba_cookie->drc_force)
1196 return (SET_ERROR(EEXIST));
1197 /* start from $ORIGIN@$ORIGIN, if supported */
1198 drba->drba_snapobj = dp->dp_origin_snap != NULL ?
1199 dp->dp_origin_snap->ds_object : 0;
1200 }
1201
1202 return (0);
1203
1204 }
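/*
 * Put differently: an incremental receive into an existing filesystem needs
 * a snapshot in that filesystem whose GUID matches the stream's fromguid.
 * With drc_force (zfs recv -F) the receive is based on that snapshot, which
 * effectively discards any later changes; without it, any modification made
 * since that snapshot fails the receive with ETXTBSY.  A full stream into
 * an existing filesystem is only accepted with the force flag.
 */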
1205
1206 static int
1207 dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
1208 {
1209 dmu_recv_begin_arg_t *drba = arg;
1210 dsl_pool_t *dp = dmu_tx_pool(tx);
1211 struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1212 uint64_t fromguid = drrb->drr_fromguid;
1213 int flags = drrb->drr_flags;
1214 int error;
1215 uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
1216 dsl_dataset_t *ds;
1217 const char *tofs = drba->drba_cookie->drc_tofs;
1218
1219 /* already checked */
1220 ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
1221
1222 if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
1223 DMU_COMPOUNDSTREAM ||
1224 drrb->drr_type >= DMU_OST_NUMTYPES ||
1225 ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
1226 return (SET_ERROR(EINVAL));
1227
1228 /* Verify pool version supports SA if SA_SPILL feature set */
1229 if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
1230 spa_version(dp->dp_spa) < SPA_VERSION_SA)
1231 return (SET_ERROR(ENOTSUP));
1232
1233 /*
1234 * The receiving code doesn't know how to translate a WRITE_EMBEDDED
1235 * record to a plain WRITE record, so the pool must have the
1236 * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
1237 * records. Same with WRITE_EMBEDDED records that use LZ4 compression.
1238 */
1239 if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1240 !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1241 return (SET_ERROR(ENOTSUP));
1242 if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA_LZ4) &&
1243 !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1244 return (SET_ERROR(ENOTSUP));
1245
1246 /*
1247 * The receiving code doesn't know how to translate large blocks
1248 * to smaller ones, so the pool must have the LARGE_BLOCKS
1249 * feature enabled if the stream has LARGE_BLOCKS.
1250 */
1251 if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
1252 !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
1253 return (SET_ERROR(ENOTSUP));
1254
1255 error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1256 if (error == 0) {
1257 /* target fs already exists; recv into temp clone */
1258
1259 /* Can't recv a clone into an existing fs */
1260 if (flags & DRR_FLAG_CLONE) {
1261 dsl_dataset_rele(ds, FTAG);
1262 return (SET_ERROR(EINVAL));
1263 }
1264
1265 error = recv_begin_check_existing_impl(drba, ds, fromguid);
1266 dsl_dataset_rele(ds, FTAG);
1267 } else if (error == ENOENT) {
1268 /* target fs does not exist; must be a full backup or clone */
1269 char buf[MAXNAMELEN];
1270
1271 /*
1272 * If it's a non-clone incremental, we are missing the
1273 * target fs, so fail the recv.
1274 */
1275 if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
1276 drba->drba_origin))
1277 return (SET_ERROR(ENOENT));
1278
1279 /* Open the parent of tofs */
1280 ASSERT3U(strlen(tofs), <, MAXNAMELEN);
1281 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
1282 error = dsl_dataset_hold(dp, buf, FTAG, &ds);
1283 if (error != 0)
1284 return (error);
1285
1286 /*
1287 * Check filesystem and snapshot limits before receiving. We'll
1288 * recheck snapshot limits again at the end (we create the
1289 * filesystems and increment those counts during begin_sync).
1290 */
1291 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1292 ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
1293 if (error != 0) {
1294 dsl_dataset_rele(ds, FTAG);
1295 return (error);
1296 }
1297
1298 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1299 ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
1300 if (error != 0) {
1301 dsl_dataset_rele(ds, FTAG);
1302 return (error);
1303 }
1304
1305 if (drba->drba_origin != NULL) {
1306 dsl_dataset_t *origin;
1307 error = dsl_dataset_hold(dp, drba->drba_origin,
1308 FTAG, &origin);
1309 if (error != 0) {
1310 dsl_dataset_rele(ds, FTAG);
1311 return (error);
1312 }
1313 if (!origin->ds_is_snapshot) {
1314 dsl_dataset_rele(origin, FTAG);
1315 dsl_dataset_rele(ds, FTAG);
1316 return (SET_ERROR(EINVAL));
1317 }
1318 if (dsl_dataset_phys(origin)->ds_guid != fromguid) {
1319 dsl_dataset_rele(origin, FTAG);
1320 dsl_dataset_rele(ds, FTAG);
1321 return (SET_ERROR(ENODEV));
1322 }
1323 dsl_dataset_rele(origin, FTAG);
1324 }
1325 dsl_dataset_rele(ds, FTAG);
1326 error = 0;
1327 }
1328 return (error);
1329 }
1330
1331 static void
1332 dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
1333 {
1334 dmu_recv_begin_arg_t *drba = arg;
1335 dsl_pool_t *dp = dmu_tx_pool(tx);
1336 struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
1337 const char *tofs = drba->drba_cookie->drc_tofs;
1338 dsl_dataset_t *ds, *newds;
1339 uint64_t dsobj;
1340 int error;
1341 uint64_t crflags;
1342
1343 crflags = (drrb->drr_flags & DRR_FLAG_CI_DATA) ?
1344 DS_FLAG_CI_DATASET : 0;
1345
1346 error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1347 if (error == 0) {
1348 /* create temporary clone */
1349 dsl_dataset_t *snap = NULL;
1350 if (drba->drba_snapobj != 0) {
1351 VERIFY0(dsl_dataset_hold_obj(dp,
1352 drba->drba_snapobj, FTAG, &snap));
1353 }
1354 dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
1355 snap, crflags, drba->drba_cred, tx);
1356 if (drba->drba_snapobj != 0)
1357 dsl_dataset_rele(snap, FTAG);
1358 dsl_dataset_rele(ds, FTAG);
1359 } else {
1360 dsl_dir_t *dd;
1361 const char *tail;
1362 dsl_dataset_t *origin = NULL;
1363
1364 VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
1365
1366 if (drba->drba_origin != NULL) {
1367 VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
1368 FTAG, &origin));
1369 }
1370
1371 /* Create new dataset. */
1372 dsobj = dsl_dataset_create_sync(dd,
1373 strrchr(tofs, '/') + 1,
1374 origin, crflags, drba->drba_cred, tx);
1375 if (origin != NULL)
1376 dsl_dataset_rele(origin, FTAG);
1377 dsl_dir_rele(dd, FTAG);
1378 drba->drba_cookie->drc_newfs = B_TRUE;
1379 }
1380 VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &newds));
1381
1382 dmu_buf_will_dirty(newds->ds_dbuf, tx);
1383 dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
1384
1385 /*
1386 * If we actually created a non-clone, we need to create the
1387 * objset in our new dataset.
1388 */
1389 if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds))) {
1390 (void) dmu_objset_create_impl(dp->dp_spa,
1391 newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
1392 }
1393
1394 drba->drba_cookie->drc_ds = newds;
1395
1396 spa_history_log_internal_ds(newds, "receive", tx, "");
1397 }
1398
1399 /*
1400 * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1401 * succeeds; otherwise we will leak the holds on the datasets.
1402 */
1403 int
1404 dmu_recv_begin(char *tofs, char *tosnap, struct drr_begin *drrb,
1405 boolean_t force, char *origin, dmu_recv_cookie_t *drc)
1406 {
1407 dmu_recv_begin_arg_t drba = { 0 };
1408 dmu_replay_record_t *drr;
1409
1410 bzero(drc, sizeof (dmu_recv_cookie_t));
1411 drc->drc_drrb = drrb;
1412 drc->drc_tosnap = tosnap;
1413 drc->drc_tofs = tofs;
1414 drc->drc_force = force;
1415 drc->drc_cred = CRED();
1416
1417 if (drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC))
1418 drc->drc_byteswap = B_TRUE;
1419 else if (drrb->drr_magic != DMU_BACKUP_MAGIC)
1420 return (SET_ERROR(EINVAL));
1421
1422 drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
1423 drr->drr_type = DRR_BEGIN;
1424 drr->drr_u.drr_begin = *drc->drc_drrb;
1425 if (drc->drc_byteswap) {
1426 fletcher_4_incremental_byteswap(drr,
1427 sizeof (dmu_replay_record_t), &drc->drc_cksum);
1428 } else {
1429 fletcher_4_incremental_native(drr,
1430 sizeof (dmu_replay_record_t), &drc->drc_cksum);
1431 }
1432 kmem_free(drr, sizeof (dmu_replay_record_t));
1433
1434 if (drc->drc_byteswap) {
1435 drrb->drr_magic = BSWAP_64(drrb->drr_magic);
1436 drrb->drr_versioninfo = BSWAP_64(drrb->drr_versioninfo);
1437 drrb->drr_creation_time = BSWAP_64(drrb->drr_creation_time);
1438 drrb->drr_type = BSWAP_32(drrb->drr_type);
1439 drrb->drr_toguid = BSWAP_64(drrb->drr_toguid);
1440 drrb->drr_fromguid = BSWAP_64(drrb->drr_fromguid);
1441 }
1442
1443 drba.drba_origin = origin;
1444 drba.drba_cookie = drc;
1445 drba.drba_cred = CRED();
1446
1447 return (dsl_sync_task(tofs, dmu_recv_begin_check, dmu_recv_begin_sync,
1448 &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1449 }
1450
1451 struct receive_record_arg {
1452 dmu_replay_record_t header;
1453 void *payload; /* Pointer to a buffer containing the payload */
1454 /*
1455 * If the record is a write, pointer to the arc_buf_t containing the
1456 * payload.
1457 */
1458 arc_buf_t *write_buf;
1459 int payload_size;
1460 boolean_t eos_marker; /* Marks the end of the stream */
1461 bqueue_node_t node;
1462 };
1463
1464 struct receive_writer_arg {
1465 objset_t *os;
1466 boolean_t byteswap;
1467 bqueue_t q;
1468 /*
1469 * These three args are used to signal to the main thread that we're
1470 * done.
1471 */
1472 kmutex_t mutex;
1473 kcondvar_t cv;
1474 boolean_t done;
1475 int err;
1476 /* A map from guid to dataset to help handle dedup'd streams. */
1477 avl_tree_t *guid_to_ds_map;
1478 };
1479
1480 struct receive_arg {
1481 objset_t *os;
1482 vnode_t *vp; /* The vnode to read the stream from */
1483 uint64_t voff; /* The current offset in the stream */
1484 /*
1485 * A record that has had its payload read in, but hasn't yet been handed
1486 * off to the worker thread.
1487 */
1488 struct receive_record_arg *rrd;
1489 /* A record that has had its header read in, but not its payload. */
1490 struct receive_record_arg *next_rrd;
1491 zio_cksum_t cksum;
1492 zio_cksum_t prev_cksum;
1493 int err;
1494 boolean_t byteswap;
1495 /* Sorted list of objects not to issue prefetches for. */
1496 list_t ignore_obj_list;
1497 };
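/*
 * These structures describe a reader/writer split of the receive path: the
 * original thread fills in a receive_arg, reads record headers and payloads
 * from the stream vnode into receive_record_arg entries, and queues them on
 * receive_writer_arg.q; a separate writer thread dequeues them and applies
 * each record to the objset, using mutex/cv/done to signal when it has
 * drained the queue.  guid_to_ds_map is only needed for dedup'd streams, to
 * resolve DRR_WRITE_BYREF references back to previously received datasets.
 */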
1498
1499 struct receive_ign_obj_node {
1500 list_node_t node;
1501 uint64_t object;
1502 };
1503
1504 typedef struct guid_map_entry {
1505 uint64_t guid;
1506 dsl_dataset_t *gme_ds;
1507 avl_node_t avlnode;
1508 } guid_map_entry_t;
1509
1510 static int
1511 guid_compare(const void *arg1, const void *arg2)
1512 {
1513 const guid_map_entry_t *gmep1 = arg1;
1514 const guid_map_entry_t *gmep2 = arg2;
1515
1516 if (gmep1->guid < gmep2->guid)
1517 return (-1);
1518 else if (gmep1->guid > gmep2->guid)
1519 return (1);
1520 return (0);
1521 }
1522
1523 static void
1524 free_guid_map_onexit(void *arg)
1525 {
1526 avl_tree_t *ca = arg;
1527 void *cookie = NULL;
1528 guid_map_entry_t *gmep;
1529
1530 while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
1531 dsl_dataset_long_rele(gmep->gme_ds, gmep);
1532 dsl_dataset_rele(gmep->gme_ds, gmep);
1533 kmem_free(gmep, sizeof (guid_map_entry_t));
1534 }
1535 avl_destroy(ca);
1536 kmem_free(ca, sizeof (avl_tree_t));
1537 }
1538
1539 static int
1540 receive_read(struct receive_arg *ra, int len, void *buf)
1541 {
1542 int done = 0;
1543
1544 /* some things will require 8-byte alignment, so everything must */
1545 ASSERT0(len % 8);
1546
1547 while (done < len) {
1548 ssize_t resid;
1549
1550 ra->err = vn_rdwr(UIO_READ, ra->vp,
1551 (char *)buf + done, len - done,
1552 ra->voff, UIO_SYSSPACE, FAPPEND,
1553 RLIM64_INFINITY, CRED(), &resid);
1554
1555 if (resid == len - done)
1556 ra->err = SET_ERROR(EINVAL);
1557 ra->voff += len - done - resid;
1558 done = len - resid;
1559 if (ra->err != 0)
1560 return (ra->err);
1561 }
1562
1563 ASSERT3U(done, ==, len);
1564 return (0);
1565 }
1566
1567 noinline static void
1568 byteswap_record(dmu_replay_record_t *drr)
1569 {
1570 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
1571 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
1572 drr->drr_type = BSWAP_32(drr->drr_type);
1573 drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
1574
1575 switch (drr->drr_type) {
1576 case DRR_BEGIN:
1577 DO64(drr_begin.drr_magic);
1578 DO64(drr_begin.drr_versioninfo);
1579 DO64(drr_begin.drr_creation_time);
1580 DO32(drr_begin.drr_type);
1581 DO32(drr_begin.drr_flags);
1582 DO64(drr_begin.drr_toguid);
1583 DO64(drr_begin.drr_fromguid);
1584 break;
1585 case DRR_OBJECT:
1586 DO64(drr_object.drr_object);
1587 DO32(drr_object.drr_type);
1588 DO32(drr_object.drr_bonustype);
1589 DO32(drr_object.drr_blksz);
1590 DO32(drr_object.drr_bonuslen);
1591 DO64(drr_object.drr_toguid);
1592 break;
1593 case DRR_FREEOBJECTS:
1594 DO64(drr_freeobjects.drr_firstobj);
1595 DO64(drr_freeobjects.drr_numobjs);
1596 DO64(drr_freeobjects.drr_toguid);
1597 break;
1598 case DRR_WRITE:
1599 DO64(drr_write.drr_object);
1600 DO32(drr_write.drr_type);
1601 DO64(drr_write.drr_offset);
1602 DO64(drr_write.drr_length);
1603 DO64(drr_write.drr_toguid);
1604 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
1605 DO64(drr_write.drr_key.ddk_prop);
1606 break;
1607 case DRR_WRITE_BYREF:
1608 DO64(drr_write_byref.drr_object);
1609 DO64(drr_write_byref.drr_offset);
1610 DO64(drr_write_byref.drr_length);
1611 DO64(drr_write_byref.drr_toguid);
1612 DO64(drr_write_byref.drr_refguid);
1613 DO64(drr_write_byref.drr_refobject);
1614 DO64(drr_write_byref.drr_refoffset);
1615 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
1616 drr_key.ddk_cksum);
1617 DO64(drr_write_byref.drr_key.ddk_prop);
1618 break;
1619 case DRR_WRITE_EMBEDDED:
1620 DO64(drr_write_embedded.drr_object);
1621 DO64(drr_write_embedded.drr_offset);
1622 DO64(drr_write_embedded.drr_length);
1623 DO64(drr_write_embedded.drr_toguid);
1624 DO32(drr_write_embedded.drr_lsize);
1625 DO32(drr_write_embedded.drr_psize);
1626 break;
1627 case DRR_FREE:
1628 DO64(drr_free.drr_object);
1629 DO64(drr_free.drr_offset);
1630 DO64(drr_free.drr_length);
1631 DO64(drr_free.drr_toguid);
1632 break;
1633 case DRR_SPILL:
1634 DO64(drr_spill.drr_object);
1635 DO64(drr_spill.drr_length);
1636 DO64(drr_spill.drr_toguid);
1637 break;
1638 case DRR_END:
1639 DO64(drr_end.drr_toguid);
1640 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
1641 break;
1642 default:
1643 break;
1644 }
1645
1646 if (drr->drr_type != DRR_BEGIN) {
1647 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
1648 }
1649
1650 #undef DO64
1651 #undef DO32
1652 }
1653
1654 static inline uint8_t
1655 deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
1656 {
1657 if (bonus_type == DMU_OT_SA) {
1658 return (1);
1659 } else {
1660 return (1 +
1661 ((DN_MAX_BONUSLEN - bonus_size) >> SPA_BLKPTRSHIFT));
1662 }
1663 }
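/*
 * Worked example, assuming the classic 512-byte dnode where DN_MAX_BONUSLEN
 * is 320 and SPA_BLKPTRSHIFT is 7 (128-byte block pointers): a zero-length
 * bonus leaves room for 1 + (320 >> 7) = 3 block pointers (the normal
 * maximum), a 192-byte bonus leaves 1 + (128 >> 7) = 2, and a DMU_OT_SA
 * bonus always implies a single block pointer, since the SA bonus area is
 * allowed to grow into the space the extra block pointers would otherwise
 * occupy.
 */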
1664
1665 noinline static int
1666 receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
1667 void *data)
1668 {
1669 dmu_object_info_t doi;
1670 dmu_tx_t *tx;
1671 uint64_t object;
1672 int err;
1673
1674 if (drro->drr_type == DMU_OT_NONE ||
1675 !DMU_OT_IS_VALID(drro->drr_type) ||
1676 !DMU_OT_IS_VALID(drro->drr_bonustype) ||
1677 drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
1678 drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
1679 P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
1680 drro->drr_blksz < SPA_MINBLOCKSIZE ||
1681 drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
1682 drro->drr_bonuslen > DN_MAX_BONUSLEN) {
1683 return (SET_ERROR(EINVAL));
1684 }
1685
1686 err = dmu_object_info(rwa->os, drro->drr_object, &doi);
1687
1688 if (err != 0 && err != ENOENT)
1689 return (SET_ERROR(EINVAL));
1690 object = err == 0 ? drro->drr_object : DMU_NEW_OBJECT;
1691
1692 /*
1693 * If we are losing blkptrs or changing the block size this must
1694 * be a new file instance. We must clear out the previous file
1695 * contents before we can change this type of metadata in the dnode.
1696 */
1697 if (err == 0) {
1698 int nblkptr;
1699
1700 nblkptr = deduce_nblkptr(drro->drr_bonustype,
1701 drro->drr_bonuslen);
1702
1703 if (drro->drr_blksz != doi.doi_data_block_size ||
1704 nblkptr < doi.doi_nblkptr) {
1705 err = dmu_free_long_range(rwa->os, drro->drr_object,
1706 0, DMU_OBJECT_END);
1707 if (err != 0)
1708 return (SET_ERROR(EINVAL));
1709 }
1710 }
1711
1712 tx = dmu_tx_create(rwa->os);
1713 dmu_tx_hold_bonus(tx, object);
1714 err = dmu_tx_assign(tx, TXG_WAIT);
1715 if (err != 0) {
1716 dmu_tx_abort(tx);
1717 return (err);
1718 }
1719
1720 if (object == DMU_NEW_OBJECT) {
1721 /* currently free, want to be allocated */
1722 err = dmu_object_claim(rwa->os, drro->drr_object,
1723 drro->drr_type, drro->drr_blksz,
1724 drro->drr_bonustype, drro->drr_bonuslen, tx);
1725 } else if (drro->drr_type != doi.doi_type ||
1726 drro->drr_blksz != doi.doi_data_block_size ||
1727 drro->drr_bonustype != doi.doi_bonus_type ||
1728 drro->drr_bonuslen != doi.doi_bonus_size) {
1729 /* currently allocated, but with different properties */
1730 err = dmu_object_reclaim(rwa->os, drro->drr_object,
1731 drro->drr_type, drro->drr_blksz,
1732 drro->drr_bonustype, drro->drr_bonuslen, tx);
1733 }
1734 if (err != 0) {
1735 dmu_tx_commit(tx);
1736 return (SET_ERROR(EINVAL));
1737 }
1738
1739 dmu_object_set_checksum(rwa->os, drro->drr_object,
1740 drro->drr_checksumtype, tx);
1741 dmu_object_set_compress(rwa->os, drro->drr_object,
1742 drro->drr_compress, tx);
1743
1744 if (data != NULL) {
1745 dmu_buf_t *db;
1746
1747 VERIFY0(dmu_bonus_hold(rwa->os, drro->drr_object, FTAG, &db));
1748 dmu_buf_will_dirty(db, tx);
1749
1750 ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
1751 bcopy(data, db->db_data, drro->drr_bonuslen);
1752 if (rwa->byteswap) {
1753 dmu_object_byteswap_t byteswap =
1754 DMU_OT_BYTESWAP(drro->drr_bonustype);
1755 dmu_ot_byteswap[byteswap].ob_func(db->db_data,
1756 drro->drr_bonuslen);
1757 }
1758 dmu_buf_rele(db, FTAG);
1759 }
1760 dmu_tx_commit(tx);
1761 return (0);
1762 }
1763
1764 /* ARGSUSED */
1765 noinline static int
1766 receive_freeobjects(struct receive_writer_arg *rwa,
1767 struct drr_freeobjects *drrfo)
1768 {
1769 uint64_t obj;
1770
1771 if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1772 return (SET_ERROR(EINVAL));
1773
1774 for (obj = drrfo->drr_firstobj;
1775 obj < drrfo->drr_firstobj + drrfo->drr_numobjs;
1776 (void) dmu_object_next(rwa->os, &obj, FALSE, 0)) {
1777 int err;
1778
1779 if (dmu_object_info(rwa->os, obj, NULL) != 0)
1780 continue;
1781
1782 err = dmu_free_long_object(rwa->os, obj);
1783 if (err != 0)
1784 return (err);
1785 }
1786 return (0);
1787 }
1788
1789 noinline static int
1790 receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
1791 arc_buf_t *abuf)
1792 {
1793 dmu_tx_t *tx;
1794 dmu_buf_t *bonus;
1795 int err;
1796
1797 if (drrw->drr_offset + drrw->drr_length < drrw->drr_offset ||
1798 !DMU_OT_IS_VALID(drrw->drr_type))
1799 return (SET_ERROR(EINVAL));
1800
1801 if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
1802 return (SET_ERROR(EINVAL));
1803
1804 tx = dmu_tx_create(rwa->os);
1805
1806 dmu_tx_hold_write(tx, drrw->drr_object,
1807 drrw->drr_offset, drrw->drr_length);
1808 err = dmu_tx_assign(tx, TXG_WAIT);
1809 if (err != 0) {
1810 dmu_tx_abort(tx);
1811 return (err);
1812 }
1813 if (rwa->byteswap) {
1814 dmu_object_byteswap_t byteswap =
1815 DMU_OT_BYTESWAP(drrw->drr_type);
1816 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
1817 drrw->drr_length);
1818 }
1819
1820 if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
1821 return (SET_ERROR(EINVAL));
1822 dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
1823 dmu_tx_commit(tx);
1824 dmu_buf_rele(bonus, FTAG);
1825 return (0);
1826 }
1827
1828 /*
1829 * Handle a DRR_WRITE_BYREF record. This record is used in dedup'ed
1830 * streams to refer to a copy of the data that is already on the
1831 * system because it came in earlier in the stream. This function
1832 * finds the earlier copy of the data, and uses that copy instead of
1833 * data from the stream to fulfill this write.
1834 */
1835 static int
1836 receive_write_byref(struct receive_writer_arg *rwa,
1837 struct drr_write_byref *drrwbr)
1838 {
1839 dmu_tx_t *tx;
1840 int err;
1841 guid_map_entry_t gmesrch;
1842 guid_map_entry_t *gmep;
1843 avl_index_t where;
1844 objset_t *ref_os = NULL;
1845 dmu_buf_t *dbp;
1846
1847 if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
1848 return (SET_ERROR(EINVAL));
1849
1850 /*
1851 * If the GUID of the referenced dataset is different from the
1852 * GUID of the target dataset, find the referenced dataset.
1853 */
1854 if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
1855 gmesrch.guid = drrwbr->drr_refguid;
1856 if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
1857 &where)) == NULL) {
1858 return (SET_ERROR(EINVAL));
1859 }
1860 if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
1861 return (SET_ERROR(EINVAL));
1862 } else {
1863 ref_os = rwa->os;
1864 }
1865
1866 err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
1867 drrwbr->drr_refoffset, FTAG, &dbp, DMU_READ_PREFETCH);
1868 if (err != 0)
1869 return (err);
1870
1871 tx = dmu_tx_create(rwa->os);
1872
1873 dmu_tx_hold_write(tx, drrwbr->drr_object,
1874 drrwbr->drr_offset, drrwbr->drr_length);
1875 err = dmu_tx_assign(tx, TXG_WAIT);
1876 if (err != 0) {
1877 dmu_tx_abort(tx);
1878 return (err);
1879 }
1880 dmu_write(rwa->os, drrwbr->drr_object,
1881 drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
1882 dmu_buf_rele(dbp, FTAG);
1883 dmu_tx_commit(tx);
1884 return (0);
1885 }
1886
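/*
 * Handle a DRR_WRITE_EMBEDDED record: the data is small enough to be
 * stored directly in a block pointer, so validate its size, type, and
 * compression and write it with dmu_write_embedded().
 */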
1887 static int
1888 receive_write_embedded(struct receive_writer_arg *rwa,
1889 struct drr_write_embedded *drrwnp, void *data)
1890 {
1891 dmu_tx_t *tx;
1892 int err;
1893
1894 if (drrwnp->drr_offset + drrwnp->drr_length < drrwnp->drr_offset)
1895 return (SET_ERROR(EINVAL));
1896
1897 if (drrwnp->drr_psize > BPE_PAYLOAD_SIZE)
1898 return (SET_ERROR(EINVAL));
1899
1900 if (drrwnp->drr_etype >= NUM_BP_EMBEDDED_TYPES)
1901 return (SET_ERROR(EINVAL));
1902 if (drrwnp->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
1903 return (SET_ERROR(EINVAL));
1904
1905 tx = dmu_tx_create(rwa->os);
1906
1907 dmu_tx_hold_write(tx, drrwnp->drr_object,
1908 drrwnp->drr_offset, drrwnp->drr_length);
1909 err = dmu_tx_assign(tx, TXG_WAIT);
1910 if (err != 0) {
1911 dmu_tx_abort(tx);
1912 return (err);
1913 }
1914
1915 dmu_write_embedded(rwa->os, drrwnp->drr_object,
1916 drrwnp->drr_offset, data, drrwnp->drr_etype,
1917 drrwnp->drr_compression, drrwnp->drr_lsize, drrwnp->drr_psize,
1918 rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
1919
1920 dmu_tx_commit(tx);
1921 return (0);
1922 }
1923
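/*
 * Handle a DRR_SPILL record: copy the payload into the object's spill
 * block, growing the spill block first if it is smaller than the record.
 */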
1924 static int
1925 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
1926 void *data)
1927 {
1928 dmu_tx_t *tx;
1929 dmu_buf_t *db, *db_spill;
1930 int err;
1931
1932 if (drrs->drr_length < SPA_MINBLOCKSIZE ||
1933 drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
1934 return (SET_ERROR(EINVAL));
1935
1936 if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
1937 return (SET_ERROR(EINVAL));
1938
1939 VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
1940 if ((err = dmu_spill_hold_by_bonus(db, FTAG, &db_spill)) != 0) {
1941 dmu_buf_rele(db, FTAG);
1942 return (err);
1943 }
1944
1945 tx = dmu_tx_create(rwa->os);
1946
1947 dmu_tx_hold_spill(tx, db->db_object);
1948
1949 err = dmu_tx_assign(tx, TXG_WAIT);
1950 if (err != 0) {
1951 dmu_buf_rele(db, FTAG);
1952 dmu_buf_rele(db_spill, FTAG);
1953 dmu_tx_abort(tx);
1954 return (err);
1955 }
1956 dmu_buf_will_dirty(db_spill, tx);
1957
1958 if (db_spill->db_size < drrs->drr_length)
1959 VERIFY(0 == dbuf_spill_set_blksz(db_spill,
1960 drrs->drr_length, tx));
1961 bcopy(data, db_spill->db_data, drrs->drr_length);
1962
1963 dmu_buf_rele(db, FTAG);
1964 dmu_buf_rele(db_spill, FTAG);
1965
1966 dmu_tx_commit(tx);
1967 return (0);
1968 }
1969
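/*
 * Handle a DRR_FREE record: punch a hole in the object over the given
 * offset/length range (a length of -1ULL means "to the end of the object").
 */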
1970 /* ARGSUSED */
1971 noinline static int
1972 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
1973 {
1974 int err;
1975
1976 if (drrf->drr_length != -1ULL &&
1977 drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1978 return (SET_ERROR(EINVAL));
1979
1980 if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
1981 return (SET_ERROR(EINVAL));
1982
1983 err = dmu_free_long_range(rwa->os, drrf->drr_object,
1984 drrf->drr_offset, drrf->drr_length);
1985
1986 return (err);
1987 }
1988
1989 /* used to destroy the drc_ds on error */
1990 static void
1991 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
1992 {
1993 char name[MAXNAMELEN];
1994 dsl_dataset_name(drc->drc_ds, name);
1995 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
1996 (void) dsl_destroy_head(name);
1997 }
1998
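/*
 * Fold len bytes of buf into the stream's running fletcher-4 checksum,
 * using the byteswapping variant when the stream's byte order differs
 * from the host's.
 */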
1999 static void
2000 receive_cksum(struct receive_arg *ra, int len, void *buf)
2001 {
2002 if (ra->byteswap) {
2003 fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
2004 } else {
2005 fletcher_4_incremental_native(buf, len, &ra->cksum);
2006 }
2007 }
2008
2009 /*
2010 * Read the payload into a buffer of size len, and update the current record's
2011 * payload field.
2012 * Allocate ra->next_rrd and read the next record's header into
2013 * ra->next_rrd->header.
2014 * Verify checksum of payload and next record.
2015 */
2016 static int
2017 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
2018 {
2019 int err;
2020 zio_cksum_t cksum_orig;
2021 zio_cksum_t *cksump;
2022
2023 if (len != 0) {
2024 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2025 ra->rrd->payload = buf;
2026 ra->rrd->payload_size = len;
2027 err = receive_read(ra, len, ra->rrd->payload);
2028 if (err != 0)
2029 return (err);
2030 receive_cksum(ra, len, ra->rrd->payload);
2031 }
2032
2033 ra->prev_cksum = ra->cksum;
2034
2035 ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2036 err = receive_read(ra, sizeof (ra->next_rrd->header),
2037 &ra->next_rrd->header);
2038 if (err != 0) {
2039 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2040 ra->next_rrd = NULL;
2041 return (err);
2042 }
2043 if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
2044 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2045 ra->next_rrd = NULL;
2046 return (SET_ERROR(EINVAL));
2047 }
2048
2049 /*
2050 * Note: checksum is of everything up to but not including the
2051 * checksum itself.
2052 */
2053 ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2054 ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2055 receive_cksum(ra,
2056 offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
2057 &ra->next_rrd->header);
2058
2059 cksum_orig = ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2060 cksump = &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2061
2062 if (ra->byteswap)
2063 byteswap_record(&ra->next_rrd->header);
2064
2065 if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2066 !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
2067 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2068 ra->next_rrd = NULL;
2069 return (SET_ERROR(ECKSUM));
2070 }
2071
2072 receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
2073
2074 return (0);
2075 }
2076
2077 /*
2078 * Issue the prefetch reads for any necessary indirect blocks.
2079 *
2080 * We use the object ignore list to tell us whether or not to issue prefetches
2081 * for a given object. We do this for both correctness (in case the blocksize
2082 * of an object has changed) and performance (if the object doesn't exist, don't
2083 * needlessly try to issue prefetches). We also trim the list as we go through
2084 * the stream to prevent it from growing to an unbounded size.
2085 *
2086 * The object numbers within will always be in sorted order, and any write
2087 * records we see will also be in sorted order, but they're not sorted with
2088 * respect to each other (i.e. we can get several object records before
2089 * receiving each object's write records). As a result, once we've reached a
2090 * given object number, we can safely remove any reference to lower object
2091 * numbers in the ignore list. In practice, we receive up to 32 object records
2092 * before receiving write records, so the list can have up to 32 nodes in it.
2093 */
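/*
 * For example, suppose (hypothetically) the stream carries object records
 * for objects 5, 9, and 12 before any of their write records. Once we see
 * a write record for object 9, no further write records for object 5 can
 * appear, so object 5's node can be dropped from the ignore list.
 */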
2094 /* ARGSUSED */
2095 static void
2096 receive_read_prefetch(struct receive_arg *ra,
2097 uint64_t object, uint64_t offset, uint64_t length)
2098 {
2099 struct receive_ign_obj_node *node = list_head(&ra->ignore_obj_list);
2100 while (node != NULL && node->object < object) {
2101 VERIFY3P(node, ==, list_remove_head(&ra->ignore_obj_list));
2102 kmem_free(node, sizeof (*node));
2103 node = list_head(&ra->ignore_obj_list);
2104 }
2105 if (node == NULL || node->object > object) {
2106 dmu_prefetch(ra->os, object, 1, offset, length,
2107 ZIO_PRIORITY_SYNC_READ);
2108 }
2109 }
2110
2111 /*
2112 * Read records off the stream, issuing any necessary prefetches.
2113 */
2114 static int
2115 receive_read_record(struct receive_arg *ra)
2116 {
2117 int err;
2118
2119 switch (ra->rrd->header.drr_type) {
2120 case DRR_OBJECT:
2121 {
2122 struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
2123 uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
2124 void *buf = kmem_zalloc(size, KM_SLEEP);
2125 dmu_object_info_t doi;
2126 err = receive_read_payload_and_next_header(ra, size, buf);
2127 if (err != 0) {
2128 kmem_free(buf, size);
2129 return (err);
2130 }
2131 err = dmu_object_info(ra->os, drro->drr_object, &doi);
2132 /*
2133 * See receive_read_prefetch for an explanation of why we're
2134 * storing this object in the ignore_obj_list.
2135 */
2136 if (err == ENOENT ||
2137 (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
2138 struct receive_ign_obj_node *node =
2139 kmem_zalloc(sizeof (*node),
2140 KM_SLEEP);
2141 node->object = drro->drr_object;
2142 #ifdef ZFS_DEBUG
2143 {
2144 struct receive_ign_obj_node *last_object =
2145 list_tail(&ra->ignore_obj_list);
2146 uint64_t last_objnum = (last_object != NULL ?
2147 last_object->object : 0);
2148 ASSERT3U(node->object, >, last_objnum);
2149 }
2150 #endif
2151 list_insert_tail(&ra->ignore_obj_list, node);
2152 err = 0;
2153 }
2154 return (err);
2155 }
2156 case DRR_FREEOBJECTS:
2157 {
2158 err = receive_read_payload_and_next_header(ra, 0, NULL);
2159 return (err);
2160 }
2161 case DRR_WRITE:
2162 {
2163 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
2164 arc_buf_t *abuf = arc_loan_buf(dmu_objset_spa(ra->os),
2165 drrw->drr_length);
2166
2167 err = receive_read_payload_and_next_header(ra,
2168 drrw->drr_length, abuf->b_data);
2169 if (err != 0) {
2170 dmu_return_arcbuf(abuf);
2171 return (err);
2172 }
2173 ra->rrd->write_buf = abuf;
2174 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
2175 drrw->drr_length);
2176 return (err);
2177 }
2178 case DRR_WRITE_BYREF:
2179 {
2180 struct drr_write_byref *drrwb =
2181 &ra->rrd->header.drr_u.drr_write_byref;
2182 err = receive_read_payload_and_next_header(ra, 0, NULL);
2183 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
2184 drrwb->drr_length);
2185 return (err);
2186 }
2187 case DRR_WRITE_EMBEDDED:
2188 {
2189 struct drr_write_embedded *drrwe =
2190 &ra->rrd->header.drr_u.drr_write_embedded;
2191 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2192 void *buf = kmem_zalloc(size, KM_SLEEP);
2193
2194 err = receive_read_payload_and_next_header(ra, size, buf);
2195 if (err != 0) {
2196 kmem_free(buf, size);
2197 return (err);
2198 }
2199
2200 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2201 drrwe->drr_length);
2202 return (err);
2203 }
2204 case DRR_FREE:
2205 {
2206 /*
2207 * It might be beneficial to prefetch indirect blocks here, but
2208 * we don't really have the data to decide for sure.
2209 */
2210 err = receive_read_payload_and_next_header(ra, 0, NULL);
2211 return (err);
2212 }
2213 case DRR_END:
2214 {
2215 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2216 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
2217 return (SET_ERROR(EINVAL));
2218 return (0);
2219 }
2220 case DRR_SPILL:
2221 {
2222 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2223 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
2224 err = receive_read_payload_and_next_header(ra, drrs->drr_length,
2225 buf);
2226 if (err != 0)
2227 kmem_free(buf, drrs->drr_length);
2228 return (err);
2229 }
2230 default:
2231 return (SET_ERROR(EINVAL));
2232 }
2233 }
2234
2235 /*
2236 * Apply a single record from the stream to the pool; called by the writer thread.
2237 */
2238 static int
2239 receive_process_record(struct receive_writer_arg *rwa,
2240 struct receive_record_arg *rrd)
2241 {
2242 int err;
2243
2244 switch (rrd->header.drr_type) {
2245 case DRR_OBJECT:
2246 {
2247 struct drr_object *drro = &rrd->header.drr_u.drr_object;
2248 err = receive_object(rwa, drro, rrd->payload);
2249 kmem_free(rrd->payload, rrd->payload_size);
2250 rrd->payload = NULL;
2251 return (err);
2252 }
2253 case DRR_FREEOBJECTS:
2254 {
2255 struct drr_freeobjects *drrfo =
2256 &rrd->header.drr_u.drr_freeobjects;
2257 return (receive_freeobjects(rwa, drrfo));
2258 }
2259 case DRR_WRITE:
2260 {
2261 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2262 err = receive_write(rwa, drrw, rrd->write_buf);
2263 /* if receive_write() is successful, it consumes the arc_buf */
2264 if (err != 0)
2265 dmu_return_arcbuf(rrd->write_buf);
2266 rrd->write_buf = NULL;
2267 rrd->payload = NULL;
2268 return (err);
2269 }
2270 case DRR_WRITE_BYREF:
2271 {
2272 struct drr_write_byref *drrwbr =
2273 &rrd->header.drr_u.drr_write_byref;
2274 return (receive_write_byref(rwa, drrwbr));
2275 }
2276 case DRR_WRITE_EMBEDDED:
2277 {
2278 struct drr_write_embedded *drrwe =
2279 &rrd->header.drr_u.drr_write_embedded;
2280 err = receive_write_embedded(rwa, drrwe, rrd->payload);
2281 kmem_free(rrd->payload, rrd->payload_size);
2282 rrd->payload = NULL;
2283 return (err);
2284 }
2285 case DRR_FREE:
2286 {
2287 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2288 return (receive_free(rwa, drrf));
2289 }
2290 case DRR_SPILL:
2291 {
2292 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2293 err = receive_spill(rwa, drrs, rrd->payload);
2294 kmem_free(rrd->payload, rrd->payload_size);
2295 rrd->payload = NULL;
2296 return (err);
2297 }
2298 default:
2299 return (SET_ERROR(EINVAL));
2300 }
2301 }
2302
2303 /*
2304 * dmu_recv_stream's worker thread; pull records off the queue, and then call
2305 * receive_process_record(). When we're done, signal the main thread and exit.
2306 */
2307 static void
2308 receive_writer_thread(void *arg)
2309 {
2310 struct receive_writer_arg *rwa = arg;
2311 struct receive_record_arg *rrd;
2312 for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2313 rrd = bqueue_dequeue(&rwa->q)) {
2314 /*
2315 * If there's an error, the main thread will stop putting things
2316 * on the queue, but we need to clear everything in it before we
2317 * can exit.
2318 */
2319 if (rwa->err == 0) {
2320 rwa->err = receive_process_record(rwa, rrd);
2321 } else if (rrd->write_buf != NULL) {
2322 dmu_return_arcbuf(rrd->write_buf);
2323 rrd->write_buf = NULL;
2324 rrd->payload = NULL;
2325 } else if (rrd->payload != NULL) {
2326 kmem_free(rrd->payload, rrd->payload_size);
2327 rrd->payload = NULL;
2328 }
2329 kmem_free(rrd, sizeof (*rrd));
2330 }
2331 kmem_free(rrd, sizeof (*rrd));
2332 mutex_enter(&rwa->mutex);
2333 rwa->done = B_TRUE;
2334 cv_signal(&rwa->cv);
2335 mutex_exit(&rwa->mutex);
2336 }
2337
2338 /*
2339 * Read in the stream's records, one by one, and apply them to the pool. There
2340 * are two threads involved; the thread that calls this function will spin up a
2341 * worker thread, read the records off the stream one by one, and issue
2342 * prefetches for any necessary indirect blocks. It will then push the records
2343 * onto an internal blocking queue. The worker thread will pull the records off
2344 * the queue, and actually write the data into the DMU. This way, the worker
2345 * thread doesn't have to wait for reads to complete, since everything it needs
2346 * (the indirect blocks) will be prefetched.
2347 *
2348 * NB: callers *must* call dmu_recv_end() if this succeeds.
2349 */
2350 int
2351 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
2352 int cleanup_fd, uint64_t *action_handlep)
2353 {
2354 int err = 0;
2355 struct receive_arg *ra;
2356 struct receive_writer_arg *rwa;
2357 int featureflags;
2358 struct receive_ign_obj_node *n;
2359
2360 ra = kmem_zalloc(sizeof (*ra), KM_SLEEP);
2361 rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);
2362
2363 ra->byteswap = drc->drc_byteswap;
2364 ra->cksum = drc->drc_cksum;
2365 ra->vp = vp;
2366 ra->voff = *voffp;
2367 list_create(&ra->ignore_obj_list, sizeof (struct receive_ign_obj_node),
2368 offsetof(struct receive_ign_obj_node, node));
2369
2370 /* these were verified in dmu_recv_begin */
2371 ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2372 DMU_SUBSTREAM);
2373 ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2374
2375 /*
2376 * Open the objset we are modifying.
2377 */
2378 VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra->os));
2379
2380 ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2381
2382 featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2383
2384 /* if this stream is dedup'ed, set up the avl tree for guid mapping */
2385 if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2386 minor_t minor;
2387
2388 if (cleanup_fd == -1) {
2389 err = SET_ERROR(EBADF);
2390 goto out;
2391 }
2392 err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2393 if (err != 0) {
2394 cleanup_fd = -1;
2395 goto out;
2396 }
2397
2398 if (*action_handlep == 0) {
2399 rwa->guid_to_ds_map =
2400 kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2401 avl_create(rwa->guid_to_ds_map, guid_compare,
2402 sizeof (guid_map_entry_t),
2403 offsetof(guid_map_entry_t, avlnode));
2404 err = zfs_onexit_add_cb(minor,
2405 free_guid_map_onexit, rwa->guid_to_ds_map,
2406 action_handlep);
2407 if (err != 0)
2408 goto out;
2409 } else {
2410 err = zfs_onexit_cb_data(minor, *action_handlep,
2411 (void **)&rwa->guid_to_ds_map);
2412 if (err != 0)
2413 goto out;
2414 }
2415
2416 drc->drc_guid_to_ds_map = rwa->guid_to_ds_map;
2417 }
2418
2419 err = receive_read_payload_and_next_header(ra, 0, NULL);
2420 if (err != 0)
2421 goto out;
2422
2423 (void) bqueue_init(&rwa->q, zfs_recv_queue_length,
2424 offsetof(struct receive_record_arg, node));
2425 cv_init(&rwa->cv, NULL, CV_DEFAULT, NULL);
2426 mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL);
2427 rwa->os = ra->os;
2428 rwa->byteswap = drc->drc_byteswap;
2429
2430 (void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc,
2431 TS_RUN, minclsyspri);
2432 /*
2433 * We're reading rwa->err without locks, which is safe since we are the
2434 * only reader, and the worker thread is the only writer. It's ok if we
2435 * miss a write for an iteration or two of the loop, since the writer
2436 * thread will keep freeing records we send it until we send it an eos
2437 * marker.
2438 *
2439 * We can leave this loop in 3 ways: First, if rwa->err is
2440 * non-zero. In that case, the writer thread will free the rrd we just
2441 * pushed. Second, if we're interrupted; in that case, either it's the
2442 * first loop and ra->rrd was never allocated, or it's later, and ra->rrd
2443 * has been handed off to the writer thread who will free it. Finally,
2444 * if receive_read_record fails or we're at the end of the stream, then
2445 * we free ra->rrd and exit.
2446 */
2447 while (rwa->err == 0) {
2448 if (issig(JUSTLOOKING) && issig(FORREAL)) {
2449 err = SET_ERROR(EINTR);
2450 break;
2451 }
2452
2453 ASSERT3P(ra->rrd, ==, NULL);
2454 ra->rrd = ra->next_rrd;
2455 ra->next_rrd = NULL;
2456 /* Allocates and loads header into ra->next_rrd */
2457 err = receive_read_record(ra);
2458
2459 if (ra->rrd->header.drr_type == DRR_END || err != 0) {
2460 kmem_free(ra->rrd, sizeof (*ra->rrd));
2461 ra->rrd = NULL;
2462 break;
2463 }
2464
2465 bqueue_enqueue(&rwa->q, ra->rrd,
2466 sizeof (struct receive_record_arg) + ra->rrd->payload_size);
2467 ra->rrd = NULL;
2468 }
2469 if (ra->next_rrd == NULL)
2470 ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2471 ra->next_rrd->eos_marker = B_TRUE;
2472 bqueue_enqueue(&rwa->q, ra->next_rrd, 1);
2473
2474 mutex_enter(&rwa->mutex);
2475 while (!rwa->done) {
2476 cv_wait(&rwa->cv, &rwa->mutex);
2477 }
2478 mutex_exit(&rwa->mutex);
2479
2480 cv_destroy(&rwa->cv);
2481 mutex_destroy(&rwa->mutex);
2482 bqueue_destroy(&rwa->q);
2483 if (err == 0)
2484 err = rwa->err;
2485
2486 out:
2487 if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
2488 zfs_onexit_fd_rele(cleanup_fd);
2489
2490 if (err != 0) {
2491 /*
2492 * destroy what we created, so we don't leave it in the
2493 * inconsistent restoring state.
2494 */
2495 dmu_recv_cleanup_ds(drc);
2496 }
2497
2498 *voffp = ra->voff;
2499
2500 for (n = list_remove_head(&ra->ignore_obj_list); n != NULL;
2501 n = list_remove_head(&ra->ignore_obj_list)) {
2502 kmem_free(n, sizeof (*n));
2503 }
2504 list_destroy(&ra->ignore_obj_list);
2505 kmem_free(ra, sizeof (*ra));
2506 kmem_free(rwa, sizeof (*rwa));
2507 return (err);
2508 }
2509
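/*
 * Check phase of the dmu_recv_end() sync task: verify that the receive can
 * be committed, i.e. that any snapshots a forced receive will destroy are
 * destroyable, that the clone swap with origin_head is legal (for a receive
 * into an existing filesystem), and that the new snapshot can be created.
 */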
2510 static int
2511 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
2512 {
2513 dmu_recv_cookie_t *drc = arg;
2514 dsl_pool_t *dp = dmu_tx_pool(tx);
2515 int error;
2516
2517 ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
2518
2519 if (!drc->drc_newfs) {
2520 dsl_dataset_t *origin_head;
2521
2522 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
2523 if (error != 0)
2524 return (error);
2525 if (drc->drc_force) {
2526 /*
2527 * We will destroy any snapshots in tofs (i.e. before
2528 * origin_head) that are after the origin (which is
2529 * the snap before drc_ds, because drc_ds cannot
2530 * have any snaps of its own).
2531 */
2532 uint64_t obj;
2533
2534 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2535 while (obj !=
2536 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2537 dsl_dataset_t *snap;
2538 error = dsl_dataset_hold_obj(dp, obj, FTAG,
2539 &snap);
2540 if (error != 0)
2541 break;
2542 if (snap->ds_dir != origin_head->ds_dir)
2543 error = SET_ERROR(EINVAL);
2544 if (error == 0) {
2545 error = dsl_destroy_snapshot_check_impl(
2546 snap, B_FALSE);
2547 }
2548 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2549 dsl_dataset_rele(snap, FTAG);
2550 if (error != 0)
2551 break;
2552 }
2553 if (error != 0) {
2554 dsl_dataset_rele(origin_head, FTAG);
2555 return (error);
2556 }
2557 }
2558 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
2559 origin_head, drc->drc_force, drc->drc_owner, tx);
2560 if (error != 0) {
2561 dsl_dataset_rele(origin_head, FTAG);
2562 return (error);
2563 }
2564 error = dsl_dataset_snapshot_check_impl(origin_head,
2565 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2566 dsl_dataset_rele(origin_head, FTAG);
2567 if (error != 0)
2568 return (error);
2569
2570 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
2571 } else {
2572 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
2573 drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2574 }
2575 return (error);
2576 }
2577
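/*
 * Sync phase of the dmu_recv_end() sync task: for an existing filesystem,
 * destroy any snapshots a forced receive rolls past, swap the received
 * clone with origin_head, snapshot it, and destroy the temporary clone;
 * for a new filesystem, simply snapshot it. Either way, stamp the new
 * snapshot with the stream's creation time and guid and clear
 * DS_FLAG_INCONSISTENT.
 */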
2578 static void
2579 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
2580 {
2581 dmu_recv_cookie_t *drc = arg;
2582 dsl_pool_t *dp = dmu_tx_pool(tx);
2583
2584 spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
2585 tx, "snap=%s", drc->drc_tosnap);
2586
2587 if (!drc->drc_newfs) {
2588 dsl_dataset_t *origin_head;
2589
2590 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
2591 &origin_head));
2592
2593 if (drc->drc_force) {
2594 /*
2595 * Destroy any snapshots of drc_tofs (origin_head)
2596 * after the origin (the snap before drc_ds).
2597 */
2598 uint64_t obj;
2599
2600 obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2601 while (obj !=
2602 dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2603 dsl_dataset_t *snap;
2604 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
2605 &snap));
2606 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
2607 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2608 dsl_destroy_snapshot_sync_impl(snap,
2609 B_FALSE, tx);
2610 dsl_dataset_rele(snap, FTAG);
2611 }
2612 }
2613 VERIFY3P(drc->drc_ds->ds_prev, ==,
2614 origin_head->ds_prev);
2615
2616 dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
2617 origin_head, tx);
2618 dsl_dataset_snapshot_sync_impl(origin_head,
2619 drc->drc_tosnap, tx);
2620
2621 /* set snapshot's creation time and guid */
2622 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
2623 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
2624 drc->drc_drrb->drr_creation_time;
2625 dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
2626 drc->drc_drrb->drr_toguid;
2627 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
2628 ~DS_FLAG_INCONSISTENT;
2629
2630 dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
2631 dsl_dataset_phys(origin_head)->ds_flags &=
2632 ~DS_FLAG_INCONSISTENT;
2633
2634 dsl_dataset_rele(origin_head, FTAG);
2635 dsl_destroy_head_sync_impl(drc->drc_ds, tx);
2636
2637 if (drc->drc_owner != NULL)
2638 VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
2639 } else {
2640 dsl_dataset_t *ds = drc->drc_ds;
2641
2642 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
2643
2644 /* set snapshot's creation time and guid */
2645 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2646 dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
2647 drc->drc_drrb->drr_creation_time;
2648 dsl_dataset_phys(ds->ds_prev)->ds_guid =
2649 drc->drc_drrb->drr_toguid;
2650 dsl_dataset_phys(ds->ds_prev)->ds_flags &=
2651 ~DS_FLAG_INCONSISTENT;
2652
2653 dmu_buf_will_dirty(ds->ds_dbuf, tx);
2654 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
2655 }
2656 drc->drc_newsnapobj = dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
2657 zvol_create_minors(dp->dp_spa, drc->drc_tofs, B_TRUE);
2658 /*
2659 * Release the hold from dmu_recv_begin. This must be done before
2660 * we return to open context, so that when we free the dataset's dnode,
2661 * we can evict its bonus buffer.
2662 */
2663 dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
2664 drc->drc_ds = NULL;
2665 }
2666
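/*
 * Record the guid of a newly received snapshot in the dedup guid map so
 * that later DRR_WRITE_BYREF records can locate it; the long hold keeps
 * the snapshot from being destroyed while the map still references it.
 */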
2667 static int
2668 add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj)
2669 {
2670 dsl_pool_t *dp;
2671 dsl_dataset_t *snapds;
2672 guid_map_entry_t *gmep;
2673 int err;
2674
2675 ASSERT(guid_map != NULL);
2676
2677 err = dsl_pool_hold(name, FTAG, &dp);
2678 if (err != 0)
2679 return (err);
2680 gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
2681 err = dsl_dataset_hold_obj(dp, snapobj, gmep, &snapds);
2682 if (err == 0) {
2683 gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
2684 gmep->gme_ds = snapds;
2685 avl_add(guid_map, gmep);
2686 dsl_dataset_long_hold(snapds, gmep);
2687 } else {
2688 kmem_free(gmep, sizeof (*gmep));
2689 }
2690
2691 dsl_pool_rele(dp, FTAG);
2692 return (err);
2693 }
2694
2695 static int dmu_recv_end_modified_blocks = 3;
2696
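/*
 * Finish a receive into an existing filesystem: run the end check/sync
 * task that swaps the received clone into place, and destroy the
 * temporary clone if that fails.
 */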
2697 static int
2698 dmu_recv_existing_end(dmu_recv_cookie_t *drc)
2699 {
2700 int error;
2701
2702 #ifdef _KERNEL
2703 char *name;
2704
2705 /*
2706 * We will be destroying the ds; make sure its origin is unmounted if
2707 * necessary.
2708 */
2709 name = kmem_alloc(MAXNAMELEN, KM_SLEEP);
2710 dsl_dataset_name(drc->drc_ds, name);
2711 zfs_destroy_unmount_origin(name);
2712 kmem_free(name, MAXNAMELEN);
2713 #endif
2714
2715 error = dsl_sync_task(drc->drc_tofs,
2716 dmu_recv_end_check, dmu_recv_end_sync, drc,
2717 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
2718
2719 if (error != 0)
2720 dmu_recv_cleanup_ds(drc);
2721 return (error);
2722 }
2723
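/*
 * Finish a receive that created a brand-new filesystem; on success, add
 * the new snapshot to the dedup guid map if this stream set one up.
 */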
2724 static int
2725 dmu_recv_new_end(dmu_recv_cookie_t *drc)
2726 {
2727 int error;
2728
2729 error = dsl_sync_task(drc->drc_tofs,
2730 dmu_recv_end_check, dmu_recv_end_sync, drc,
2731 dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL);
2732
2733 if (error != 0) {
2734 dmu_recv_cleanup_ds(drc);
2735 } else if (drc->drc_guid_to_ds_map != NULL) {
2736 (void) add_ds_to_guidmap(drc->drc_tofs,
2737 drc->drc_guid_to_ds_map,
2738 drc->drc_newsnapobj);
2739 }
2740 return (error);
2741 }
2742
2743 int
2744 dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
2745 {
2746 drc->drc_owner = owner;
2747
2748 if (drc->drc_newfs)
2749 return (dmu_recv_new_end(drc));
2750 else
2751 return (dmu_recv_existing_end(drc));
2752 }
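
/*
 * Illustrative caller sketch (hypothetical code; arguments to
 * dmu_recv_begin() elided -- the in-kernel consumer of this API is
 * zfs_ioc_recv() in zfs_ioctl.c):
 *
 *	dmu_recv_cookie_t drc;
 *	int err;
 *
 *	err = dmu_recv_begin(...);	(sets up the %recv clone and drc)
 *	if (err == 0)
 *		err = dmu_recv_stream(&drc, vp, &off, cleanup_fd, &handle);
 *	if (err == 0)
 *		err = dmu_recv_end(&drc, owner);
 *
 * On failure, dmu_recv_stream() destroys the temporary dataset itself; on
 * success, the caller must call dmu_recv_end() to commit the receive.
 */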
2753
2754 /*
2755 * Return TRUE if this objset is currently being received into.
2756 */
2757 boolean_t
2758 dmu_objset_is_receiving(objset_t *os)
2759 {
2760 return (os->os_dsl_dataset != NULL &&
2761 os->os_dsl_dataset->ds_owner == dmu_recv_tag);
2762 }
2763
2764 #if defined(_KERNEL)
2765 module_param(zfs_send_corrupt_data, int, 0644);
2766 MODULE_PARM_DESC(zfs_send_corrupt_data, "Allow sending corrupt data");
2767 #endif