NEX-15281 zfs_panic_recover() during hpr disable/enable
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-13629 zfs send -s: assertion failed: err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end), file: ../../common/fs/zfs/dmu_send.c, line: 1010
Reviewed by: Alex Deiter <alex.deiter@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
NEX-9752 backport illumos 6950 ARC should cache compressed data
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Yuri Pankov <yuri.pankov@nexenta.com>
6950 ARC should cache compressed data
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Dan Kimmel <dan.kimmel@delphix.com>
Reviewed by: Matt Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Don Brady <don.brady@intel.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
NEX-9575 zfs send -s panics
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Roman Strashkin <roman.strashkin@nexenta.com>
Revert "NEX-7251 Resume_token is not cleared right after finishing receive"
This reverts commit 9e97a45e8cf6ca59307a39e2d3c11c6e845e4187.
NEX-7251 Resume_token is not cleared right after finishing receive
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alexey Komarov <alexey.komarov@nexenta.com>
NEX-5928 KRRP: Integrate illumos/openzfs resume-token, to resume replication from a given synced offset
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alexey Komarov <alexey.komarov@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
NEX-5795 Rename 'wrc' as 'wbc' in the source and in the tech docs
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
NEX-5272 KRRP: replicate snapshot properties
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Alexey Komarov <alexey.komarov@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
NEX-5270 WBC: Incorrect error message when trying to 'zfs recv' into wrcached dataset
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
NEX-5132 WBC: Do not allow recv to datasets with enabled writecache
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
6358 A faulted pool with only unavailable vdevs triggers assertion failure in libzfs
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Andrew Stormont <andyjstormont@gmail.com>
Reviewed by: Serban Maduta <serban.maduta@gmail.com>
Approved by: Dan McDonald <danmcd@omniti.com>
6393 zfs receive a full send as a clone
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Approved by: Dan McDonald <danmcd@omniti.com>
2605 want to resume interrupted zfs send
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Reviewed by: Richard Elling <Richard.Elling@RichardElling.com>
Reviewed by: Xin Li <delphij@freebsd.org>
Reviewed by: Arne Jansen <sensille@gmx.net>
Approved by: Dan McDonald <danmcd@omniti.com>
4185 add new cryptographic checksums to ZFS: SHA-512, Skein, Edon-R (fix studio build)
4185 add new cryptographic checksums to ZFS: SHA-512, Skein, Edon-R
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
Reviewed by: Richard Lowe <richlowe@richlowe.net>
Approved by: Garrett D'Amore <garrett@damore.org>
6047 SPARC boot should support feature@embedded_data
Reviewed by: Igor Kozhukhov <ikozhukhov@gmail.com>
Approved by: Dan McDonald <danmcd@omniti.com>
5959 clean up per-dataset feature count code
Reviewed by: Toomas Soome <tsoome@me.com>
Reviewed by: George Wilson <george@delphix.com>
Reviewed by: Alex Reece <alex@delphix.com>
Approved by: Richard Lowe <richlowe@richlowe.net>
NEX-4582 update wrc test cases for allow to use write back cache per tree of datasets
Reviewed by: Steve Peng <steve.peng@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
5960 zfs recv should prefetch indirect blocks
5925 zfs receive -o origin=
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
5946 zfs_ioc_space_snaps must check that firstsnap and lastsnap refer to snapshots
5945 zfs_ioc_send_space must ensure that fromsnap refers to a snapshot
Reviewed by: Steven Hartland <killing@multiplay.co.uk>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Approved by: Gordon Ross <gordon.ross@nexenta.com>
5870 dmu_recv_end_check() leaks origin_head hold if error happens in drc_force branch
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Andrew Stormont <andyjstormont@gmail.com>
Approved by: Dan McDonald <danmcd@omniti.com>
5912 full stream can not be force-received into a dataset if it has a snapshot
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <pcd@delphix.com>
Approved by: Dan McDonald <danmcd@omniti.com>
5809 Blowaway full receive in v1 pool causes kernel panic
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Alex Reece <alex@delphix.com>
Reviewed by: Will Andrews <will@freebsd.org>
Approved by: Gordon Ross <gwr@nexenta.com>
5746 more checksumming in zfs send
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Bayard Bell <buffer.g.overflow@gmail.com>
Approved by: Albert Lee <trisk@omniti.com>
5765 add support for estimating send stream size with lzc_send_space when source is a bookmark
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Steven Hartland <killing@multiplay.co.uk>
Reviewed by: Bayard Bell <buffer.g.overflow@gmail.com>
Approved by: Albert Lee <trisk@nexenta.com>
5769 Cast 'zfs bad bloc' to ULL for x86
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <paul.dagnelie@delphix.com>
Reviewed by: Richard PALO <richard@NetBSD.org>
Approved by: Dan McDonald <danmcd@omniti.com>
NEX-4476 WRC: Allow to use write back cache per tree of datasets
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
Revert "NEX-4476 WRC: Allow to use write back cache per tree of datasets"
This reverts commit fe97b74444278a6f36fec93179133641296312da.
NEX-4476 WRC: Allow to use write back cache per tree of datasets
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Alex Aizman <alex.aizman@nexenta.com>
NEX-3588 krrp panics in zfs:dmu_recv_end_check+13b () when running zfs tests.
Reviewed by: Alek Pinchuk <alek.pinchuk@nexenta.com>
Reviewed by: Kevin Crowe <kevin.crowe@nexenta.com>
NEX-3558 KRRP Integration
4370 avoid transmitting holes during zfs send
4371 DMU code clean up
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: George Wilson <george.wilson@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Josef 'Jeff' Sipek <jeffpc@josefsipek.net>
Approved by: Garrett D'Amore <garrett@damore.org>
Fixup merge results
re #12619 rb4429 More dp->dp_config_rwlock holds
Bug 10481 - Dry run option in 'zfs send' isn't the same as in NexentaStor 3.1

   1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
  24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
  26  * Copyright 2014 HybridCluster. All rights reserved.

  27  * Copyright 2016 RackTop Systems.
  28  * Copyright (c) 2014 Integros [integros.com]
  29  */
  30 
  31 #include <sys/dmu.h>
  32 #include <sys/dmu_impl.h>
  33 #include <sys/dmu_tx.h>
  34 #include <sys/dbuf.h>
  35 #include <sys/dnode.h>
  36 #include <sys/zfs_context.h>
  37 #include <sys/dmu_objset.h>
  38 #include <sys/dmu_traverse.h>
  39 #include <sys/dsl_dataset.h>
  40 #include <sys/dsl_dir.h>
  41 #include <sys/dsl_prop.h>
  42 #include <sys/dsl_pool.h>
  43 #include <sys/dsl_synctask.h>
  44 #include <sys/zfs_ioctl.h>
  45 #include <sys/zap.h>
  46 #include <sys/zio_checksum.h>
  47 #include <sys/zfs_znode.h>
  48 #include <zfs_fletcher.h>
  49 #include <sys/avl.h>
  50 #include <sys/ddt.h>
  51 #include <sys/zfs_onexit.h>
  52 #include <sys/dmu_send.h>
  53 #include <sys/dsl_destroy.h>
  54 #include <sys/blkptr.h>
  55 #include <sys/dsl_bookmark.h>
  56 #include <sys/zfeature.h>

  57 #include <sys/bqueue.h>
  58 
  59 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
  60 int zfs_send_corrupt_data = B_FALSE;
  61 int zfs_send_queue_length = 16 * 1024 * 1024;
  62 int zfs_recv_queue_length = 16 * 1024 * 1024;
  63 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
  64 int zfs_send_set_freerecords_bit = B_TRUE;
  65 
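/*
 * Illustrative note, not part of the original file: on illumos these
 * tunables can be set persistently in /etc/system, for example
 *
 *	set zfs:zfs_send_corrupt_data = 1
 *	set zfs:zfs_send_queue_length = 0x2000000
 *
 * or patched in a running kernel with mdb -kw, e.g.
 * "echo 'zfs_send_corrupt_data/W 1' | mdb -kw".  The values shown above
 * are examples only.
 */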
  66 static char *dmu_recv_tag = "dmu_recv_tag";
  67 const char *recv_clone_name = "%recv";
  68 
  69 #define BP_SPAN(datablkszsec, indblkshift, level) \
  70         (((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
  71         (level) * (indblkshift - SPA_BLKPTRSHIFT)))
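/*
 * Worked example (editorial addition): with 128K data blocks
 * (datablkszsec == 256), indblkshift == 17, SPA_MINBLOCKSHIFT == 9 and
 * SPA_BLKPTRSHIFT == 7, each indirect block holds 2^(17 - 7) == 1024
 * block pointers, so
 *
 *	BP_SPAN(256, 17, 0) == 256 << 9  == 128K   (one data block)
 *	BP_SPAN(256, 17, 1) == 256 << 19 == 128M   (1024 data blocks)
 */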
  72 
  73 static void byteswap_record(dmu_replay_record_t *drr);
  74 
  75 struct send_thread_arg {
  76         bqueue_t        q;
  77         dsl_dataset_t   *ds;            /* Dataset to traverse */
  78         uint64_t        fromtxg;        /* Traverse from this txg */


  93 
  94 static int
  95 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
  96 {
  97         dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
  98         ssize_t resid; /* have to get resid to get detailed errno */
  99 
 100         /*
 101          * The code does not rely on this (len being a multiple of 8).  We keep
 102          * this assertion because of the corresponding assertion in
 103          * receive_read().  Keeping this assertion ensures that we do not
 104          * inadvertently break backwards compatibility (causing the assertion
 105          * in receive_read() to trigger on old software).
 106          *
 107          * Removing the assertions could be rolled into a new feature that uses
 108          * data that isn't 8-byte aligned; if the assertions were removed, a
 109          * feature flag would have to be added.
 110          */
 111 
 112         ASSERT0(len % 8);

 113 
 114         dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
 115             (caddr_t)buf, len,
 116             0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
 117 
 118         mutex_enter(&ds->ds_sendstream_lock);
 119         *dsp->dsa_off += len;
 120         mutex_exit(&ds->ds_sendstream_lock);
 121 
 122         return (dsp->dsa_err);
 123 }
 124 
 125 /*
 126  * For all record types except BEGIN, fill in the checksum (overlaid in
 127  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 128  * up to the start of the checksum itself.
 129  */
 130 static int
 131 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
 132 {
 133         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 134             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
 135         (void) fletcher_4_incremental_native(dsp->dsa_drr,
 136             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 137             &dsp->dsa_zc);
 138         if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
 139                 dsp->dsa_sent_begin = B_TRUE;
 140         } else {
 141                 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
 142                     drr_checksum.drr_checksum));
 143                 dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
 144         }

 145         if (dsp->dsa_drr->drr_type == DRR_END) {
 146                 dsp->dsa_sent_end = B_TRUE;
 147         }
 148         (void) fletcher_4_incremental_native(&dsp->dsa_drr->
 149             drr_u.drr_checksum.drr_checksum,
 150             sizeof (zio_cksum_t), &dsp->dsa_zc);
 151         if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
 152                 return (SET_ERROR(EINTR));
 153         if (payload_len != 0) {
 154                 (void) fletcher_4_incremental_native(payload, payload_len,
 155                     &dsp->dsa_zc);
 156                 if (dump_bytes(dsp, payload, payload_len) != 0)
 157                         return (SET_ERROR(EINTR));
 158         }
 159         return (0);
 160 }
 161 
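/*
 * Editorial illustration: because dsp->dsa_zc is only ever accumulated, the
 * checksum stored in a given record is the fletcher-4 of every stream byte
 * emitted so far -- all previous headers and payloads plus the first
 * offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum) bytes of
 * the record that carries it.  The receiver recomputes the same running
 * checksum in receive_read_payload_and_next_header() below.
 */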
 162 /*
 163  * Fill in the drr_free struct, or perform aggregation if the previous record is
 164  * also a free record, and the two are adjacent.
 165  *
 166  * Note that we send free records even for a full send, because we want to be
 167  * able to receive a full send as a clone, which requires a list of all the free
 168  * and freeobject records that were generated on the source.
 169  */
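/*
 * Editorial example: if a DRR_FREE covering [0, 128K) of object 5 is still
 * pending and we are asked to free [128K, 128K) of the same object, the two
 * ranges are adjacent, so the pending record is simply extended to cover
 * [0, 256K) rather than a second record being emitted.
 */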
 170 static int
 171 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
 172     uint64_t length)
 173 {
 174         struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
 175 
 176         /*


 344 
 345         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 346         dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
 347         drrw->drr_object = object;
 348         drrw->drr_offset = offset;
 349         drrw->drr_length = blksz;
 350         drrw->drr_toguid = dsp->dsa_toguid;
 351         drrw->drr_compression = BP_GET_COMPRESS(bp);
 352         drrw->drr_etype = BPE_GET_ETYPE(bp);
 353         drrw->drr_lsize = BPE_GET_LSIZE(bp);
 354         drrw->drr_psize = BPE_GET_PSIZE(bp);
 355 
 356         decode_embedded_bp_compressed(bp, buf);
 357 
 358         if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
 359                 return (EINTR);
 360         return (0);
 361 }
 362 
 363 static int
 364 dump_spill(dmu_sendarg_t *dsp, uint64_t object, int blksz, void *data)

 365 {

 366         struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
 367 
 368         if (dsp->dsa_pending_op != PENDING_NONE) {
 369                 if (dump_record(dsp, NULL, 0) != 0)
 370                         return (SET_ERROR(EINTR));
 371                 dsp->dsa_pending_op = PENDING_NONE;
 372         }
 373 
 374         /* write a SPILL record */
 375         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 376         dsp->dsa_drr->drr_type = DRR_SPILL;
 377         drrs->drr_object = object;
 378         drrs->drr_length = blksz;
 379         drrs->drr_toguid = dsp->dsa_toguid;
 380 
 381         if (dump_record(dsp, data, blksz) != 0)
 382                 return (SET_ERROR(EINTR));
 383         return (0);
 384 }
 385 
 386 static int
 387 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
 388 {
 389         struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
 390 
 391         /*
 392          * If there is a pending op, but it's not PENDING_FREEOBJECTS,
 393          * push it out, since free block aggregation can only be done for
  394          * blocks of the same type (i.e., DRR_FREE records can only be
  395          * aggregated with other DRR_FREE records; DRR_FREEOBJECTS records
  396          * can only be aggregated with other DRR_FREEOBJECTS records).
 397          */
 398         if (dsp->dsa_pending_op != PENDING_NONE &&
 399             dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
 400                 if (dump_record(dsp, NULL, 0) != 0)
 401                         return (SET_ERROR(EINTR));
 402                 dsp->dsa_pending_op = PENDING_NONE;


 619                 int blksz = BP_GET_LSIZE(bp);
 620                 arc_flags_t aflags = ARC_FLAG_WAIT;
 621                 arc_buf_t *abuf;
 622 
 623                 ASSERT0(zb->zb_level);
 624 
 625                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 626                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 627                     &aflags, zb) != 0)
 628                         return (SET_ERROR(EIO));
 629 
 630                 dnode_phys_t *blk = abuf->b_data;
 631                 uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
 632                 for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
 633                         err = dump_dnode(dsa, dnobj + i, blk + i);
 634                         if (err != 0)
 635                                 break;
 636                 }
 637                 arc_buf_destroy(abuf, &abuf);
 638         } else if (type == DMU_OT_SA) {
 639                 arc_flags_t aflags = ARC_FLAG_WAIT;
 640                 arc_buf_t *abuf;
 641                 int blksz = BP_GET_LSIZE(bp);
 642 
 643                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 644                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 645                     &aflags, zb) != 0)
 646                         return (SET_ERROR(EIO));
 647 
 648                 err = dump_spill(dsa, zb->zb_object, blksz, abuf->b_data);
 649                 arc_buf_destroy(abuf, &abuf);
 650         } else if (backup_do_embed(dsa, bp)) {
 651                 /* it's an embedded level-0 block of a regular object */
 652                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 653                 ASSERT0(zb->zb_level);
 654                 err = dump_write_embedded(dsa, zb->zb_object,
 655                     zb->zb_blkid * blksz, blksz, bp);
 656         } else {
 657                 /* it's a level-0 block of a regular object */
 658                 arc_flags_t aflags = ARC_FLAG_WAIT;
 659                 arc_buf_t *abuf;
 660                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 661                 uint64_t offset;
 662 
 663                 /*
 664                  * If we have large blocks stored on disk but the send flags
 665                  * don't allow us to send large blocks, we split the data from
 666                  * the arc buf into chunks.
 667                  */
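		/*
		 * Editorial example: a 1M on-disk block sent without
		 * DMU_BACKUP_FEATURE_LARGE_BLOCKS is emitted below as eight
		 * 128K (SPA_OLD_MAXBLOCKSIZE) DRR_WRITE records.
		 */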
 668                 boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
 669                     !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
 670                 /*
 671                  * We should only request compressed data from the ARC if all
 672                  * the following are true:
 673                  *  - stream compression was requested
 674                  *  - we aren't splitting large blocks into smaller chunks
 675                  *  - the data won't need to be byteswapped before sending
 676                  *  - this isn't an embedded block
 677                  *  - this isn't metadata (if receiving on a different endian
 678                  *    system it can be byteswapped more easily)
 679                  */
 680                 boolean_t request_compressed =
 681                     (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
 682                     !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
 683                     !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
 684 
 685                 ASSERT0(zb->zb_level);
 686                 ASSERT(zb->zb_object > dsa->dsa_resume_object ||
 687                     (zb->zb_object == dsa->dsa_resume_object &&
 688                     zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
 689 
 695                 ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));
 696 
 697                 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
 698                 if (request_compressed)
 699                         zioflags |= ZIO_FLAG_RAW;
 700                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 701                     ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
 702                         if (zfs_send_corrupt_data) {
 703                                 /* Send a block filled with 0x"zfs badd bloc" */
 704                                 abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
 705                                     blksz);
 706                                 uint64_t *ptr;
 707                                 for (ptr = abuf->b_data;
 708                                     (char *)ptr < (char *)abuf->b_data + blksz;
 709                                     ptr++)
 710                                         *ptr = 0x2f5baddb10cULL;
 711                         } else {
 712                                 return (SET_ERROR(EIO));
 713                         }
 714                 }
 715 
 716                 offset = zb->zb_blkid * blksz;
 717 
 718                 if (split_large_blocks) {
 719                         ASSERT3U(arc_get_compression(abuf), ==,
 720                             ZIO_COMPRESS_OFF);
 721                         char *buf = abuf->b_data;
 722                         while (blksz > 0 && err == 0) {
 723                                 int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
 724                                 err = dump_write(dsa, type, zb->zb_object,
 725                                     offset, n, n, NULL, buf);
 726                                 offset += n;
 727                                 buf += n;
 728                                 blksz -= n;
 729                         }
 730                 } else {
 731                         err = dump_write(dsa, type, zb->zb_object, offset,
 732                             blksz, arc_buf_size(abuf), bp, abuf->b_data);
 733                 }
 734                 arc_buf_destroy(abuf, &abuf);
 735         }
 736 
 737         ASSERT(err == 0 || err == EINTR);
 738         return (err);
 739 }
 740 
 741 /*
 742  * Pop the new data off the queue, and free the old data.
 743  */
 744 static struct send_block_record *
 745 get_next_record(bqueue_t *bq, struct send_block_record *data)
 746 {
 747         struct send_block_record *tmp = bqueue_dequeue(bq);
 748         kmem_free(data, sizeof (*data));
 749         return (tmp);
 750 }
 751 
 752 /*
 753  * Actually do the bulk of the work in a zfs send.
 754  *
 755  * Note: Releases dp using the specified tag.
 756  */
 757 static int
 758 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
 759     zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
 760     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
 761     int outfd, uint64_t resumeobj, uint64_t resumeoff,
 762     vnode_t *vp, offset_t *off)
 763 {
 764         objset_t *os;
 765         dmu_replay_record_t *drr;
 766         dmu_sendarg_t *dsp;
 767         int err;
 768         uint64_t fromtxg = 0;
 769         uint64_t featureflags = 0;
 770         struct send_thread_arg to_arg = { 0 };
 771 
 772         err = dmu_objset_from_ds(to_ds, &os);
 773         if (err != 0) {
 774                 dsl_pool_rele(dp, tag);
 775                 return (err);
 776         }
 777 
 778         drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
 779         drr->drr_type = DRR_BEGIN;
 780         drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
 781         DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
 782             DMU_SUBSTREAM);


 833         if (ancestor_zb != NULL) {
 834                 drr->drr_u.drr_begin.drr_fromguid =
 835                     ancestor_zb->zbm_guid;
 836                 fromtxg = ancestor_zb->zbm_creation_txg;
 837         }
 838         dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
 839         if (!to_ds->ds_is_snapshot) {
 840                 (void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
 841                     sizeof (drr->drr_u.drr_begin.drr_toname));
 842         }
 843 
 844         dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
 845 
 846         dsp->dsa_drr = drr;
 847         dsp->dsa_vp = vp;
 848         dsp->dsa_outfd = outfd;
 849         dsp->dsa_proc = curproc;
 850         dsp->dsa_os = os;
 851         dsp->dsa_off = off;
 852         dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;

 853         dsp->dsa_pending_op = PENDING_NONE;
 854         dsp->dsa_featureflags = featureflags;

 855         dsp->dsa_resume_object = resumeobj;
 856         dsp->dsa_resume_offset = resumeoff;
 857 
 858         mutex_enter(&to_ds->ds_sendstream_lock);
 859         list_insert_head(&to_ds->ds_sendstreams, dsp);
 860         mutex_exit(&to_ds->ds_sendstream_lock);
 861 
 862         dsl_dataset_long_hold(to_ds, FTAG);
 863         dsl_pool_rele(dp, tag);
 864 
 865         void *payload = NULL;
 866         size_t payload_len = 0;
 867         if (resumeobj != 0 || resumeoff != 0) {
 868                 dmu_object_info_t to_doi;
 869                 err = dmu_object_info(os, resumeobj, &to_doi);
 870                 if (err != 0)
 871                         goto out;
 872                 SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
 873                     resumeoff / to_doi.doi_data_block_size);
 874 
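		/*
		 * Editorial example: resuming at resumeobj == 7 and
		 * resumeoff == 1M on an object with 128K data blocks yields
		 * a resume bookmark of <ds_object, object 7, level 0,
		 * blkid 8>, since 1M / 128K == 8.
		 */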


 886                 err = dsp->dsa_err;
 887                 goto out;
 888         }
 889 
 890         err = bqueue_init(&to_arg.q, zfs_send_queue_length,
 891             offsetof(struct send_block_record, ln));
 892         to_arg.error_code = 0;
 893         to_arg.cancel = B_FALSE;
 894         to_arg.ds = to_ds;
 895         to_arg.fromtxg = fromtxg;
 896         to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
 897         (void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
 898             TS_RUN, minclsyspri);
 899 
 900         struct send_block_record *to_data;
 901         to_data = bqueue_dequeue(&to_arg.q);
 902 
 903         while (!to_data->eos_marker && err == 0) {
 904                 err = do_dump(dsp, to_data);
 905                 to_data = get_next_record(&to_arg.q, to_data);
 906                 if (issig(JUSTLOOKING) && issig(FORREAL))
 907                         err = EINTR;
 908         }
 909 
 910         if (err != 0) {
 911                 to_arg.cancel = B_TRUE;
 912                 while (!to_data->eos_marker) {
 913                         to_data = get_next_record(&to_arg.q, to_data);
 914                 }
 915         }
 916         kmem_free(to_data, sizeof (*to_data));
 917 
 918         bqueue_destroy(&to_arg.q);
 919 
 920         if (err == 0 && to_arg.error_code != 0)
 921                 err = to_arg.error_code;
 922 
 923         if (err != 0)
 924                 goto out;
 925 
 926         if (dsp->dsa_pending_op != PENDING_NONE)


 940 
 941         if (dump_record(dsp, NULL, 0) != 0)
 942                 err = dsp->dsa_err;
 943 
 944 out:
 945         mutex_enter(&to_ds->ds_sendstream_lock);
 946         list_remove(&to_ds->ds_sendstreams, dsp);
 947         mutex_exit(&to_ds->ds_sendstream_lock);
 948 
 949         VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));
 950 
 951         kmem_free(drr, sizeof (dmu_replay_record_t));
 952         kmem_free(dsp, sizeof (dmu_sendarg_t));
 953 
 954         dsl_dataset_long_rele(to_ds, FTAG);
 955 
 956         return (err);
 957 }
 958 
 959 int
 960 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
 961     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
 962     int outfd, vnode_t *vp, offset_t *off)
 963 {
 964         dsl_pool_t *dp;
 965         dsl_dataset_t *ds;
 966         dsl_dataset_t *fromds = NULL;
 967         int err;
 968 
 969         err = dsl_pool_hold(pool, FTAG, &dp);
 970         if (err != 0)
 971                 return (err);
 972 
 973         err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
 974         if (err != 0) {
 975                 dsl_pool_rele(dp, FTAG);
 976                 return (err);
 977         }
 978 
 979         if (fromsnap != 0) {
 980                 zfs_bookmark_phys_t zb;
 981                 boolean_t is_clone;
 982 
 983                 err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
 984                 if (err != 0) {
 985                         dsl_dataset_rele(ds, FTAG);
 986                         dsl_pool_rele(dp, FTAG);
 987                         return (err);
 988                 }
 989                 if (!dsl_dataset_is_before(ds, fromds, 0))
 990                         err = SET_ERROR(EXDEV);
 991                 zb.zbm_creation_time =
 992                     dsl_dataset_phys(fromds)->ds_creation_time;
 993                 zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
 994                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
 995                 is_clone = (fromds->ds_dir != ds->ds_dir);
 996                 dsl_dataset_rele(fromds, FTAG);
 997                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
 998                     embedok, large_block_ok, compressok, outfd, 0, 0, vp, off);

 999         } else {
1000                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1001                     embedok, large_block_ok, compressok, outfd, 0, 0, vp, off);

1002         }
1003         dsl_dataset_rele(ds, FTAG);
1004         return (err);
1005 }
1006 
1007 int
1008 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
1009     boolean_t large_block_ok, boolean_t compressok, int outfd,
1010     uint64_t resumeobj, uint64_t resumeoff,
1011     vnode_t *vp, offset_t *off)
1012 {
1013         dsl_pool_t *dp;
1014         dsl_dataset_t *ds;
1015         int err;
1016         boolean_t owned = B_FALSE;
1017 
1018         if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
1019                 return (SET_ERROR(EINVAL));
1020 
1021         err = dsl_pool_hold(tosnap, FTAG, &dp);


1058                         if (err == 0) {
1059                                 if (!dsl_dataset_is_before(ds, fromds, 0))
1060                                         err = SET_ERROR(EXDEV);
1061                                 zb.zbm_creation_time =
1062                                     dsl_dataset_phys(fromds)->ds_creation_time;
1063                                 zb.zbm_creation_txg =
1064                                     dsl_dataset_phys(fromds)->ds_creation_txg;
1065                                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1066                                 is_clone = (ds->ds_dir != fromds->ds_dir);
1067                                 dsl_dataset_rele(fromds, FTAG);
1068                         }
1069                 } else {
1070                         err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1071                 }
1072                 if (err != 0) {
1073                         dsl_dataset_rele(ds, FTAG);
1074                         dsl_pool_rele(dp, FTAG);
1075                         return (err);
1076                 }
1077                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1078                     embedok, large_block_ok, compressok,
1079                     outfd, resumeobj, resumeoff, vp, off);
1080         } else {
1081                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1082                     embedok, large_block_ok, compressok,
1083                     outfd, resumeobj, resumeoff, vp, off);
1084         }
1085         if (owned)
1086                 dsl_dataset_disown(ds, FTAG);
1087         else
1088                 dsl_dataset_rele(ds, FTAG);
1089         return (err);
1090 }
1091 
1092 static int
1093 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
1094     uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
1095 {
1096         int err;
1097         uint64_t size;
1098         /*
1099          * Assume that space (both on-disk and in-stream) is dominated by
1100          * data.  We will adjust for indirect blocks and the copies property,
 1101          * but ignore per-object space used (e.g., dnodes and DRR_OBJECT records).
1102          */
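	/*
	 * Editorial back-of-the-envelope example (the adjustment itself is
	 * not shown in this excerpt): 1 GB of data at a 128K recordsize is
	 * roughly 8192 records; even a few hundred bytes of per-record
	 * stream overhead adds only ~2-3 MB, so the estimate stays
	 * dominated by the data size itself.
	 */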
1103         uint64_t recordsize;


1240          */
1241         err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1242             dmu_calculate_send_traversal, &size);
1243         if (err)
1244                 return (err);
1245 
1246         err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
1247             size.compressed, stream_compressed, sizep);
1248         return (err);
1249 }
1250 
1251 typedef struct dmu_recv_begin_arg {
1252         const char *drba_origin;
1253         dmu_recv_cookie_t *drba_cookie;
1254         cred_t *drba_cred;
1255         uint64_t drba_snapobj;
1256 } dmu_recv_begin_arg_t;
1257 
1258 static int
1259 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
1260     uint64_t fromguid)
1261 {
1262         uint64_t val;
1263         int error;
1264         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1265 

1266         /* temporary clone name must not exist */
1267         error = zap_lookup(dp->dp_meta_objset,
1268             dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
1269             8, 1, &val);
1270         if (error != ENOENT)
1271                 return (error == 0 ? EBUSY : error);
1272 
1273         /* new snapshot name must not exist */
1274         error = zap_lookup(dp->dp_meta_objset,
1275             dsl_dataset_phys(ds)->ds_snapnames_zapobj,
1276             drba->drba_cookie->drc_tosnap, 8, 1, &val);
1277         if (error != ENOENT)
1278                 return (error == 0 ? EEXIST : error);
1279 
1280         /*
1281          * Check snapshot limit before receiving. We'll recheck again at the
1282          * end, but might as well abort before receiving if we're already over
1283          * the limit.
1284          *
1285          * Note that we do not check the file system limit with
1286          * dsl_dir_fscount_check because the temporary %clones don't count
1287          * against that limit.
1288          */
1289         error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
1290             NULL, drba->drba_cred);
1291         if (error != 0)
1292                 return (error);
1293 
1294         if (fromguid != 0) {
1295                 dsl_dataset_t *snap;
1296                 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
1297 
1298                 /* Find snapshot in this dir that matches fromguid. */


1382         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1383             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1384                 return (SET_ERROR(ENOTSUP));
1385         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
1386             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1387                 return (SET_ERROR(ENOTSUP));
1388 
1389         /*
1390          * The receiving code doesn't know how to translate large blocks
1391          * to smaller ones, so the pool must have the LARGE_BLOCKS
1392          * feature enabled if the stream has LARGE_BLOCKS.
1393          */
1394         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
1395             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
1396                 return (SET_ERROR(ENOTSUP));
1397 
1398         error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1399         if (error == 0) {
1400                 /* target fs already exists; recv into temp clone */
1401 
1402                 /* Can't recv a clone into an existing fs */
1403                 if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
1404                         dsl_dataset_rele(ds, FTAG);
1405                         return (SET_ERROR(EINVAL));
1406                 }
1407 
1408                 error = recv_begin_check_existing_impl(drba, ds, fromguid);
1409                 dsl_dataset_rele(ds, FTAG);
1410         } else if (error == ENOENT) {
1411                 /* target fs does not exist; must be a full backup or clone */
1412                 char buf[ZFS_MAX_DATASET_NAME_LEN];
1413 
1414                 /*
1415                  * If it's a non-clone incremental, we are missing the
1416                  * target fs, so fail the recv.
1417                  */
1418                 if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
1419                     drba->drba_origin))
1420                         return (SET_ERROR(ENOENT));
1421 
1422                 /*
1423                  * If we're receiving a full send as a clone, and it doesn't
1424                  * contain all the necessary free records and freeobject
1425                  * records, reject it.
1426                  */
1427                 if (fromguid == 0 && drba->drba_origin &&
1428                     !(flags & DRR_FLAG_FREERECORDS))
1429                         return (SET_ERROR(EINVAL));
1430 
1431                 /* Open the parent of tofs */
1432                 ASSERT3U(strlen(tofs), <, sizeof (buf));
1433                 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
1434                 error = dsl_dataset_hold(dp, buf, FTAG, &ds);
1435                 if (error != 0)
1436                         return (error);
1437 
1438                 /*
1439                  * Check filesystem and snapshot limits before receiving. We'll
1440                  * recheck snapshot limits again at the end (we create the
1441                  * filesystems and increment those counts during begin_sync).
1442                  */
1443                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1444                     ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
1445                 if (error != 0) {
1446                         dsl_dataset_rele(ds, FTAG);
1447                         return (error);
1448                 }
1449 
1450                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1451                     ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
1452                 if (error != 0) {
1453                         dsl_dataset_rele(ds, FTAG);
1454                         return (error);
1455                 }
1456 
1457                 if (drba->drba_origin != NULL) {


1632             tofs, recv_clone_name);
1633 
1634         if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1635                 /* %recv does not exist; continue in tofs */
1636                 error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1637                 if (error != 0)
1638                         return (error);
1639         }
1640 
1641         /* check that ds is marked inconsistent */
1642         if (!DS_IS_INCONSISTENT(ds)) {
1643                 dsl_dataset_rele(ds, FTAG);
1644                 return (SET_ERROR(EINVAL));
1645         }
1646 
1647         /* check that there is resuming data, and that the toguid matches */
1648         if (!dsl_dataset_is_zapified(ds)) {
1649                 dsl_dataset_rele(ds, FTAG);
1650                 return (SET_ERROR(EINVAL));
1651         }
1652         uint64_t val;
1653         error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
1654             DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
1655         if (error != 0 || drrb->drr_toguid != val) {
1656                 dsl_dataset_rele(ds, FTAG);
1657                 return (SET_ERROR(EINVAL));
1658         }
1659 
1660         /*
1661          * Check if the receive is still running.  If so, it will be owned.
1662          * Note that nothing else can own the dataset (e.g. after the receive
1663          * fails) because it will be marked inconsistent.
1664          */
1665         if (dsl_dataset_has_owner(ds)) {
1666                 dsl_dataset_rele(ds, FTAG);
1667                 return (SET_ERROR(EBUSY));
1668         }
1669 
1670         /* There should not be any snapshots of this fs yet. */
1671         if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
1672                 dsl_dataset_rele(ds, FTAG);


1721         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));
1722 
1723         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1724         dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
1725 
1726         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
1727         ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
1728         rrw_exit(&ds->ds_bp_rwlock, FTAG);
1729 
1730         drba->drba_cookie->drc_ds = ds;
1731 
1732         spa_history_log_internal_ds(ds, "resume receive", tx, "");
1733 }
1734 
1735 /*
1736  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1737  * succeeds; otherwise we will leak the holds on the datasets.
1738  */
1739 int
1740 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
1741     boolean_t force, boolean_t resumable, char *origin, dmu_recv_cookie_t *drc)

1742 {
1743         dmu_recv_begin_arg_t drba = { 0 };
1744 
1745         bzero(drc, sizeof (dmu_recv_cookie_t));
1746         drc->drc_drr_begin = drr_begin;
1747         drc->drc_drrb = &drr_begin->drr_u.drr_begin;
1748         drc->drc_tosnap = tosnap;
1749         drc->drc_tofs = tofs;
1750         drc->drc_force = force;
1751         drc->drc_resumable = resumable;
1752         drc->drc_cred = CRED();
1753 
1754         if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1755                 drc->drc_byteswap = B_TRUE;
1756                 (void) fletcher_4_incremental_byteswap(drr_begin,
1757                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
1758                 byteswap_record(drr_begin);

1759         } else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
1760                 (void) fletcher_4_incremental_native(drr_begin,
1761                     sizeof (dmu_replay_record_t), &drc->drc_cksum);

1762         } else {
1763                 return (SET_ERROR(EINVAL));
1764         }
1765 
1766         drba.drba_origin = origin;
1767         drba.drba_cookie = drc;
1768         drba.drba_cred = CRED();
1769 
1770         if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
1771             DMU_BACKUP_FEATURE_RESUMING) {
1772                 return (dsl_sync_task(tofs,
1773                     dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
1774                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1775         } else  {
1776                 return (dsl_sync_task(tofs,
1777                     dmu_recv_begin_check, dmu_recv_begin_sync,
1778                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1779         }
1780 }
1781 
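/*
 * Editorial caller sketch (not part of this file), showing the contract
 * described above; error handling and the real zfs_ioc_recv() plumbing are
 * omitted, and dmu_recv_end()'s signature is assumed from its callers:
 *
 *	dmu_recv_cookie_t drc;
 *	int err;
 *
 *	err = dmu_recv_begin(tofs, tosnap, drr_begin, force, resumable,
 *	    origin, &drc);
 *	if (err == 0)
 *		err = dmu_recv_stream(&drc, vp, &voff, cleanup_fd, &ahandle);
 *	if (err == 0)
 *		err = dmu_recv_end(&drc, owner);
 */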


1825 
1826 struct receive_objnode {
1827         list_node_t node;
1828         uint64_t object;
1829 };
1830 
1831 struct receive_arg {
1832         objset_t *os;
1833         vnode_t *vp; /* The vnode to read the stream from */
1834         uint64_t voff; /* The current offset in the stream */
1835         uint64_t bytes_read;
1836         /*
1837          * A record that has had its payload read in, but hasn't yet been handed
1838          * off to the worker thread.
1839          */
1840         struct receive_record_arg *rrd;
1841         /* A record that has had its header read in, but not its payload. */
1842         struct receive_record_arg *next_rrd;
1843         zio_cksum_t cksum;
1844         zio_cksum_t prev_cksum;

1845         int err;
1846         boolean_t byteswap;
1847         /* Sorted list of objects not to issue prefetches for. */
1848         struct objlist ignore_objlist;
1849 };
1850 
1851 typedef struct guid_map_entry {
1852         uint64_t        guid;
1853         dsl_dataset_t   *gme_ds;
1854         avl_node_t      avlnode;
1855 } guid_map_entry_t;
1856 
1857 static int
1858 guid_compare(const void *arg1, const void *arg2)
1859 {
1860         const guid_map_entry_t *gmep1 = arg1;
1861         const guid_map_entry_t *gmep2 = arg2;
1862 
1863         if (gmep1->guid < gmep2->guid)
1864                 return (-1);


1877         while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
1878                 dsl_dataset_long_rele(gmep->gme_ds, gmep);
1879                 dsl_dataset_rele(gmep->gme_ds, gmep);
1880                 kmem_free(gmep, sizeof (guid_map_entry_t));
1881         }
1882         avl_destroy(ca);
1883         kmem_free(ca, sizeof (avl_tree_t));
1884 }
1885 
1886 static int
1887 receive_read(struct receive_arg *ra, int len, void *buf)
1888 {
1889         int done = 0;
1890 
1891         /*
1892          * The code doesn't rely on this (lengths being multiples of 8).  See
1893          * comment in dump_bytes.
1894          */
1895         ASSERT0(len % 8);
1896 
1897         while (done < len) {
1898                 ssize_t resid;
1899 
1900                 ra->err = vn_rdwr(UIO_READ, ra->vp,
1901                     (char *)buf + done, len - done,
1902                     ra->voff, UIO_SYSSPACE, FAPPEND,
1903                     RLIM64_INFINITY, CRED(), &resid);
1904 
1905                 if (resid == len - done) {
1906                         /*
1907                          * Note: ECKSUM indicates that the receive
1908                          * was interrupted and can potentially be resumed.
1909                          */
1910                         ra->err = SET_ERROR(ECKSUM);
1911                 }
1912                 ra->voff += len - done - resid;
1913                 done = len - resid;
1914                 if (ra->err != 0)
1915                         return (ra->err);
1916         }
1917 
1918         ra->bytes_read += len;
1919 
1920         ASSERT3U(done, ==, len);
1921         return (0);
1922 }
1923 
1924 static void
1925 byteswap_record(dmu_replay_record_t *drr)
1926 {
1927 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
1928 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
1929         drr->drr_type = BSWAP_32(drr->drr_type);
1930         drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
1931 
1932         switch (drr->drr_type) {
1933         case DRR_BEGIN:
1934                 DO64(drr_begin.drr_magic);
1935                 DO64(drr_begin.drr_versioninfo);
1936                 DO64(drr_begin.drr_creation_time);
1937                 DO32(drr_begin.drr_type);


2201         if (drrw->drr_object < rwa->last_object ||
2202             (drrw->drr_object == rwa->last_object &&
2203             drrw->drr_offset < rwa->last_offset)) {
2204                 return (SET_ERROR(EINVAL));
2205         }
2206         rwa->last_object = drrw->drr_object;
2207         rwa->last_offset = drrw->drr_offset;
2208 
2209         if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
2210                 return (SET_ERROR(EINVAL));
2211 
2212         tx = dmu_tx_create(rwa->os);
2213 
2214         dmu_tx_hold_write(tx, drrw->drr_object,
2215             drrw->drr_offset, drrw->drr_logical_size);
2216         err = dmu_tx_assign(tx, TXG_WAIT);
2217         if (err != 0) {
2218                 dmu_tx_abort(tx);
2219                 return (err);
2220         }

2221         if (rwa->byteswap) {
2222                 dmu_object_byteswap_t byteswap =
2223                     DMU_OT_BYTESWAP(drrw->drr_type);
2224                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
2225                     DRR_WRITE_PAYLOAD_SIZE(drrw));
2226         }
2227 
2228         /* use the bonus buf to look up the dnode in dmu_assign_arcbuf */
2229         dmu_buf_t *bonus;
2230         if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
2231                 return (SET_ERROR(EINVAL));
2232         dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
2233 
2234         /*
2235          * Note: If the receive fails, we want the resume stream to start
2236          * with the same record that we last successfully received (as opposed
2237          * to the next record), so that we can verify that we are
2238          * resuming from the correct location.
2239          */
2240         save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);


2430 receive_cksum(struct receive_arg *ra, int len, void *buf)
2431 {
2432         if (ra->byteswap) {
2433                 (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
2434         } else {
2435                 (void) fletcher_4_incremental_native(buf, len, &ra->cksum);
2436         }
2437 }
2438 
2439 /*
2440  * Read the payload into a buffer of size len, and update the current record's
2441  * payload field.
2442  * Allocate ra->next_rrd and read the next record's header into
2443  * ra->next_rrd->header.
2444  * Verify checksum of payload and next record.
2445  */
2446 static int
2447 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
2448 {
2449         int err;
2450 
2451         if (len != 0) {
2452                 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2453                 err = receive_read(ra, len, buf);
2454                 if (err != 0)
2455                         return (err);
2456                 receive_cksum(ra, len, buf);
2457 
2458                 /* note: rrd is NULL when reading the begin record's payload */
2459                 if (ra->rrd != NULL) {
2460                         ra->rrd->payload = buf;
2461                         ra->rrd->payload_size = len;
2462                         ra->rrd->bytes_read = ra->bytes_read;
2463                 }
2464         }
2465 
2466         ra->prev_cksum = ra->cksum;
2467 
2468         ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2469         err = receive_read(ra, sizeof (ra->next_rrd->header),
2470             &ra->next_rrd->header);
2471         ra->next_rrd->bytes_read = ra->bytes_read;
2472         if (err != 0) {
2473                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2474                 ra->next_rrd = NULL;
2475                 return (err);
2476         }
2477         if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
2478                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2479                 ra->next_rrd = NULL;
2480                 return (SET_ERROR(EINVAL));
2481         }
2482 

2483         /*
2484          * Note: checksum is of everything up to but not including the
2485          * checksum itself.
2486          */
2487         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),

2488             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2489         receive_cksum(ra,
2490             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),

2491             &ra->next_rrd->header);
2492 
2493         zio_cksum_t cksum_orig =
2494             ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2495         zio_cksum_t *cksump =
2496             &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2497 
2498         if (ra->byteswap)
2499                 byteswap_record(&ra->next_rrd->header);
2500 
2501         if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2502             !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
2503                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2504                 ra->next_rrd = NULL;
2505                 return (SET_ERROR(ECKSUM));
2506         }
2507 
2508         receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);

2509 
2510         return (0);
2511 }
2512 
2513 static void
2514 objlist_create(struct objlist *list)
2515 {
2516         list_create(&list->list, sizeof (struct receive_objnode),
2517             offsetof(struct receive_objnode, node));
2518         list->last_lookup = 0;
2519 }
2520 
2521 static void
2522 objlist_destroy(struct objlist *list)
2523 {
2524         for (struct receive_objnode *n = list_remove_head(&list->list);
2525             n != NULL; n = list_remove_head(&list->list)) {
2526                 kmem_free(n, sizeof (*n));
2527         }
2528         list_destroy(&list->list);


2685                 if (err != 0) {
2686                         kmem_free(buf, size);
2687                         return (err);
2688                 }
2689 
2690                 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2691                     drrwe->drr_length);
2692                 return (err);
2693         }
2694         case DRR_FREE:
2695         {
2696                 /*
2697                  * It might be beneficial to prefetch indirect blocks here, but
2698                  * we don't really have the data to decide for sure.
2699                  */
2700                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2701                 return (err);
2702         }
2703         case DRR_END:
2704         {
2705                 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2706                 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))

2707                         return (SET_ERROR(ECKSUM));

2708                 return (0);
2709         }
2710         case DRR_SPILL:
2711         {
2712                 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2713                 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
2714                 err = receive_read_payload_and_next_header(ra, drrs->drr_length,
2715                     buf);
2716                 if (err != 0)
2717                         kmem_free(buf, drrs->drr_length);
2718                 return (err);
2719         }
2720         default:
2721                 return (SET_ERROR(EINVAL));
2722         }
2723 }
2724 
2725 /*
2726  * Commit the records to the pool.
2727  */


2853         if (resume_off != val)
2854                 return (SET_ERROR(EINVAL));
2855 
2856         return (0);
2857 }
2858 
2859 /*
2860  * Read in the stream's records, one by one, and apply them to the pool.  There
2861  * are two threads involved; the thread that calls this function will spin up a
2862  * worker thread, read the records off the stream one by one, and issue
2863  * prefetches for any necessary indirect blocks.  It will then push the records
2864  * onto an internal blocking queue.  The worker thread will pull the records off
2865  * the queue, and actually write the data into the DMU.  This way, the worker
2866  * thread doesn't have to wait for reads to complete, since everything it needs
2867  * (the indirect blocks) will be prefetched.
2868  *
2869  * NB: callers *must* call dmu_recv_end() if this succeeds.
 2870  * NB: callers *must* call dmu_recv_end() if this succeeds.
       */
2871 int
2872 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
2873     int cleanup_fd, uint64_t *action_handlep)
2874 {
2875         int err = 0;
2876         struct receive_arg ra = { 0 };
2877         struct receive_writer_arg rwa = { 0 };
2878         int featureflags;
2879         nvlist_t *begin_nvl = NULL;
2880 
2881         ra.byteswap = drc->drc_byteswap;
2882         ra.cksum = drc->drc_cksum;
2883         ra.vp = vp;
2884         ra.voff = *voffp;

2885 
2886         if (dsl_dataset_is_zapified(drc->drc_ds)) {
2887                 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
2888                     drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
2889                     sizeof (ra.bytes_read), 1, &ra.bytes_read);
2890         }
2891 
2892         objlist_create(&ra.ignore_objlist);
2893 
2894         /* these were verified in dmu_recv_begin */
2895         ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2896             DMU_SUBSTREAM);
2897         ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2898 
2899         /*
2900          * Open the objset we are modifying.
2901          */
2902         VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
2903 
2904         ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);


2973         rwa.resumable = drc->drc_resumable;
2974 
2975         (void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, curproc,
2976             TS_RUN, minclsyspri);
2977         /*
2978          * We're reading rwa.err without locks, which is safe since we are the
2979          * only reader, and the worker thread is the only writer.  It's ok if we
2980          * miss a write for an iteration or two of the loop, since the writer
2981          * thread will keep freeing records we send it until we send it an eos
2982          * marker.
2983          *
2984          * We can leave this loop in 3 ways:  First, if rwa.err is
2985          * non-zero.  In that case, the writer thread will free the rrd we just
2986         * pushed.  Second, if we're interrupted; in that case, either it's the
2987          * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
2988          * has been handed off to the writer thread who will free it.  Finally,
2989          * if receive_read_record fails or we're at the end of the stream, then
2990          * we free ra.rrd and exit.
2991          */
2992         while (rwa.err == 0) {
2993                 if (issig(JUSTLOOKING) && issig(FORREAL)) {
2994                         err = SET_ERROR(EINTR);
2995                         break;
2996                 }
2997 
2998                 ASSERT3P(ra.rrd, ==, NULL);
2999                 ra.rrd = ra.next_rrd;
3000                 ra.next_rrd = NULL;
3001                 /* Allocates and loads header into ra.next_rrd */
3002                 err = receive_read_record(&ra);
3003 
3004                 if (ra.rrd->header.drr_type == DRR_END || err != 0) {
3005                         kmem_free(ra.rrd, sizeof (*ra.rrd));
3006                         ra.rrd = NULL;
3007                         break;
3008                 }
3009 
3010                 bqueue_enqueue(&rwa.q, ra.rrd,
3011                     sizeof (struct receive_record_arg) + ra.rrd->payload_size);
3012                 ra.rrd = NULL;
3013         }


3039                  * destroy what we created, so we don't leave it in
3040                  * the inconsistent state.
3041                  */
3042                 dmu_recv_cleanup_ds(drc);
3043         }
3044 
3045         *voffp = ra.voff;
3046         objlist_destroy(&ra.ignore_objlist);
3047         return (err);
3048 }
3049 
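The reader/writer architecture described in the block comment at the top of dmu_recv_stream() above is a plain single-producer/single-consumer pattern built on the in-tree blocking queue. The sketch below is an editorial illustration of that pattern using only interfaces visible in this listing (bqueue_init/bqueue_enqueue/bqueue_dequeue, thread_create); the demo_record type, the queue length, and the end-of-stream handling are simplified assumptions, not the actual receive structures.

/* Editorial sketch: simplified reader/writer pairing, not the real code. */
#include <sys/zfs_context.h>
#include <sys/bqueue.h>

struct demo_record {
        bqueue_node_t   ln;             /* linkage used by the blocking queue */
        boolean_t       eos_marker;     /* last record in the stream */
        size_t          payload_size;   /* assumed payload accounting */
};

static void
demo_writer_thread(void *arg)
{
        bqueue_t *q = arg;
        struct demo_record *rec;

        /* Consume records until the end-of-stream marker arrives. */
        for (rec = bqueue_dequeue(q); !rec->eos_marker;
            rec = bqueue_dequeue(q)) {
                /* ... apply the record (the real code writes into the DMU) ... */
                kmem_free(rec, sizeof (*rec));
        }
        kmem_free(rec, sizeof (*rec));          /* free the eos record too */
        thread_exit();
}

static void
demo_reader(bqueue_t *q)
{
        VERIFY0(bqueue_init(q, 16 * 1024 * 1024,
            offsetof(struct demo_record, ln)));
        (void) thread_create(NULL, 0, demo_writer_thread, q, 0, curproc,
            TS_RUN, minclsyspri);

        for (boolean_t done = B_FALSE; !done; ) {
                struct demo_record *rec = kmem_zalloc(sizeof (*rec), KM_SLEEP);
                /* ... read one record off the stream, prefetch indirects ... */
                done = B_TRUE;                  /* placeholder end condition */
                rec->eos_marker = done;
                bqueue_enqueue(q, rec, sizeof (*rec) + rec->payload_size);
        }
        /* The real code then waits for the writer and calls bqueue_destroy(). */
}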
3050 static int
3051 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
3052 {
3053         dmu_recv_cookie_t *drc = arg;
3054         dsl_pool_t *dp = dmu_tx_pool(tx);
3055         int error;
3056 
3057         ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
3058 












3059         if (!drc->drc_newfs) {
3060                 dsl_dataset_t *origin_head;
3061 
3062                 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
3063                 if (error != 0)
3064                         return (error);
3065                 if (drc->drc_force) {
3066                         /*
3067                          * We will destroy any snapshots in tofs (i.e. before
3068                          * origin_head) that are after the origin (which is
3069                          * the snap before drc_ds, because drc_ds can not
3070                          * have any snaps of its own).
3071                          */
3072                         uint64_t obj;
3073 
3074                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3075                         while (obj !=
3076                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3077                                 dsl_dataset_t *snap;
3078                                 error = dsl_dataset_hold_obj(dp, obj, FTAG,


3095                                 return (error);
3096                         }
3097                 }
3098                 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
3099                     origin_head, drc->drc_force, drc->drc_owner, tx);
3100                 if (error != 0) {
3101                         dsl_dataset_rele(origin_head, FTAG);
3102                         return (error);
3103                 }
3104                 error = dsl_dataset_snapshot_check_impl(origin_head,
3105                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3106                 dsl_dataset_rele(origin_head, FTAG);
3107                 if (error != 0)
3108                         return (error);
3109 
3110                 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
3111         } else {
3112                 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
3113                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3114         }













3115         return (error);
3116 }
3117 
3118 static void
3119 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
3120 {
3121         dmu_recv_cookie_t *drc = arg;
3122         dsl_pool_t *dp = dmu_tx_pool(tx);
3123 
3124         spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
3125             tx, "snap=%s", drc->drc_tosnap);
3126 
3127         if (!drc->drc_newfs) {
3128                 dsl_dataset_t *origin_head;
3129 
3130                 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
3131                     &origin_head));
3132 
3133                 if (drc->drc_force) {
3134                         /*




   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.

  23  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  24  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
  25  * Copyright 2014 HybridCluster. All rights reserved.
  26  * Copyright 2017 Nexenta Systems, Inc. All rights reserved.
  27  * Copyright 2016 RackTop Systems.
  28  * Copyright (c) 2014 Integros [integros.com]
  29  */
  30 
  31 #include <sys/dmu.h>
  32 #include <sys/dmu_impl.h>
  33 #include <sys/dmu_tx.h>
  34 #include <sys/dbuf.h>
  35 #include <sys/dnode.h>
  36 #include <sys/zfs_context.h>
  37 #include <sys/dmu_objset.h>
  38 #include <sys/dmu_traverse.h>
  39 #include <sys/dsl_dataset.h>
  40 #include <sys/dsl_dir.h>
  41 #include <sys/dsl_prop.h>
  42 #include <sys/dsl_pool.h>
  43 #include <sys/dsl_synctask.h>
  44 #include <sys/zfs_ioctl.h>
  45 #include <sys/zap.h>
  46 #include <sys/zio_checksum.h>
  47 #include <sys/zfs_znode.h>
  48 #include <zfs_fletcher.h>
  49 #include <sys/avl.h>
  50 #include <sys/ddt.h>
  51 #include <sys/zfs_onexit.h>
  52 #include <sys/dmu_send.h>
  53 #include <sys/dsl_destroy.h>
  54 #include <sys/blkptr.h>
  55 #include <sys/dsl_bookmark.h>
  56 #include <sys/zfeature.h>
  57 #include <sys/autosnap.h>
  58 #include <sys/bqueue.h>
  59 
  60 #include "zfs_errno.h"
  61 
  62 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
  63 int zfs_send_corrupt_data = B_FALSE;
  64 int zfs_send_queue_length = 16 * 1024 * 1024;
  65 int zfs_recv_queue_length = 16 * 1024 * 1024;
  66 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
  67 int zfs_send_set_freerecords_bit = B_TRUE;
  68 
  69 static char *dmu_recv_tag = "dmu_recv_tag";
  70 const char *recv_clone_name = "%recv";
  71 
  72 #define BP_SPAN(datablkszsec, indblkshift, level) \
  73         (((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
  74         (level) * (indblkshift - SPA_BLKPTRSHIFT)))
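/*
 * Worked example (editorial illustration, not part of the original source):
 * with 128 KiB data blocks (datablkszsec = 256 sectors), indblkshift = 17
 * (128 KiB indirect blocks) and SPA_BLKPTRSHIFT = 7 (128-byte block
 * pointers), each indirect block holds 1024 pointers, so
 *   level 0: 256 << 9                  = 128 KiB (the data block itself)
 *   level 1: 256 << (9 + 1 * (17 - 7)) = 128 MiB
 *   level 2: 256 << (9 + 2 * (17 - 7)) = 128 GiB
 */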
  75 
  76 static void byteswap_record(dmu_replay_record_t *drr);
  77 
  78 struct send_thread_arg {
  79         bqueue_t        q;
  80         dsl_dataset_t   *ds;            /* Dataset to traverse */
  81         uint64_t        fromtxg;        /* Traverse from this txg */


  96 
  97 static int
  98 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
  99 {
 100         dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
 101         ssize_t resid; /* have to get resid to get detailed errno */
 102 
 103         /*
 104          * The code does not rely on this (len being a multiple of 8).  We keep
 105          * this assertion because of the corresponding assertion in
 106          * receive_read().  Keeping this assertion ensures that we do not
 107          * inadvertently break backwards compatibility (causing the assertion
 108          * in receive_read() to trigger on old software).
 109          *
 110          * Removing the assertions could be rolled into a new feature that uses
 111          * data that isn't 8-byte aligned; if the assertions were removed, a
 112          * feature flag would have to be added.
 113          */
 114 
 115         ASSERT0(len % 8);
 116         ASSERT(buf != NULL);
 117 
 118         dsp->dsa_err = 0;
 119         if (!dsp->sendsize) {
 120                 /* if vp is NULL, then the send is from krrp */
 121                 if (dsp->dsa_vp != NULL) {
 122                         dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
 123                             (caddr_t)buf, len,
 124                             0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY,
 125                             CRED(), &resid);
 126                 } else {
 127                         ASSERT(dsp->dsa_krrp_task != NULL);
 128                         dsp->dsa_err = dmu_krrp_buffer_write(buf, len,
 129                             dsp->dsa_krrp_task);
 130                 }
 131         }
 132         mutex_enter(&ds->ds_sendstream_lock);
 133         *dsp->dsa_off += len;
 134         mutex_exit(&ds->ds_sendstream_lock);
 135 
 136         return (dsp->dsa_err);
 137 }
 138 
 139 static int
 140 dump_bytes_with_checksum(dmu_sendarg_t *dsp, void *buf, int len)
 141 {
 142         if (!dsp->sendsize && (dsp->dsa_krrp_task == NULL ||
 143             dsp->dsa_krrp_task->buffer_args.force_cksum)) {
 144                 (void) fletcher_4_incremental_native(buf, len, &dsp->dsa_zc);
 145         }
 146 
 147         return (dump_bytes(dsp, buf, len));
 148 }
 149 
 150 /*
 151  * For all record types except BEGIN, fill in the checksum (overlaid in
 152  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
 153  * up to the start of the checksum itself.
 154  */
 155 static int
 156 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
 157 {
 158         boolean_t do_checksum = (dsp->dsa_krrp_task == NULL ||
 159             dsp->dsa_krrp_task->buffer_args.force_cksum);
 160 
 161         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
 162             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
 163 


 164         if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
 165                 dsp->dsa_sent_begin = B_TRUE;




 166         }
 167 
 168         if (dsp->dsa_drr->drr_type == DRR_END) {
 169                 dsp->dsa_sent_end = B_TRUE;
 170         }
 171 
 172         if (!dsp->sendsize && do_checksum) {
 173                 (void) fletcher_4_incremental_native(dsp->dsa_drr,
 174                     offsetof(dmu_replay_record_t,
 175                     drr_u.drr_checksum.drr_checksum),
 176                     &dsp->dsa_zc);
 177                 if (dsp->dsa_drr->drr_type != DRR_BEGIN) {
 178                         ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
 179                             drr_checksum.drr_checksum));
 180                         dsp->dsa_drr->drr_u.drr_checksum.drr_checksum =
 181                             dsp->dsa_zc;
 182                 }
 183 
 184                 (void) fletcher_4_incremental_native(&dsp->dsa_drr->
 185                     drr_u.drr_checksum.drr_checksum,
 186                     sizeof (zio_cksum_t), &dsp->dsa_zc);
 187         }
 188 
 189         if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
 190                 return (SET_ERROR(EINTR));
 191         if (payload_len != 0) {
 192                 if (dump_bytes_with_checksum(dsp, payload, payload_len) != 0)


 193                         return (SET_ERROR(EINTR));
 194         }
 195         return (0);
 196 }
 197 
 198 /*
 199  * Fill in the drr_free struct, or perform aggregation if the previous record is
 200  * also a free record, and the two are adjacent.
 201  *
 202  * Note that we send free records even for a full send, because we want to be
 203  * able to receive a full send as a clone, which requires a list of all the free
 204  * and freeobject records that were generated on the source.
 205  */
 206 static int
 207 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
 208     uint64_t length)
 209 {
 210         struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
 211 
 212         /*


 380 
 381         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 382         dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
 383         drrw->drr_object = object;
 384         drrw->drr_offset = offset;
 385         drrw->drr_length = blksz;
 386         drrw->drr_toguid = dsp->dsa_toguid;
 387         drrw->drr_compression = BP_GET_COMPRESS(bp);
 388         drrw->drr_etype = BPE_GET_ETYPE(bp);
 389         drrw->drr_lsize = BPE_GET_LSIZE(bp);
 390         drrw->drr_psize = BPE_GET_PSIZE(bp);
 391 
 392         decode_embedded_bp_compressed(bp, buf);
 393 
 394         if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
 395                 return (EINTR);
 396         return (0);
 397 }
 398 
 399 static int
 400 dump_spill(dmu_sendarg_t *dsp, uint64_t object,
 401     const blkptr_t *bp, const zbookmark_phys_t *zb)
 402 {
 403         int rc = 0;
 404         struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
 405         enum arc_flags aflags = ARC_FLAG_WAIT;
 406         int blksz = BP_GET_LSIZE(bp);
 407         arc_buf_t *abuf;
 408 
 409         if (dsp->dsa_pending_op != PENDING_NONE) {
 410                 if (dump_record(dsp, NULL, 0) != 0)
 411                         return (SET_ERROR(EINTR));
 412                 dsp->dsa_pending_op = PENDING_NONE;
 413         }
 414 
 415         /* write a SPILL record */
 416         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
 417         dsp->dsa_drr->drr_type = DRR_SPILL;
 418         drrs->drr_object = object;
 419         drrs->drr_length = blksz;
 420         drrs->drr_toguid = dsp->dsa_toguid;
 421 
 422         if (dump_record(dsp, NULL, 0))
 423                 return (SET_ERROR(EINTR));
 424 
 425         /*
 426          * if dsa_krrp_task is not NULL, then the send is from krrp and we can
 427          * try to bypass copying data to an intermediate buffer.
 428          */
 429         if (!dsp->sendsize && dsp->dsa_krrp_task != NULL) {
 430                 rc = dmu_krrp_direct_arc_read(dsp->dsa_os->os_spa,
 431                     dsp->dsa_krrp_task, &dsp->dsa_zc, bp);
 432                 /*
 433                  * rc == 0 means that we successfully copied the data
 434                  * directly from the ARC to the krrp buffer;
 435                  * rc != 0 && rc != EINTR means that we cannot
 436                  * zero-copy the data and need to use the slow path.
 437                  */
 438                 if (rc == 0 || rc == EINTR)
 439                         return (rc);
 440 
 441                 ASSERT3U(rc, ==, ENODATA);
 442         }
 443 
 444         if (arc_read(NULL, dsp->dsa_os->os_spa, bp, arc_getbuf_func, &abuf,
 445             ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 446             &aflags, zb) != 0)
 447                 return (SET_ERROR(EIO));
 448 
 449         rc = dump_bytes_with_checksum(dsp, abuf->b_data, blksz);
 450         arc_buf_destroy(abuf, &abuf);
 451         if (rc != 0)
 452                 return (SET_ERROR(EINTR));
 453 
 454         return (0);
 455 }
 456 
 457 static int
 458 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
 459 {
 460         struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
 461 
 462         /*
 463          * If there is a pending op, but it's not PENDING_FREEOBJECTS,
 464          * push it out, since free block aggregation can only be done for
 465          * blocks of the same type (i.e., DRR_FREE records can only be
 466          * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
 467          * can only be aggregated with other DRR_FREEOBJECTS records).
 468          */
 469         if (dsp->dsa_pending_op != PENDING_NONE &&
 470             dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
 471                 if (dump_record(dsp, NULL, 0) != 0)
 472                         return (SET_ERROR(EINTR));
 473                 dsp->dsa_pending_op = PENDING_NONE;


 690                 int blksz = BP_GET_LSIZE(bp);
 691                 arc_flags_t aflags = ARC_FLAG_WAIT;
 692                 arc_buf_t *abuf;
 693 
 694                 ASSERT0(zb->zb_level);
 695 
 696                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 697                     ZIO_PRIORITY_ASYNC_READ, ZIO_FLAG_CANFAIL,
 698                     &aflags, zb) != 0)
 699                         return (SET_ERROR(EIO));
 700 
 701                 dnode_phys_t *blk = abuf->b_data;
 702                 uint64_t dnobj = zb->zb_blkid * (blksz >> DNODE_SHIFT);
 703                 for (int i = 0; i < blksz >> DNODE_SHIFT; i++) {
 704                         err = dump_dnode(dsa, dnobj + i, blk + i);
 705                         if (err != 0)
 706                                 break;
 707                 }
 708                 arc_buf_destroy(abuf, &abuf);
 709         } else if (type == DMU_OT_SA) {
 710                 /*
 711                  * The upstream code has an arc_read() call here, but we moved
 712                  * it to dump_spill() since we want to take advantage of
 713                  * zero copy of the buffer if possible
 714                  */
 715                 err = dump_spill(dsa, zb->zb_object, bp, zb);





 716         } else if (backup_do_embed(dsa, bp)) {
 717                 /* it's an embedded level-0 block of a regular object */
 718                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 719                 ASSERT0(zb->zb_level);
 720                 err = dump_write_embedded(dsa, zb->zb_object,
 721                     zb->zb_blkid * blksz, blksz, bp);
 722         } else {
 723                 /* it's a level-0 block of a regular object */
 724                 arc_flags_t aflags = ARC_FLAG_WAIT;
 725                 arc_buf_t *abuf;
 726                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
 727                 uint64_t offset;
 728 
 729                 /*
 730                  * If we have large blocks stored on disk but the send flags
 731                  * don't allow us to send large blocks, we split the data from
 732                  * the arc buf into chunks.
 733                  */
 734                 boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
 735                     !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
 736                 /*
 737                  * We should only request compressed data from the ARC if all
 738                  * the following are true:
 739                  *  - stream compression was requested
 740                  *  - we aren't splitting large blocks into smaller chunks
 741                  *  - the data won't need to be byteswapped before sending
 742                  *  - this isn't an embedded block
 743                  *  - this isn't metadata (if receiving on a different endian
 744                  *    system it can be byteswapped more easily)
 745                  */
 746                 boolean_t request_compressed =
 747                     (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
 748                     !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
 749                     !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
 750 
 751                 ASSERT0(zb->zb_level);
 752                 ASSERT(zb->zb_object > dsa->dsa_resume_object ||
 753                     (zb->zb_object == dsa->dsa_resume_object &&
 754                     zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
 755 





 756                 ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));
 757 
 758                 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
 759                 if (request_compressed)
 760                         zioflags |= ZIO_FLAG_RAW;
 761                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
 762                     ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
 763                         if (zfs_send_corrupt_data) {
 764                                 /* Send a block filled with 0x"zfs badd bloc" */
 765                                 abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
 766                                     blksz);
 767                                 uint64_t *ptr;
 768                                 for (ptr = abuf->b_data;
 769                                     (char *)ptr < (char *)abuf->b_data + blksz;
 770                                     ptr++)
 771                                         *ptr = 0x2f5baddb10cULL;
 772                         } else {
 773                                 return (SET_ERROR(EIO));
 774                         }
 775                 }
 776 
 777                 offset = zb->zb_blkid * blksz;
 778 
 779                 if (split_large_blocks) {
 780                         ASSERT3U(arc_get_compression(abuf), ==,
 781                             ZIO_COMPRESS_OFF);
 782                         char *buf = abuf->b_data;
 783                         while (blksz > 0 && err == 0) {
 784                                 int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
 785                                 err = dump_write(dsa, type, zb->zb_object,
 786                                     offset, n, n, NULL, buf);
 787                                 offset += n;

 788                                 blksz -= n;
 789                         }
 790                 } else {
 791                         err = dump_write(dsa, type, zb->zb_object, offset,
 792                             blksz, arc_buf_size(abuf), bp, abuf->b_data);
 793                 }
 794                 arc_buf_destroy(abuf, &abuf);
 795         }
 796 
 797         ASSERT(err == 0 || err == EINTR);
 798         return (err);
 799 }
 800 
 801 /*
 802  * Pop the new data off the queue, and free the old data.
 803  */
 804 static struct send_block_record *
 805 get_next_record(bqueue_t *bq, struct send_block_record *data)
 806 {
 807         struct send_block_record *tmp = bqueue_dequeue(bq);
 808         kmem_free(data, sizeof (*data));
 809         return (tmp);
 810 }
 811 
 812 /*
 813  * Actually do the bulk of the work in a zfs send.
 814  *
 815  * Note: Releases dp using the specified tag.
 816  */
 817 static int
 818 dmu_send_impl_ss(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
 819     zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
 820     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
 821     int outfd, uint64_t resumeobj, uint64_t resumeoff, vnode_t *vp,
 822     offset_t *off, boolean_t sendsize, dmu_krrp_task_t *krrp_task)
 823 {
 824         objset_t *os;
 825         dmu_replay_record_t *drr;
 826         dmu_sendarg_t *dsp;
 827         int err;
 828         uint64_t fromtxg = 0;
 829         uint64_t featureflags = 0;
 830         struct send_thread_arg to_arg = { 0 };
 831 
 832         err = dmu_objset_from_ds(to_ds, &os);
 833         if (err != 0) {
 834                 dsl_pool_rele(dp, tag);
 835                 return (err);
 836         }
 837 
 838         drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
 839         drr->drr_type = DRR_BEGIN;
 840         drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
 841         DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
 842             DMU_SUBSTREAM);


 893         if (ancestor_zb != NULL) {
 894                 drr->drr_u.drr_begin.drr_fromguid =
 895                     ancestor_zb->zbm_guid;
 896                 fromtxg = ancestor_zb->zbm_creation_txg;
 897         }
 898         dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
 899         if (!to_ds->ds_is_snapshot) {
 900                 (void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
 901                     sizeof (drr->drr_u.drr_begin.drr_toname));
 902         }
 903 
 904         dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
 905 
 906         dsp->dsa_drr = drr;
 907         dsp->dsa_vp = vp;
 908         dsp->dsa_outfd = outfd;
 909         dsp->dsa_proc = curproc;
 910         dsp->dsa_os = os;
 911         dsp->dsa_off = off;
 912         dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
 913         dsp->dsa_krrp_task = krrp_task;
 914         dsp->dsa_pending_op = PENDING_NONE;
 915         dsp->dsa_featureflags = featureflags;
 916         dsp->sendsize = sendsize;
 917         dsp->dsa_resume_object = resumeobj;
 918         dsp->dsa_resume_offset = resumeoff;
 919 
 920         mutex_enter(&to_ds->ds_sendstream_lock);
 921         list_insert_head(&to_ds->ds_sendstreams, dsp);
 922         mutex_exit(&to_ds->ds_sendstream_lock);
 923 
 924         dsl_dataset_long_hold(to_ds, FTAG);
 925         dsl_pool_rele(dp, tag);
 926 
 927         void *payload = NULL;
 928         size_t payload_len = 0;
 929         if (resumeobj != 0 || resumeoff != 0) {
 930                 dmu_object_info_t to_doi;
 931                 err = dmu_object_info(os, resumeobj, &to_doi);
 932                 if (err != 0)
 933                         goto out;
 934                 SET_BOOKMARK(&to_arg.resume, to_ds->ds_object, resumeobj, 0,
 935                     resumeoff / to_doi.doi_data_block_size);
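                /*
                 * Editorial note (illustrative arithmetic, not in the
                 * original): with a 128 KiB data block size, a resume
                 * offset of 1 MiB maps to blkid 1048576 / 131072 = 8, so
                 * traversal resumes at blkid 8 of the resume object.
                 */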
 936 


 948                 err = dsp->dsa_err;
 949                 goto out;
 950         }
 951 
 952         err = bqueue_init(&to_arg.q, zfs_send_queue_length,
 953             offsetof(struct send_block_record, ln));
 954         to_arg.error_code = 0;
 955         to_arg.cancel = B_FALSE;
 956         to_arg.ds = to_ds;
 957         to_arg.fromtxg = fromtxg;
 958         to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
 959         (void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
 960             TS_RUN, minclsyspri);
 961 
 962         struct send_block_record *to_data;
 963         to_data = bqueue_dequeue(&to_arg.q);
 964 
 965         while (!to_data->eos_marker && err == 0) {
 966                 err = do_dump(dsp, to_data);
 967                 to_data = get_next_record(&to_arg.q, to_data);
 968                 if (vp != NULL && issig(JUSTLOOKING) && issig(FORREAL))
 969                         err = EINTR;
 970         }
 971 
 972         if (err != 0) {
 973                 to_arg.cancel = B_TRUE;
 974                 while (!to_data->eos_marker) {
 975                         to_data = get_next_record(&to_arg.q, to_data);
 976                 }
 977         }
 978         kmem_free(to_data, sizeof (*to_data));
 979 
 980         bqueue_destroy(&to_arg.q);
 981 
 982         if (err == 0 && to_arg.error_code != 0)
 983                 err = to_arg.error_code;
 984 
 985         if (err != 0)
 986                 goto out;
 987 
 988         if (dsp->dsa_pending_op != PENDING_NONE)


1002 
1003         if (dump_record(dsp, NULL, 0) != 0)
1004                 err = dsp->dsa_err;
1005 
1006 out:
1007         mutex_enter(&to_ds->ds_sendstream_lock);
1008         list_remove(&to_ds->ds_sendstreams, dsp);
1009         mutex_exit(&to_ds->ds_sendstream_lock);
1010 
1011         VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));
1012 
1013         kmem_free(drr, sizeof (dmu_replay_record_t));
1014         kmem_free(dsp, sizeof (dmu_sendarg_t));
1015 
1016         dsl_dataset_long_rele(to_ds, FTAG);
1017 
1018         return (err);
1019 }
1020 
1021 int
1022 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
1023     zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone, boolean_t embedok,
1024     boolean_t large_block_ok, boolean_t compressok, int outfd,
1025     uint64_t resumeobj, uint64_t resumeoff, vnode_t *vp, offset_t *off,
1026     dmu_krrp_task_t *krrp_task)
1027 {
1028         return (dmu_send_impl_ss(tag, dp, to_ds, ancestor_zb, is_clone,
1029             embedok, large_block_ok, compressok, outfd, resumeobj, resumeoff,
1030             vp, off, B_FALSE, krrp_task));
1031 }
1032 
1033 int
1034 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
1035     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
1036     int outfd, vnode_t *vp, offset_t *off, boolean_t sendsize)
1037 {
1038         dsl_pool_t *dp;
1039         dsl_dataset_t *ds;
1040         dsl_dataset_t *fromds = NULL;
1041         int err;
1042 
1043         err = dsl_pool_hold(pool, FTAG, &dp);
1044         if (err != 0)
1045                 return (err);
1046 
1047         err = dsl_dataset_hold_obj(dp, tosnap, FTAG, &ds);
1048         if (err != 0) {
1049                 dsl_pool_rele(dp, FTAG);
1050                 return (err);
1051         }
1052 
1053         if (fromsnap != 0) {
1054                 zfs_bookmark_phys_t zb;
1055                 boolean_t is_clone;
1056 
1057                 err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
1058                 if (err != 0) {
1059                         dsl_dataset_rele(ds, FTAG);
1060                         dsl_pool_rele(dp, FTAG);
1061                         return (err);
1062                 }
1063                 if (!dsl_dataset_is_before(ds, fromds, 0))
1064                         err = SET_ERROR(EXDEV);
1065                 zb.zbm_creation_time =
1066                     dsl_dataset_phys(fromds)->ds_creation_time;
1067                 zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
1068                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1069                 is_clone = (fromds->ds_dir != ds->ds_dir);
1070                 dsl_dataset_rele(fromds, FTAG);
1071                 err = dmu_send_impl_ss(FTAG, dp, ds, &zb, is_clone,
1072                     embedok, large_block_ok, compressok, outfd, 0, 0, vp, off,
1073                         sendsize, NULL);
1074         } else {
1075                 err = dmu_send_impl_ss(FTAG, dp, ds, NULL, B_FALSE,
1076                     embedok, large_block_ok, compressok, outfd, 0, 0, vp, off,
1077                         sendsize, NULL);
1078         }
1079         dsl_dataset_rele(ds, FTAG);
1080         return (err);
1081 }
1082 
1083 int
1084 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
1085     boolean_t large_block_ok, boolean_t compressok, int outfd,
1086     uint64_t resumeobj, uint64_t resumeoff,
1087     vnode_t *vp, offset_t *off)
1088 {
1089         dsl_pool_t *dp;
1090         dsl_dataset_t *ds;
1091         int err;
1092         boolean_t owned = B_FALSE;
1093 
1094         if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
1095                 return (SET_ERROR(EINVAL));
1096 
1097         err = dsl_pool_hold(tosnap, FTAG, &dp);


1134                         if (err == 0) {
1135                                 if (!dsl_dataset_is_before(ds, fromds, 0))
1136                                         err = SET_ERROR(EXDEV);
1137                                 zb.zbm_creation_time =
1138                                     dsl_dataset_phys(fromds)->ds_creation_time;
1139                                 zb.zbm_creation_txg =
1140                                     dsl_dataset_phys(fromds)->ds_creation_txg;
1141                                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1142                                 is_clone = (ds->ds_dir != fromds->ds_dir);
1143                                 dsl_dataset_rele(fromds, FTAG);
1144                         }
1145                 } else {
1146                         err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1147                 }
1148                 if (err != 0) {
1149                         dsl_dataset_rele(ds, FTAG);
1150                         dsl_pool_rele(dp, FTAG);
1151                         return (err);
1152                 }
1153                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1154                     embedok, large_block_ok, compressok, outfd,
1155                     resumeobj, resumeoff, vp, off, NULL);
1156         } else {
1157                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1158                     embedok, large_block_ok, compressok, outfd,
1159                     resumeobj, resumeoff, vp, off, NULL);
1160         }
1161         if (owned)
1162                 dsl_dataset_disown(ds, FTAG);
1163         else
1164                 dsl_dataset_rele(ds, FTAG);
1165         return (err);
1166 }
1167 
1168 static int
1169 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
1170     uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
1171 {
1172         int err;
1173         uint64_t size;
1174         /*
1175          * Assume that space (both on-disk and in-stream) is dominated by
1176          * data.  We will adjust for indirect blocks and the copies property,
1177          * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
1178          */
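        /*
         * Editorial note (illustrative arithmetic, not in the original):
         * with 128 KiB records and 128-byte block pointers, a 128 KiB
         * level-1 indirect block covers 1024 data blocks, so each level of
         * indirection adds roughly size/1024 (about 0.1%) of additional
         * space; for example, about 1 MiB of indirect metadata per 1 GiB
         * of data.
         */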
1179         uint64_t recordsize;


1316          */
1317         err = traverse_dataset(ds, from_txg, TRAVERSE_POST,
1318             dmu_calculate_send_traversal, &size);
1319         if (err)
1320                 return (err);
1321 
1322         err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
1323             size.compressed, stream_compressed, sizep);
1324         return (err);
1325 }
1326 
1327 typedef struct dmu_recv_begin_arg {
1328         const char *drba_origin;
1329         dmu_recv_cookie_t *drba_cookie;
1330         cred_t *drba_cred;
1331         uint64_t drba_snapobj;
1332 } dmu_recv_begin_arg_t;
1333 
1334 static int
1335 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
1336     uint64_t fromguid, dmu_tx_t *tx)
1337 {
1338         uint64_t val;
1339         int error;
1340         dsl_pool_t *dp = ds->ds_dir->dd_pool;
1341 
1342         if (dmu_tx_is_syncing(tx)) {
1343                 /* temporary clone name must not exist */
1344                 error = zap_lookup(dp->dp_meta_objset,
1345                     dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj,
1346                     recv_clone_name, 8, 1, &val);
1347                 if (error == 0) {
1348                         dsl_dataset_t *tds;
1349 
1350                         /* check whether it is currently in use */
1351                         error = dsl_dataset_own_obj(dp, val, FTAG, &tds);
1352                         if (!error) {
1353                                 char name[ZFS_MAX_DATASET_NAME_LEN];
1354 
1355                                 dsl_dataset_name(tds, name);
1356                                 dsl_dataset_disown(tds, FTAG);
1357 
1358                                 error = dsl_dataset_hold(dp, name, FTAG, &tds);
1359                                 if (!error) {
1360                                         dsl_destroy_head_sync_impl(tds, tx);
1361                                         dsl_dataset_rele(tds, FTAG);
1362                                         error = ENOENT;
1363                                 }
1364                         } else {
1365                                 error = 0;
1366                         }
1367                 }
1368                 if (error != ENOENT) {
1369                         return (error == 0 ?
1370                             SET_ERROR(EBUSY) : SET_ERROR(error));
1371                 }
1372         }
1373 
1374         /* new snapshot name must not exist */
1375         error = zap_lookup(dp->dp_meta_objset,
1376             dsl_dataset_phys(ds)->ds_snapnames_zapobj,
1377             drba->drba_cookie->drc_tosnap, 8, 1, &val);
1378         if (error != ENOENT)
1379                 return (error == 0 ? SET_ERROR(EEXIST) : SET_ERROR(error));
1380 
1381         /*
1382          * Check snapshot limit before receiving. We'll recheck again at the
1383          * end, but might as well abort before receiving if we're already over
1384          * the limit.
1385          *
1386          * Note that we do not check the file system limit with
1387          * dsl_dir_fscount_check because the temporary %clones don't count
1388          * against that limit.
1389          */
1390         error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
1391             NULL, drba->drba_cred);
1392         if (error != 0)
1393                 return (error);
1394 
1395         if (fromguid != 0) {
1396                 dsl_dataset_t *snap;
1397                 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
1398 
1399                 /* Find snapshot in this dir that matches fromguid. */


1483         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
1484             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
1485                 return (SET_ERROR(ENOTSUP));
1486         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
1487             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
1488                 return (SET_ERROR(ENOTSUP));
1489 
1490         /*
1491          * The receiving code doesn't know how to translate large blocks
1492          * to smaller ones, so the pool must have the LARGE_BLOCKS
1493          * feature enabled if the stream has LARGE_BLOCKS.
1494          */
1495         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
1496             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
1497                 return (SET_ERROR(ENOTSUP));
1498 
1499         error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1500         if (error == 0) {
1501                 /* target fs already exists; recv into temp clone */
1502 
1503                 if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_WBC)) {
1504                         objset_t *os = NULL;
1505 
1506                         error = dmu_objset_from_ds(ds, &os);
1507                         if (error) {
1508                                 dsl_dataset_rele(ds, FTAG);
1509                                 return (error);
1510                         }
1511 
1512                         /* Recv is impossible into DS that uses WBC */
1513                         if (os->os_wbc_mode != ZFS_WBC_MODE_OFF) {
1514                                 dsl_dataset_rele(ds, FTAG);
1515                                 return (SET_ERROR(EKZFS_WBCNOTSUP));
1516                         }
1517                 }
1518 
1519                 /* Can't recv a clone into an existing fs */
1520                 if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
1521                         dsl_dataset_rele(ds, FTAG);
1522                         return (SET_ERROR(EINVAL));
1523                 }
1524 
1525                 error = recv_begin_check_existing_impl(drba, ds, fromguid, tx);
1526                 dsl_dataset_rele(ds, FTAG);
1527         } else if (error == ENOENT) {
1528                 /* target fs does not exist; must be a full backup or clone */
1529                 char buf[ZFS_MAX_DATASET_NAME_LEN];
1530 
1531                 /*
1532                  * If it's a non-clone incremental, we are missing the
1533                  * target fs, so fail the recv.
1534                  */
1535                 if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
1536                     drba->drba_origin))
1537                         return (SET_ERROR(ENOENT));
1538 
1539                 /*
1540                  * If we're receiving a full send as a clone, and it doesn't
1541                  * contain all the necessary free records and freeobject
1542                  * records, reject it.
1543                  */
1544                 if (fromguid == 0 && drba->drba_origin &&
1545                     !(flags & DRR_FLAG_FREERECORDS))
1546                         return (SET_ERROR(EINVAL));
1547 
1548                 /* Open the parent of tofs */
1549                 ASSERT3U(strlen(tofs), <, sizeof (buf));
1550                 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
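                /*
                 * Editorial note (illustrative example, not in the original):
                 * strrchr(tofs, '/') - tofs + 1 limits the copy to the last
                 * '/', so for tofs = "tank/fs/child" the buffer receives the
                 * parent name "tank/fs".
                 */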
1551                 error = dsl_dataset_hold(dp, buf, FTAG, &ds);
1552                 if (error != 0)
1553                         return (error);
1554 
1555                 if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_WBC)) {
1556                         objset_t *os = NULL;
1557 
1558                         error = dmu_objset_from_ds(ds, &os);
1559                         if (error) {
1560                                 dsl_dataset_rele(ds, FTAG);
1561                                 return (error);
1562                         }
1563 
1564                         /* Recv is impossible into DS that uses WBC */
1565                         if (os->os_wbc_mode != ZFS_WBC_MODE_OFF) {
1566                                 dsl_dataset_rele(ds, FTAG);
1567                                 return (SET_ERROR(EKZFS_WBCNOTSUP));
1568                         }
1569                 }
1570 
1571                 /*
1572                  * Check filesystem and snapshot limits before receiving. We'll
1573                  * recheck snapshot limits again at the end (we create the
1574                  * filesystems and increment those counts during begin_sync).
1575                  */
1576                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1577                     ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
1578                 if (error != 0) {
1579                         dsl_dataset_rele(ds, FTAG);
1580                         return (error);
1581                 }
1582 
1583                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
1584                     ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
1585                 if (error != 0) {
1586                         dsl_dataset_rele(ds, FTAG);
1587                         return (error);
1588                 }
1589 
1590                 if (drba->drba_origin != NULL) {


1765             tofs, recv_clone_name);
1766 
1767         if (dsl_dataset_hold(dp, recvname, FTAG, &ds) != 0) {
1768                 /* %recv does not exist; continue in tofs */
1769                 error = dsl_dataset_hold(dp, tofs, FTAG, &ds);
1770                 if (error != 0)
1771                         return (error);
1772         }
1773 
1774         /* check that ds is marked inconsistent */
1775         if (!DS_IS_INCONSISTENT(ds)) {
1776                 dsl_dataset_rele(ds, FTAG);
1777                 return (SET_ERROR(EINVAL));
1778         }
1779 
1780         /* check that there is resuming data, and that the toguid matches */
1781         if (!dsl_dataset_is_zapified(ds)) {
1782                 dsl_dataset_rele(ds, FTAG);
1783                 return (SET_ERROR(EINVAL));
1784         }
1785         uint64_t val = 0;
1786         error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
1787             DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
1788         if (error != 0 || drrb->drr_toguid != val) {
1789                 dsl_dataset_rele(ds, FTAG);
1790                 return (SET_ERROR(EINVAL));
1791         }
1792 
1793         /*
1794          * Check if the receive is still running.  If so, it will be owned.
1795          * Note that nothing else can own the dataset (e.g. after the receive
1796          * fails) because it will be marked inconsistent.
1797          */
1798         if (dsl_dataset_has_owner(ds)) {
1799                 dsl_dataset_rele(ds, FTAG);
1800                 return (SET_ERROR(EBUSY));
1801         }
1802 
1803         /* There should not be any snapshots of this fs yet. */
1804         if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
1805                 dsl_dataset_rele(ds, FTAG);


1854         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dmu_recv_tag, &ds));
1855 
1856         dmu_buf_will_dirty(ds->ds_dbuf, tx);
1857         dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
1858 
1859         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
1860         ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)));
1861         rrw_exit(&ds->ds_bp_rwlock, FTAG);
1862 
1863         drba->drba_cookie->drc_ds = ds;
1864 
1865         spa_history_log_internal_ds(ds, "resume receive", tx, "");
1866 }
1867 
1868 /*
1869  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
1870  * succeeds; otherwise we will leak the holds on the datasets.
1871  */
1872 int
1873 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
1874     boolean_t force, boolean_t resumable, boolean_t force_cksum,
1875     char *origin, dmu_recv_cookie_t *drc)
1876 {
1877         dmu_recv_begin_arg_t drba = { 0 };
1878 
1879         bzero(drc, sizeof (dmu_recv_cookie_t));
1880         drc->drc_drr_begin = drr_begin;
1881         drc->drc_drrb = &drr_begin->drr_u.drr_begin;
1882         drc->drc_tosnap = tosnap;
1883         drc->drc_tofs = tofs;
1884         drc->drc_force = force;
1885         drc->drc_resumable = resumable;
1886         drc->drc_cred = CRED();
1887 
1888         if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
1889                 drc->drc_byteswap = B_TRUE;
1890 
1891                 /* on-wire checksum can be disabled for krrp */
1892                 if (force_cksum) {
1893                         (void) fletcher_4_incremental_byteswap(drr_begin,
1894                             sizeof (dmu_replay_record_t), &drc->drc_cksum);
1895                         byteswap_record(drr_begin);
1896                 }
1897         } else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
1898                 /* on-wire checksum can be disabled for krrp */
1899                 if (force_cksum) {
1900                         (void) fletcher_4_incremental_native(drr_begin,
1901                             sizeof (dmu_replay_record_t), &drc->drc_cksum);
1902                 }
1903         } else {
1904                 return (SET_ERROR(EINVAL));
1905         }
1906 
1907         drba.drba_origin = origin;
1908         drba.drba_cookie = drc;
1909         drba.drba_cred = CRED();
1910 
1911         if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
1912             DMU_BACKUP_FEATURE_RESUMING) {
1913                 return (dsl_sync_task(tofs,
1914                     dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
1915                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1916         } else  {
1917                 return (dsl_sync_task(tofs,
1918                     dmu_recv_begin_check, dmu_recv_begin_sync,
1919                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
1920         }
1921 }
1922 
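For context, the begin/stream/end contract spelled out in the NB comments (dmu_recv_begin() above, dmu_recv_stream() and dmu_recv_end() later in the file) amounts to the call sequence sketched below. This is an editorial illustration only: the argument values are placeholders, dmu_recv_end()'s signature is assumed from typical callers, and error handling is reduced to the minimum.

/* Editorial sketch: how a caller is expected to drive the receive path. */
static int
demo_receive(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
    vnode_t *vp, offset_t *voffp, int cleanup_fd)
{
        dmu_recv_cookie_t drc;
        uint64_t action_handle = 0;
        int err;

        /* Takes holds on the target dataset; the caller must consume them. */
        err = dmu_recv_begin(tofs, tosnap, drr_begin, B_FALSE /* force */,
            B_FALSE /* resumable */, B_TRUE /* force_cksum */,
            NULL /* origin */, &drc);
        if (err != 0)
                return (err);

        /* Must be called once begin succeeds, or the holds are leaked. */
        err = dmu_recv_stream(&drc, vp, voffp, cleanup_fd, &action_handle);
        if (err != 0)
                return (err);

        /* Must be called once the stream succeeds, to commit the receive. */
        return (dmu_recv_end(&drc, NULL /* assumed owner tag */));
}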


1966 
1967 struct receive_objnode {
1968         list_node_t node;
1969         uint64_t object;
1970 };
1971 
1972 struct receive_arg {
1973         objset_t *os;
1974         vnode_t *vp; /* The vnode to read the stream from */
1975         uint64_t voff; /* The current offset in the stream */
1976         uint64_t bytes_read;
1977         /*
1978          * A record that has had its payload read in, but hasn't yet been handed
1979          * off to the worker thread.
1980          */
1981         struct receive_record_arg *rrd;
1982         /* A record that has had its header read in, but not its payload. */
1983         struct receive_record_arg *next_rrd;
1984         zio_cksum_t cksum;
1985         zio_cksum_t prev_cksum;
1986         dmu_krrp_task_t *krrp_task;
1987         int err;
1988         boolean_t byteswap;
1989         /* Sorted list of objects not to issue prefetches for. */
1990         struct objlist ignore_objlist;
1991 };
1992 
1993 typedef struct guid_map_entry {
1994         uint64_t        guid;
1995         dsl_dataset_t   *gme_ds;
1996         avl_node_t      avlnode;
1997 } guid_map_entry_t;
1998 
1999 static int
2000 guid_compare(const void *arg1, const void *arg2)
2001 {
2002         const guid_map_entry_t *gmep1 = arg1;
2003         const guid_map_entry_t *gmep2 = arg2;
2004 
2005         if (gmep1->guid < gmep2->guid)
2006                 return (-1);


2019         while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
2020                 dsl_dataset_long_rele(gmep->gme_ds, gmep);
2021                 dsl_dataset_rele(gmep->gme_ds, gmep);
2022                 kmem_free(gmep, sizeof (guid_map_entry_t));
2023         }
2024         avl_destroy(ca);
2025         kmem_free(ca, sizeof (avl_tree_t));
2026 }
2027 
2028 static int
2029 receive_read(struct receive_arg *ra, int len, void *buf)
2030 {
2031         int done = 0;
2032 
2033         /*
2034          * The code doesn't rely on this (lengths being multiples of 8).  See
2035          * comment in dump_bytes.
2036          */
2037         ASSERT0(len % 8);
2038 
2039         /*
2040          * if vp is NULL, then the stream comes from krrp and we can try to bypass
2041          * copying data to an intermediate buffer.
2042          */
2043         if (ra->vp != NULL) {
2044                 while (done < len) {
2045                         ssize_t resid = 0;
2046 
2047                         ra->err = vn_rdwr(UIO_READ, ra->vp,
2048                             (char *)buf + done, len - done,
2049                             ra->voff, UIO_SYSSPACE, FAPPEND,
2050                             RLIM64_INFINITY, CRED(), &resid);

2051                         if (resid == len - done) {
2052                                 /*
2053                                  * Note: ECKSUM indicates that the receive was
2054                                  * interrupted and can potentially be resumed.
2055                                  */
2056                                 ra->err = SET_ERROR(ECKSUM);
2057                         }
2058                         ra->voff += len - done - resid;
2059                         done = len - resid;
2060                         if (ra->err != 0)
2061                                 return (ra->err);
2062                 }
2063         } else {
2064                 ASSERT(ra->krrp_task != NULL);
2065                 ra->err = dmu_krrp_buffer_read(buf, len, ra->krrp_task);
2066                 if (ra->err != 0)
2067                         return (ra->err);
2068 
2069                 done = len;
2070         }
2071 
2072         ra->bytes_read += len;
2073 
2074         ASSERT3U(done, ==, len);
2075         return (0);
2076 }
2077 
2078 static void
2079 byteswap_record(dmu_replay_record_t *drr)
2080 {
2081 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
2082 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
2083         drr->drr_type = BSWAP_32(drr->drr_type);
2084         drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
2085 
2086         switch (drr->drr_type) {
2087         case DRR_BEGIN:
2088                 DO64(drr_begin.drr_magic);
2089                 DO64(drr_begin.drr_versioninfo);
2090                 DO64(drr_begin.drr_creation_time);
2091                 DO32(drr_begin.drr_type);


2355         if (drrw->drr_object < rwa->last_object ||
2356             (drrw->drr_object == rwa->last_object &&
2357             drrw->drr_offset < rwa->last_offset)) {
2358                 return (SET_ERROR(EINVAL));
2359         }
2360         rwa->last_object = drrw->drr_object;
2361         rwa->last_offset = drrw->drr_offset;
2362 
2363         if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
2364                 return (SET_ERROR(EINVAL));
2365 
2366         tx = dmu_tx_create(rwa->os);
2367 
2368         dmu_tx_hold_write(tx, drrw->drr_object,
2369             drrw->drr_offset, drrw->drr_logical_size);
2370         err = dmu_tx_assign(tx, TXG_WAIT);
2371         if (err != 0) {
2372                 dmu_tx_abort(tx);
2373                 return (err);
2374         }
2375 
2376         if (rwa->byteswap) {
2377                 dmu_object_byteswap_t byteswap =
2378                     DMU_OT_BYTESWAP(drrw->drr_type);
2379                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
2380                     DRR_WRITE_PAYLOAD_SIZE(drrw));
2381         }
2382 
2383         /* use the bonus buf to look up the dnode in dmu_assign_arcbuf */
2384         dmu_buf_t *bonus;
2385         if (dmu_bonus_hold(rwa->os, drrw->drr_object, FTAG, &bonus) != 0)
2386                 return (SET_ERROR(EINVAL));
2387         dmu_assign_arcbuf(bonus, drrw->drr_offset, abuf, tx);
2388 
2389         /*
2390          * Note: If the receive fails, we want the resume stream to start
2391          * with the same record that we last successfully received (as opposed
2392          * to the next record), so that we can verify that we are
2393          * resuming from the correct location.
2394          */
2395         save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);


2585 receive_cksum(struct receive_arg *ra, int len, void *buf)
2586 {
2587         if (ra->byteswap) {
2588                 (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
2589         } else {
2590                 (void) fletcher_4_incremental_native(buf, len, &ra->cksum);
2591         }
2592 }
2593 
2594 /*
2595  * Read the payload into a buffer of size len, and update the current record's
2596  * payload field.
2597  * Allocate ra->next_rrd and read the next record's header into
2598  * ra->next_rrd->header.
2599  * Verify checksum of payload and next record.
2600  */
2601 static int
2602 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
2603 {
2604         int err;
2605         boolean_t checksum_enable = (ra->krrp_task == NULL ||
2606             ra->krrp_task->buffer_args.force_cksum);
2607 
2608         if (len != 0) {
2609                 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
2610                 err = receive_read(ra, len, buf);
2611                 if (err != 0)
2612                         return (err);
2613                 receive_cksum(ra, len, buf);
2614 
2615                 /* note: rrd is NULL when reading the begin record's payload */
2616                 if (ra->rrd != NULL) {
2617                         ra->rrd->payload = buf;
2618                         ra->rrd->payload_size = len;
2619                         ra->rrd->bytes_read = ra->bytes_read;
2620                 }
2621         }
2622 
2623         ra->prev_cksum = ra->cksum;
2624 
2625         ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2626         err = receive_read(ra, sizeof (ra->next_rrd->header),
2627             &ra->next_rrd->header);
2628         ra->next_rrd->bytes_read = ra->bytes_read;
2629         if (err != 0) {
2630                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2631                 ra->next_rrd = NULL;
2632                 return (err);
2633         }
2634         if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
2635                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2636                 ra->next_rrd = NULL;
2637                 return (SET_ERROR(EINVAL));
2638         }
2639 
2640         if (checksum_enable) {
2641                 /*
2642                  * Note: checksum is of everything up to but not including the
2643                  * checksum itself.
2644                  */
2645                 ASSERT3U(offsetof(dmu_replay_record_t,
2646                     drr_u.drr_checksum.drr_checksum),
2647                     ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
2648                 receive_cksum(ra,
2649                     offsetof(dmu_replay_record_t,
2650                     drr_u.drr_checksum.drr_checksum),
2651                     &ra->next_rrd->header);
2652 
2653                 zio_cksum_t cksum_orig =
2654                     ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2655                 zio_cksum_t *cksump =
2656                     &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
2657 
2658                 if (ra->byteswap)
2659                         byteswap_record(&ra->next_rrd->header);
2660 
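                /*
                 * A checksum of all zeros means the sender did not supply
                 * one for this record, so only non-zero checksums are
                 * verified.
                 */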
2661                 if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
2662                     !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
2663                         kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
2664                         ra->next_rrd = NULL;
2665                         return (SET_ERROR(ECKSUM));
2666                 }
2667 
2668                 receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
2669         }
2670 
2671         return (0);
2672 }
2673 
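/*
 * objlist_create()/objlist_destroy() manage ra->ignore_objlist, the
 * per-receive list of object numbers to be ignored by later records in the
 * stream (e.g. when deciding what to prefetch).  The list is created and
 * torn down in dmu_recv_stream() below.
 */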
2674 static void
2675 objlist_create(struct objlist *list)
2676 {
2677         list_create(&list->list, sizeof (struct receive_objnode),
2678             offsetof(struct receive_objnode, node));
2679         list->last_lookup = 0;
2680 }
2681 
2682 static void
2683 objlist_destroy(struct objlist *list)
2684 {
2685         for (struct receive_objnode *n = list_remove_head(&list->list);
2686             n != NULL; n = list_remove_head(&list->list)) {
2687                 kmem_free(n, sizeof (*n));
2688         }
2689         list_destroy(&list->list);


2846                 if (err != 0) {
2847                         kmem_free(buf, size);
2848                         return (err);
2849                 }
2850 
2851                 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2852                     drrwe->drr_length);
2853                 return (err);
2854         }
2855         case DRR_FREE:
2856         {
2857                 /*
2858                  * It might be beneficial to prefetch indirect blocks here, but
2859                  * we don't really have the data to decide for sure.
2860                  */
2861                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2862                 return (err);
2863         }
2864         case DRR_END:
2865         {
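                /*
                 * The END record carries a checksum of the entire stream up
                 * to, but not including, this record's header; it is
                 * compared against prev_cksum, which was saved before this
                 * header was folded into the running checksum.  For a KRRP
                 * receive the check is skipped unless force_cksum was
                 * requested.
                 */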
2866                 if (ra->krrp_task == NULL ||
2867                     ra->krrp_task->buffer_args.force_cksum) {
2868                         struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2869                         if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum,
2870                             drre->drr_checksum))
2871                                 return (SET_ERROR(ECKSUM));
2872                 }
2873                 return (0);
2874         }
2875         case DRR_SPILL:
2876         {
2877                 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2878                 void *buf = kmem_zalloc(drrs->drr_length, KM_SLEEP);
2879                 err = receive_read_payload_and_next_header(ra, drrs->drr_length,
2880                     buf);
2881                 if (err != 0)
2882                         kmem_free(buf, drrs->drr_length);
2883                 return (err);
2884         }
2885         default:
2886                 return (SET_ERROR(EINVAL));
2887         }
2888 }
2889 
2890 /*
2891  * Commit the records to the pool.
2892  */


3018         if (resume_off != val)
3019                 return (SET_ERROR(EINVAL));
3020 
3021         return (0);
3022 }
3023 
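/*
 * dmu_recv_stream() below is the middle step of the receive sequence; a
 * caller such as the zfs_ioc_recv() ioctl handler drives it roughly as
 * sketched here.  The exact dmu_recv_begin() and dmu_recv_end() signatures
 * shown are assumptions and differ between trees:
 *
 *      dmu_recv_cookie_t drc;
 *      err = dmu_recv_begin(tofs, tosnap, drr_begin, force, resumable,
 *          origin, &drc);
 *      if (err == 0)
 *              err = dmu_recv_stream(&drc, vp, &off, cleanup_fd,
 *                  &action_handle, NULL);
 *      if (err == 0)
 *              err = dmu_recv_end(&drc, owner);  -- mandatory on success
 */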
3024 /*
3025  * Read in the stream's records, one by one, and apply them to the pool.  There
3026  * are two threads involved; the thread that calls this function will spin up a
3027  * worker thread, read the records off the stream one by one, and issue
3028  * prefetches for any necessary indirect blocks.  It will then push the records
3029  * onto an internal blocking queue.  The worker thread will pull the records off
3030  * the queue, and actually write the data into the DMU.  This way, the worker
3031  * thread doesn't have to wait for reads to complete, since everything it needs
3032  * (the indirect blocks) will be prefetched.
3033  *
3034  * NB: callers *must* call dmu_recv_end() if this succeeds.
3035  */
3036 int
3037 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
3038     int cleanup_fd, uint64_t *action_handlep, dmu_krrp_task_t *krrp_task)
3039 {
3040         int err = 0;
3041         struct receive_arg ra = { 0 };
3042         struct receive_writer_arg rwa = { 0 };
3043         int featureflags;
3044         nvlist_t *begin_nvl = NULL;
3045 
3046         ra.byteswap = drc->drc_byteswap;
3047         ra.cksum = drc->drc_cksum;
3048         ra.vp = vp;
3049         ra.voff = *voffp;
3050         ra.krrp_task = krrp_task;
3051 
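        /*
         * For a resumed receive, pick up the byte offset recorded by the
         * previous attempt (DS_FIELD_RESUME_BYTES) so that ra.bytes_read
         * continues where that attempt left off.
         */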
3052         if (dsl_dataset_is_zapified(drc->drc_ds)) {
3053                 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
3054                     drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
3055                     sizeof (ra.bytes_read), 1, &ra.bytes_read);
3056         }
3057 
3058         objlist_create(&ra.ignore_objlist);
3059 
3060         /* these were verified in dmu_recv_begin */
3061         ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
3062             DMU_SUBSTREAM);
3063         ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
3064 
3065         /*
3066          * Open the objset we are modifying.
3067          */
3068         VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra.os));
3069 
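        /*
         * The dataset was marked DS_FLAG_INCONSISTENT when the receive was
         * set up; the flag is cleared only when dmu_recv_end() makes the
         * received data visible (or the dataset is cleaned up on error).
         */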
3070         ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);


3139         rwa.resumable = drc->drc_resumable;
3140 
3141         (void) thread_create(NULL, 0, receive_writer_thread, &rwa, 0, curproc,
3142             TS_RUN, minclsyspri);
3143         /*
3144          * We're reading rwa.err without locks, which is safe since we are the
3145          * only reader, and the worker thread is the only writer.  It's ok if we
3146          * miss a write for an iteration or two of the loop, since the writer
3147          * thread will keep freeing records we send it until we send it an eos
3148          * marker.
3149          *
3150          * We can leave this loop in 3 ways:  First, if rwa.err is
3151          * non-zero.  In that case, the writer thread will free the rrd we just
3152          * pushed.  Second, if  we're interrupted; in that case, either it's the
3153          * first loop and ra.rrd was never allocated, or it's later, and ra.rrd
3154          * has been handed off to the writer thread who will free it.  Finally,
3155          * if receive_read_record fails or we're at the end of the stream, then
3156          * we free ra.rrd and exit.
3157          */
3158         while (rwa.err == 0) {
3159                 if (vp && issig(JUSTLOOKING) && issig(FORREAL)) {
3160                         err = SET_ERROR(EINTR);
3161                         break;
3162                 }
3163 
3164                 ASSERT3P(ra.rrd, ==, NULL);
3165                 ra.rrd = ra.next_rrd;
3166                 ra.next_rrd = NULL;
3167                 /* Allocates and loads header into ra.next_rrd */
3168                 err = receive_read_record(&ra);
3169 
3170                 if (ra.rrd->header.drr_type == DRR_END || err != 0) {
3171                         kmem_free(ra.rrd, sizeof (*ra.rrd));
3172                         ra.rrd = NULL;
3173                         break;
3174                 }
3175 
3176                 bqueue_enqueue(&rwa.q, ra.rrd,
3177                     sizeof (struct receive_record_arg) + ra.rrd->payload_size);
3178                 ra.rrd = NULL;
3179         }


3205                  * destroy what we created, so we don't leave it in
3206                  * an inconsistent state.
3207                  */
3208                 dmu_recv_cleanup_ds(drc);
3209         }
3210 
3211         *voffp = ra.voff;
3212         objlist_destroy(&ra.ignore_objlist);
3213         return (err);
3214 }
3215 
3216 static int
3217 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
3218 {
3219         dmu_recv_cookie_t *drc = arg;
3220         dsl_pool_t *dp = dmu_tx_pool(tx);
3221         int error;
3222 
3223         ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
3224 
3225         if (spa_feature_is_active(dp->dp_spa, SPA_FEATURE_WBC)) {
3226                 objset_t *os = NULL;
3227 
3228                 error = dmu_objset_from_ds(drc->drc_ds, &os);
3229                 if (error)
3230                         return (error);
3231 
3232                 /* Receiving into a WBC-enabled dataset is not supported */
3233                 if (os->os_wbc_mode != ZFS_WBC_MODE_OFF)
3234                         return (SET_ERROR(EKZFS_WBCNOTSUP));
3235         }
3236 
3237         if (!drc->drc_newfs) {
3238                 dsl_dataset_t *origin_head;
3239 
3240                 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
3241                 if (error != 0)
3242                         return (error);
3243                 if (drc->drc_force) {
3244                         /*
3245                          * We will destroy any snapshots in tofs (i.e. before
3246                          * origin_head) that are after the origin (which is
3247                          * the snap before drc_ds, because drc_ds can not
3248                          * have any snaps of its own).
3249                          */
3250                         uint64_t obj;
3251 
3252                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
3253                         while (obj !=
3254                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
3255                                 dsl_dataset_t *snap;
3256                                 error = dsl_dataset_hold_obj(dp, obj, FTAG,


3273                                 return (error);
3274                         }
3275                 }
3276                 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
3277                     origin_head, drc->drc_force, drc->drc_owner, tx);
3278                 if (error != 0) {
3279                         dsl_dataset_rele(origin_head, FTAG);
3280                         return (error);
3281                 }
3282                 error = dsl_dataset_snapshot_check_impl(origin_head,
3283                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3284                 dsl_dataset_rele(origin_head, FTAG);
3285                 if (error != 0)
3286                         return (error);
3287 
3288                 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
3289         } else {
3290                 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
3291                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
3292         }
3293 
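        /*
         * KRRP-specific: when this check runs in syncing context for an
         * in-kernel receive, store the task's cookie in the MOS directory
         * ZAP, keyed by the destination dataset name (buffer_args.to_ds).
         */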
3294         if (dmu_tx_is_syncing(tx) && drc->drc_krrp_task != NULL) {
3295                 const char *token =
3296                     drc->drc_krrp_task->buffer_args.to_ds;
3297                 const char *cookie = drc->drc_krrp_task->cookie;
3298                 dsl_pool_t *dp = tx->tx_pool;
3299 
3300                 if (*token != '\0') {
3301                         error = zap_update(dp->dp_meta_objset,
3302                             DMU_POOL_DIRECTORY_OBJECT, token, 1,
3303                             strlen(cookie) + 1, cookie, tx);
3304                 }
3305         }
3306         return (error);
3307 }
3308 
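/*
 * dmu_recv_end_check() above and dmu_recv_end_sync() below form the usual
 * dsl_sync_task() check/sync pair: the check validates the clone swap (for
 * an existing filesystem) and the snapshot to be created, and the sync
 * performs them, making the received data visible.
 */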
3309 static void
3310 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
3311 {
3312         dmu_recv_cookie_t *drc = arg;
3313         dsl_pool_t *dp = dmu_tx_pool(tx);
3314 
3315         spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
3316             tx, "snap=%s", drc->drc_tosnap);
3317 
3318         if (!drc->drc_newfs) {
3319                 dsl_dataset_t *origin_head;
3320 
3321                 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
3322                     &origin_head));
3323 
3324                 if (drc->drc_force) {
3325                         /*