+ if (drc->drc_resumable) {
+ /* wait for our resume state to be written to disk */
+ txg_wait_synced(drc->drc_ds->ds_dir->dd_pool, 0);
+ dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
+ } else {
+ char name[ZFS_MAX_DATASET_NAME_LEN];
+ dsl_dataset_name(drc->drc_ds, name);
+ dsl_dataset_disown(drc->drc_ds, dmu_recv_tag);
+ (void) dsl_destroy_head(name);
+ }
+}
+
+static void
+receive_cksum(struct receive_arg *ra, int len, void *buf)
+{
+ if (ra->byteswap) {
+ (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
+ } else {
+ (void) fletcher_4_incremental_native(buf, len, &ra->cksum);
+ }
+}
+
+/*
+ * Read the payload into a buffer of size len, and update the current record's
+ * payload field.
+ * Allocate ra->next_rrd and read the next record's header into
+ * ra->next_rrd->header.
+ * Verify checksum of payload and next record.
+ */
+static int
+receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
+{
+ int err;
+ zio_cksum_t cksum_orig;
+ zio_cksum_t *cksump;
+
+ if (len != 0) {
+ ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
+ err = receive_read(ra, len, buf);
+ if (err != 0)
+ return (err);
+ receive_cksum(ra, len, buf);
+
+ /* note: rrd is NULL when reading the begin record's payload */
+ if (ra->rrd != NULL) {
+ ra->rrd->payload = buf;
+ ra->rrd->payload_size = len;
+ ra->rrd->bytes_read = ra->bytes_read;
+ }
+ }
+
+ ra->prev_cksum = ra->cksum;
+
+ ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
+ err = receive_read(ra, sizeof (ra->next_rrd->header),
+ &ra->next_rrd->header);
+ ra->next_rrd->bytes_read = ra->bytes_read;
+ if (err != 0) {
+ kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
+ ra->next_rrd = NULL;
+ return (err);
+ }
+ if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
+ kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
+ ra->next_rrd = NULL;
+ return (SET_ERROR(EINVAL));
+ }
+
+ /*
+ * Note: checksum is of everything up to but not including the
+ * checksum itself.
+ */
+ ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
+ ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
+ receive_cksum(ra,
+ offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
+ &ra->next_rrd->header);
+
+ cksum_orig = ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
+ cksump = &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
+
+ if (ra->byteswap)
+ byteswap_record(&ra->next_rrd->header);
+
+ if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
+ !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
+ kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
+ ra->next_rrd = NULL;
+ return (SET_ERROR(ECKSUM));
+ }
+
+ receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
+
+ return (0);
+}
+
+static void
+objlist_create(struct objlist *list)
+{
+ list_create(&list->list, sizeof (struct receive_objnode),
+ offsetof(struct receive_objnode, node));
+ list->last_lookup = 0;
+}
+
+static void
+objlist_destroy(struct objlist *list)
+{
+ struct receive_objnode *n;
+
+ for (n = list_remove_head(&list->list);
+ n != NULL; n = list_remove_head(&list->list)) {
+ kmem_free(n, sizeof (*n));
+ }
+ list_destroy(&list->list);
+}
+
+/*
+ * This function looks through the objlist to see if the specified object number
+ * is contained in the objlist. In the process, it will remove all object
+ * numbers in the list that are smaller than the specified object number. Thus,
+ * any lookup of an object number smaller than a previously looked up object
+ * number will always return false; therefore, all lookups should be done in
+ * ascending order.
+ */
+static boolean_t
+objlist_exists(struct objlist *list, uint64_t object)
+{
+ struct receive_objnode *node = list_head(&list->list);
+ ASSERT3U(object, >=, list->last_lookup);
+ list->last_lookup = object;
+ while (node != NULL && node->object < object) {
+ VERIFY3P(node, ==, list_remove_head(&list->list));
+ kmem_free(node, sizeof (*node));
+ node = list_head(&list->list);
+ }
+ return (node != NULL && node->object == object);
+}
+
+/*
+ * The objlist is a list of object numbers stored in ascending order. However,
+ * the insertion of new object numbers does not seek out the correct location to
+ * store a new object number; instead, it appends it to the list for simplicity.
+ * Thus, any users must take care to only insert new object numbers in ascending
+ * order.
+ */
+static void
+objlist_insert(struct objlist *list, uint64_t object)
+{
+ struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
+ node->object = object;
+#ifdef ZFS_DEBUG
+ {
+ struct receive_objnode *last_object = list_tail(&list->list);
+ uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
+ ASSERT3U(node->object, >, last_objnum);
+ }
+#endif
+ list_insert_tail(&list->list, node);
+}
+
+/*
+ * Issue the prefetch reads for any necessary indirect blocks.
+ *
+ * We use the object ignore list to tell us whether or not to issue prefetches
+ * for a given object. We do this for both correctness (in case the blocksize
+ * of an object has changed) and performance (if the object doesn't exist, don't
+ * needlessly try to issue prefetches). We also trim the list as we go through
+ * the stream to prevent it from growing to an unbounded size.
+ *
+ * The object numbers within will always be in sorted order, and any write
+ * records we see will also be in sorted order, but they're not sorted with
+ * respect to each other (i.e. we can get several object records before
+ * receiving each object's write records). As a result, once we've reached a
+ * given object number, we can safely remove any reference to lower object
+ * numbers in the ignore list. In practice, we receive up to 32 object records
+ * before receiving write records, so the list can have up to 32 nodes in it.
+ */
+/* ARGSUSED */
+static void
+receive_read_prefetch(struct receive_arg *ra,
+ uint64_t object, uint64_t offset, uint64_t length)
+{
+ if (!objlist_exists(&ra->ignore_objlist, object)) {
+ dmu_prefetch(ra->os, object, 1, offset, length,
+ ZIO_PRIORITY_SYNC_READ);
+ }
+}
+
+/*
+ * Read records off the stream, issuing any necessary prefetches.
+ */
+static int
+receive_read_record(struct receive_arg *ra)
+{
+ int err;
+
+ switch (ra->rrd->header.drr_type) {
+ case DRR_OBJECT:
+ {
+ struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
+ uint32_t size = P2ROUNDUP(drro->drr_bonuslen, 8);
+ void *buf = kmem_zalloc(size, KM_SLEEP);
+ dmu_object_info_t doi;
+ err = receive_read_payload_and_next_header(ra, size, buf);
+ if (err != 0) {
+ kmem_free(buf, size);
+ return (err);
+ }
+ err = dmu_object_info(ra->os, drro->drr_object, &doi);
+ /*
+ * See receive_read_prefetch for an explanation why we're
+ * storing this object in the ignore_obj_list.
+ */
+ if (err == ENOENT || err == EEXIST ||
+ (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
+ objlist_insert(&ra->ignore_objlist, drro->drr_object);
+ err = 0;
+ }
+ return (err);
+ }
+ case DRR_FREEOBJECTS:
+ {
+ err = receive_read_payload_and_next_header(ra, 0, NULL);
+ return (err);
+ }
+ case DRR_WRITE:
+ {
+ struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
+ arc_buf_t *abuf;
+ boolean_t is_meta = DMU_OT_IS_METADATA(drrw->drr_type);
+ if (DRR_WRITE_COMPRESSED(drrw)) {
+ ASSERT3U(drrw->drr_compressed_size, >, 0);
+ ASSERT3U(drrw->drr_logical_size, >=,
+ drrw->drr_compressed_size);
+ ASSERT(!is_meta);
+ abuf = arc_loan_compressed_buf(
+ dmu_objset_spa(ra->os),
+ drrw->drr_compressed_size, drrw->drr_logical_size,
+ drrw->drr_compressiontype);
+ } else {
+ abuf = arc_loan_buf(dmu_objset_spa(ra->os),
+ is_meta, drrw->drr_logical_size);
+ }
+
+ err = receive_read_payload_and_next_header(ra,
+ DRR_WRITE_PAYLOAD_SIZE(drrw), abuf->b_data);
+ if (err != 0) {
+ dmu_return_arcbuf(abuf);
+ return (err);
+ }
+ ra->rrd->write_buf = abuf;
+ receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
+ drrw->drr_logical_size);
+ return (err);
+ }
+ case DRR_WRITE_BYREF:
+ {
+ struct drr_write_byref *drrwb =
+ &ra->rrd->header.drr_u.drr_write_byref;
+ err = receive_read_payload_and_next_header(ra, 0, NULL);
+ receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
+ drrwb->drr_length);
+ return (err);
+ }
+ case DRR_WRITE_EMBEDDED:
+ {
+ struct drr_write_embedded *drrwe =
+ &ra->rrd->header.drr_u.drr_write_embedded;
+ uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
+ void *buf = kmem_zalloc(size, KM_SLEEP);
+
+ err = receive_read_payload_and_next_header(ra, size, buf);
+ if (err != 0) {
+ kmem_free(buf, size);
+ return (err);
+ }
+
+ receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
+ drrwe->drr_length);
+ return (err);
+ }
+ case DRR_FREE:
+ {
+ /*
+ * It might be beneficial to prefetch indirect blocks here, but
+ * we don't really have the data to decide for sure.
+ */
+ err = receive_read_payload_and_next_header(ra, 0, NULL);
+ return (err);
+ }
+ case DRR_END:
+ {
+ struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
+ if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
+ return (SET_ERROR(ECKSUM));
+ return (0);
+ }
+ case DRR_SPILL:
+ {
+ struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
+ void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
+ err = receive_read_payload_and_next_header(ra, drrs->drr_length,
+ buf);
+ if (err != 0)
+ kmem_free(buf, drrs->drr_length);
+ return (err);
+ }
+ default:
+ return (SET_ERROR(EINVAL));
+ }
+}
+
+static void
+dprintf_drr(struct receive_record_arg *rrd, int err)
+{
+ switch (rrd->header.drr_type) {
+ case DRR_OBJECT:
+ {
+ struct drr_object *drro = &rrd->header.drr_u.drr_object;
+ dprintf("drr_type = OBJECT obj = %llu type = %u "
+ "bonustype = %u blksz = %u bonuslen = %u cksumtype = %u "
+ "compress = %u dn_slots = %u err = %d\n",
+ drro->drr_object, drro->drr_type, drro->drr_bonustype,
+ drro->drr_blksz, drro->drr_bonuslen,
+ drro->drr_checksumtype, drro->drr_compress,
+ drro->drr_dn_slots, err);
+ break;
+ }
+ case DRR_FREEOBJECTS:
+ {
+ struct drr_freeobjects *drrfo =
+ &rrd->header.drr_u.drr_freeobjects;
+ dprintf("drr_type = FREEOBJECTS firstobj = %llu "
+ "numobjs = %llu err = %d\n",
+ drrfo->drr_firstobj, drrfo->drr_numobjs, err);
+ break;
+ }
+ case DRR_WRITE:
+ {
+ struct drr_write *drrw = &rrd->header.drr_u.drr_write;
+ dprintf("drr_type = WRITE obj = %llu type = %u offset = %llu "
+ "lsize = %llu cksumtype = %u cksumflags = %u "
+ "compress = %u psize = %llu err = %d\n",
+ drrw->drr_object, drrw->drr_type, drrw->drr_offset,
+ drrw->drr_logical_size, drrw->drr_checksumtype,
+ drrw->drr_checksumflags, drrw->drr_compressiontype,
+ drrw->drr_compressed_size, err);
+ break;
+ }
+ case DRR_WRITE_BYREF:
+ {
+ struct drr_write_byref *drrwbr =
+ &rrd->header.drr_u.drr_write_byref;
+ dprintf("drr_type = WRITE_BYREF obj = %llu offset = %llu "
+ "length = %llu toguid = %llx refguid = %llx "
+ "refobject = %llu refoffset = %llu cksumtype = %u "
+ "cksumflags = %u err = %d\n",
+ drrwbr->drr_object, drrwbr->drr_offset,
+ drrwbr->drr_length, drrwbr->drr_toguid,
+ drrwbr->drr_refguid, drrwbr->drr_refobject,
+ drrwbr->drr_refoffset, drrwbr->drr_checksumtype,
+ drrwbr->drr_checksumflags, err);
+ break;
+ }
+ case DRR_WRITE_EMBEDDED:
+ {
+ struct drr_write_embedded *drrwe =
+ &rrd->header.drr_u.drr_write_embedded;
+ dprintf("drr_type = WRITE_EMBEDDED obj = %llu offset = %llu "
+ "length = %llu compress = %u etype = %u lsize = %u "
+ "psize = %u err = %d\n",
+ drrwe->drr_object, drrwe->drr_offset, drrwe->drr_length,
+ drrwe->drr_compression, drrwe->drr_etype,
+ drrwe->drr_lsize, drrwe->drr_psize, err);
+ break;
+ }
+ case DRR_FREE:
+ {
+ struct drr_free *drrf = &rrd->header.drr_u.drr_free;
+ dprintf("drr_type = FREE obj = %llu offset = %llu "
+ "length = %lld err = %d\n",
+ drrf->drr_object, drrf->drr_offset, drrf->drr_length,
+ err);
+ break;
+ }
+ case DRR_SPILL:
+ {
+ struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
+ dprintf("drr_type = SPILL obj = %llu length = %llu "
+ "err = %d\n", drrs->drr_object, drrs->drr_length, err);
+ break;
+ }
+ default:
+ return;
+ }
+}
+
+/*
+ * Commit the records to the pool.
+ */
+static int
+receive_process_record(struct receive_writer_arg *rwa,
+ struct receive_record_arg *rrd)
+{
+ int err;
+
+ /* Processing in order, therefore bytes_read should be increasing. */
+ ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
+ rwa->bytes_read = rrd->bytes_read;
+
+ switch (rrd->header.drr_type) {
+ case DRR_OBJECT:
+ {
+ struct drr_object *drro = &rrd->header.drr_u.drr_object;
+ err = receive_object(rwa, drro, rrd->payload);
+ kmem_free(rrd->payload, rrd->payload_size);
+ rrd->payload = NULL;
+ break;
+ }
+ case DRR_FREEOBJECTS:
+ {
+ struct drr_freeobjects *drrfo =
+ &rrd->header.drr_u.drr_freeobjects;
+ err = receive_freeobjects(rwa, drrfo);
+ break;
+ }
+ case DRR_WRITE:
+ {
+ struct drr_write *drrw = &rrd->header.drr_u.drr_write;
+ err = receive_write(rwa, drrw, rrd->write_buf);
+ /* if receive_write() is successful, it consumes the arc_buf */
+ if (err != 0)
+ dmu_return_arcbuf(rrd->write_buf);
+ rrd->write_buf = NULL;
+ rrd->payload = NULL;
+ break;
+ }
+ case DRR_WRITE_BYREF:
+ {
+ struct drr_write_byref *drrwbr =
+ &rrd->header.drr_u.drr_write_byref;
+ err = receive_write_byref(rwa, drrwbr);
+ break;
+ }
+ case DRR_WRITE_EMBEDDED:
+ {
+ struct drr_write_embedded *drrwe =
+ &rrd->header.drr_u.drr_write_embedded;
+ err = receive_write_embedded(rwa, drrwe, rrd->payload);
+ kmem_free(rrd->payload, rrd->payload_size);
+ rrd->payload = NULL;
+ break;
+ }
+ case DRR_FREE:
+ {
+ struct drr_free *drrf = &rrd->header.drr_u.drr_free;
+ err = receive_free(rwa, drrf);
+ break;
+ }
+ case DRR_SPILL:
+ {
+ struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
+ err = receive_spill(rwa, drrs, rrd->payload);
+ kmem_free(rrd->payload, rrd->payload_size);
+ rrd->payload = NULL;
+ break;
+ }
+ default:
+ return (SET_ERROR(EINVAL));
+ }
+
+ if (err != 0)
+ dprintf_drr(rrd, err);
+
+ return (err);
+}
+
+/*
+ * dmu_recv_stream's worker thread; pull records off the queue, and then call
+ * receive_process_record When we're done, signal the main thread and exit.
+ */
+static void
+receive_writer_thread(void *arg)
+{
+ struct receive_writer_arg *rwa = arg;
+ struct receive_record_arg *rrd;
+ fstrans_cookie_t cookie = spl_fstrans_mark();
+
+ for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
+ rrd = bqueue_dequeue(&rwa->q)) {
+ /*
+ * If there's an error, the main thread will stop putting things
+ * on the queue, but we need to clear everything in it before we
+ * can exit.
+ */
+ if (rwa->err == 0) {
+ rwa->err = receive_process_record(rwa, rrd);
+ } else if (rrd->write_buf != NULL) {
+ dmu_return_arcbuf(rrd->write_buf);
+ rrd->write_buf = NULL;
+ rrd->payload = NULL;
+ } else if (rrd->payload != NULL) {
+ kmem_free(rrd->payload, rrd->payload_size);
+ rrd->payload = NULL;
+ }
+ kmem_free(rrd, sizeof (*rrd));
+ }
+ kmem_free(rrd, sizeof (*rrd));
+ mutex_enter(&rwa->mutex);
+ rwa->done = B_TRUE;
+ cv_signal(&rwa->cv);
+ mutex_exit(&rwa->mutex);
+ spl_fstrans_unmark(cookie);
+ thread_exit();
+}
+
+static int
+resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
+{
+ uint64_t val;
+ objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
+ uint64_t dsobj = dmu_objset_id(ra->os);
+ uint64_t resume_obj, resume_off;
+
+ if (nvlist_lookup_uint64(begin_nvl,
+ "resume_object", &resume_obj) != 0 ||
+ nvlist_lookup_uint64(begin_nvl,
+ "resume_offset", &resume_off) != 0) {
+ return (SET_ERROR(EINVAL));
+ }
+ VERIFY0(zap_lookup(mos, dsobj,
+ DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
+ if (resume_obj != val)
+ return (SET_ERROR(EINVAL));
+ VERIFY0(zap_lookup(mos, dsobj,
+ DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
+ if (resume_off != val)
+ return (SET_ERROR(EINVAL));
+
+ return (0);