1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2017 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 #include <sys/autosnap.h>
  16 #include <sys/dmu_objset.h>
  17 #include <sys/dmu_send.h>
  18 #include <sys/dmu_tx.h>
  19 #include <sys/dsl_dir.h>
  20 #include <sys/dsl_pool.h>
  21 #include <sys/dsl_prop.h>
  22 #include <sys/spa.h>
  23 #include <zfs_fletcher.h>
  24 #include <sys/zap.h>
  25 
  26 #include <zfs_sendrecv.h>
  27 
  28 #define STRING_PROP_EL_SIZE 1
  29 #define UINT64_PROP_EL_SIZE 8
  30 
  31 #define RECV_BUFFER_SIZE (1 << 20)
  32 
  33 extern int wbc_check_dataset(const char *name);
  34 
  35 int zfs_send_timeout = 5;
  36 uint64_t krrp_debug = 0;
  37 
  38 static void dmu_krrp_work_thread(void *arg);
  39 static void dmu_set_send_recv_error(void *krrp_task_void, int err);
  40 static int dmu_krrp_get_buffer(void *krrp_task_void);
  41 static int dmu_krrp_put_buffer(void *krrp_task_void);
  42 static int dmu_krrp_validate_resume_info(nvlist_t *resume_info);
  43 
  44 /* Used by zfs_lookup_origin_snapshot() */
  45 typedef struct {
  46         char *origin_name;
  47         uint64_t guid;
  48 } zfs_los_cb_arg_t;
  49 
  50 /* An element of snapshots AVL-tree of zfs_ds_node_t */
  51 typedef struct {
  52         char name[ZFS_MAX_DATASET_NAME_LEN];
  53         uint64_t txg;
  54         uint64_t guid;
  55         dsl_dataset_t *ds;
  56         avl_node_t avl_node;
  57         boolean_t origin;
  58 } zfs_snap_avl_node_t;
  59 
  60 typedef struct zfs_ds_node zfs_ds_node_t;
  61 struct zfs_ds_node {
  62         char name[ZFS_MAX_DATASET_NAME_LEN];
  63         char origin_name[ZFS_MAX_DATASET_NAME_LEN];
  64         uint64_t origin_guid;
  65         uint64_t creation_txg;
  66         boolean_t is_root;
  67         boolean_t is_clone;
  68 
  69         zfs_ds_node_t *origin;
  70         dsl_dataset_t *ds;
  71 
  72         list_node_t list_node;
  73         avl_node_t avl_node;
  74 
  75         avl_tree_t snapshots;
  76 };
  77 
  78 typedef struct {
  79         list_t *datasets;
  80         avl_tree_t clones_avl;
  81         void *owner;
  82         uint64_t root_ds_object;
  83 } zfs_collect_cb_arg_t;
  84 
  85 
  86 /*
  87  * Stream is a sequence of snapshots considered to be related
  88  * init/fini initialize and deinitialize structures which are
  89  * persistent for a stream.
  90  * Here we initialize a work-thread and all required locks.
  91  * The work-thread is used to execute stream-tasks, that are
  92  * used to process one ZFS-stream.
  93  */
  94 void *
  95 dmu_krrp_stream_init()
  96 {
  97         dmu_krrp_stream_t *stream =
  98             kmem_zalloc(sizeof (dmu_krrp_stream_t), KM_SLEEP);
  99 
 100         mutex_init(&stream->mtx, NULL, MUTEX_DEFAULT, NULL);
 101         cv_init(&stream->cv, NULL, CV_DEFAULT, NULL);
 102 
 103         mutex_enter(&stream->mtx);
 104         stream->work_thread = thread_create(NULL, 32 << 10,
 105             dmu_krrp_work_thread, stream, 0, &p0, TS_RUN, minclsyspri);
 106 
 107         while (!stream->running)
 108                 cv_wait(&stream->cv, &stream->mtx);
 109 
 110         mutex_exit(&stream->mtx);
 111 
 112         return (stream);
 113 }
 114 
 115 void
 116 dmu_krrp_stream_fini(void *handler)
 117 {
 118         dmu_krrp_stream_t *stream = handler;
 119 
 120         if (stream == NULL)
 121                 return;
 122 
 123         mutex_enter(&stream->mtx);
 124         stream->running = B_FALSE;
 125         cv_broadcast(&stream->cv);
 126         while (stream->work_thread != NULL)
 127                 cv_wait(&stream->cv, &stream->mtx);
 128 
 129         mutex_exit(&stream->mtx);
 130 
 131         mutex_destroy(&stream->mtx);
 132         cv_destroy(&stream->cv);
 133         kmem_free(stream, sizeof (dmu_krrp_stream_t));
 134 }
 135 
 136 /*
 137  * Work-thread executes stream-tasks.
 138  */
 139 static void
 140 dmu_krrp_work_thread(void *arg)
 141 {
 142         dmu_krrp_stream_t *stream = arg;
 143         dmu_krrp_task_t *task;
 144         void (*task_executor)(void *);
 145 
 146         mutex_enter(&stream->mtx);
 147         stream->running = B_TRUE;
 148         cv_broadcast(&stream->cv);
 149 
 150         while (stream->running) {
 151                 if (stream->task == NULL) {
 152                         cv_wait(&stream->cv, &stream->mtx);
 153                         continue;
 154                 }
 155 
 156                 ASSERT(stream->task_executor != NULL);
 157 
 158                 task = stream->task;
 159                 task_executor = stream->task_executor;
 160                 stream->task = NULL;
 161                 stream->task_executor = NULL;
 162 
 163                 mutex_exit(&stream->mtx);
 164 
 165                 task_executor(task);
 166 
 167                 mutex_enter(&stream->mtx);
 168         }
 169 
 170         stream->work_thread = NULL;
 171         cv_broadcast(&stream->cv);
 172         mutex_exit(&stream->mtx);
 173         thread_exit();
 174 }
 175 
 176 /*
 177  * Arc bypass is supposed to reduce amount of copying inside memory
 178  * Here os the main callback for krrp usage of arc bypass
 179  */
 180 int
 181 dmu_krrp_arc_bypass(void *buf, int len, void *arg)
 182 {
 183         dmu_krrp_arc_bypass_t *bypass = arg;
 184         dmu_krrp_task_t *task = bypass->krrp_task;
 185         kreplication_zfs_args_t *buffer_args = &task->buffer_args;
 186 
 187         if (buffer_args->mem_check_cb != NULL) {
 188                 /*
 189                  * ARC holds the target buffer while
 190                  * we read it, so to exclude deadlock need
 191                  * to be sure that we have enough memory to
 192                  * completely read the buffer without waiting
 193                  * for free of required memory space
 194                  */
 195                 boolean_t zero_copy_ready =
 196                     buffer_args->mem_check_cb(len,
 197                     buffer_args->mem_check_cb_arg);
 198                 if (!zero_copy_ready)
 199                         return (ENODATA);
 200         }
 201 
 202         if (buffer_args->force_cksum)
 203                 (void) fletcher_4_incremental_native(buf, len, bypass->zc);
 204         DTRACE_PROBE(arc_bypass_send);
 205         return (bypass->cb(buf, len, task));
 206 }
 207 
 208 /*
 209  * KRRP-SR-INV
 210  * Functions used in send/recv functions to pass data to the KRRP transport
 211  */
 212 int
 213 dmu_krrp_buffer_write(void *buf, int len,
 214     dmu_krrp_task_t *krrp_task)
 215 {
 216         int count = 0;
 217         int err = 0;
 218 
 219         while ((!err) && (count < len)) {
 220                 if (krrp_task->buffer_state == SBS_USED) {
 221                         kreplication_buffer_t *buffer = krrp_task->buffer;
 222                         size_t buf_rem = buffer->buffer_size -
 223                             buffer->data_size;
 224                         size_t rem = len - count;
 225                         size_t size = MIN(rem, buf_rem);
 226 
 227                         (void) memcpy((char *)buffer->data + buffer->data_size,
 228                             (char *)buf + count, size);
 229                         count += size;
 230                         buffer->data_size += size;
 231 
 232                         if (buffer->data_size == buffer->buffer_size) {
 233                                 krrp_task->buffer = buffer->next;
 234                                 if (!krrp_task->buffer) {
 235                                         err = dmu_krrp_put_buffer(
 236                                             krrp_task);
 237                                 }
 238                         }
 239                 } else {
 240                         err = dmu_krrp_get_buffer(krrp_task);
 241                 }
 242         }
 243 
 244         return (err);
 245 }
 246 
 247 int
 248 dmu_krrp_buffer_read(void *buf, int len,
 249     dmu_krrp_task_t *krrp_task)
 250 {
 251         int done = 0;
 252         int err = 0;
 253 
 254         while (!err && (done < len)) {
 255                 if (krrp_task->buffer_state == SBS_USED) {
 256                         kreplication_buffer_t *buffer = krrp_task->buffer;
 257                         size_t rem = len - done;
 258                         size_t buf_rem = buffer->data_size -
 259                             krrp_task->buffer_bytes_read;
 260                         size_t size = MIN(rem, buf_rem);
 261 
 262                         (void) memcpy((char *)buf + done,
 263                             (char *)buffer->data +
 264                             krrp_task->buffer_bytes_read, size);
 265                         krrp_task->buffer_bytes_read += size;
 266                         done += size;
 267                         krrp_task->is_read = B_TRUE;
 268 
 269                         if (krrp_task->buffer_bytes_read ==
 270                             buffer->data_size) {
 271                                 krrp_task->buffer = buffer->next;
 272                                 krrp_task->buffer_bytes_read = 0;
 273                                 if (!krrp_task->buffer) {
 274                                         err = dmu_krrp_put_buffer(
 275                                             krrp_task);
 276                                 }
 277                         }
 278                 } else {
 279                         err = dmu_krrp_get_buffer(krrp_task);
 280                 }
 281         }
 282 
 283         return (err);
 284 }
 285 
 286 /*
 287  * KRRP-SEND routines
 288  */
 289 
 290 /*
 291  * The common function that is called from
 292  * zfs_collect_snap_props and zfs_collect_fs_props
 293  * iterates over the given zap-object and adds zfs props
 294  * to the resulting nvlist
 295  */
 296 static int
 297 zfs_collect_props(objset_t *mos, uint64_t zapobj, nvlist_t *props)
 298 {
 299         int err = 0;
 300         zap_cursor_t zc;
 301         zap_attribute_t za;
 302 
 303         ASSERT(nvlist_empty(props));
 304 
 305         zap_cursor_init(&zc, mos, zapobj);
 306 
 307         /* walk over properties' zap */
 308         while (zap_cursor_retrieve(&zc, &za) == 0) {
 309                 uint64_t cnt, el;
 310                 zfs_prop_t prop;
 311                 const char *suffix, *prop_name;
 312                 char buf[ZAP_MAXNAMELEN];
 313 
 314                 suffix = strchr(za.za_name, '$');
 315                 prop_name = za.za_name;
 316                 if (suffix != NULL) {
 317                         char *valstr;
 318 
 319                         /*
 320                          * The following logic is similar to
 321                          * dsl_prop_get_all_impl()
 322                          * Skip props that have:
 323                          * - suffix ZPROP_INHERIT_SUFFIX
 324                          * - all unknown suffixes to be backward compatible
 325                          */
 326                         if (strcmp(suffix, ZPROP_INHERIT_SUFFIX) == 0 ||
 327                             strcmp(suffix, ZPROP_RECVD_SUFFIX) != 0) {
 328                                 zap_cursor_advance(&zc);
 329                                 continue;
 330                         }
 331 
 332                         (void) strncpy(buf, za.za_name, (suffix - za.za_name));
 333                         buf[suffix - za.za_name] = '\0';
 334                         prop_name = buf;
 335 
 336                         /* Skip if locally overridden. */
 337                         err = zap_contains(mos, zapobj, prop_name);
 338                         if (err == 0) {
 339                                 zap_cursor_advance(&zc);
 340                                 continue;
 341                         }
 342 
 343                         if (err != ENOENT)
 344                                 break;
 345 
 346                         /* Skip if explicitly inherited. */
 347                         valstr = kmem_asprintf("%s%s", prop_name,
 348                             ZPROP_INHERIT_SUFFIX);
 349                         err = zap_contains(mos, zapobj, valstr);
 350                         strfree(valstr);
 351                         if (err == 0) {
 352                                 zap_cursor_advance(&zc);
 353                                 continue;
 354                         }
 355 
 356                         if (err != ENOENT)
 357                                 break;
 358 
 359                         /*
 360                          * zero out to make sure ENOENT is not returned
 361                          * if the loop breaks in this iteration
 362                          */
 363                         err = 0;
 364                 }
 365 
 366                 prop = zfs_name_to_prop(prop_name);
 367 
 368                 /*
 369                  * This property make sense only to this dataset,
 370                  * so no reasons to include it into stream
 371                  */
 372                 if (prop == ZFS_PROP_WBC_MODE) {
 373                         zap_cursor_advance(&zc);
 374                         continue;
 375                 }
 376 
 377                 (void) zap_length(mos, zapobj, za.za_name, &el, &cnt);
 378 
 379                 if (el == STRING_PROP_EL_SIZE) {
 380                         char val[ZAP_MAXVALUELEN];
 381 
 382                         err = zap_lookup(mos, zapobj, za.za_name,
 383                             STRING_PROP_EL_SIZE, cnt, val);
 384                         if (err != 0) {
 385                                 cmn_err(CE_WARN,
 386                                     "Error while looking up a prop"
 387                                     "zap : %d", err);
 388                                 break;
 389                         }
 390 
 391                         fnvlist_add_string(props, prop_name, val);
 392                 } else if (el == UINT64_PROP_EL_SIZE) {
 393                         fnvlist_add_uint64(props, prop_name,
 394                             za.za_first_integer);
 395                 }
 396 
 397                 zap_cursor_advance(&zc);
 398         }
 399 
 400         zap_cursor_fini(&zc);
 401 
 402         return (err);
 403 }
 404 
 405 static int
 406 zfs_collect_snap_props(dsl_dataset_t *snap_ds, nvlist_t **nvsnaps_props)
 407 {
 408         int err;
 409         nvlist_t *props;
 410         uint64_t zapobj;
 411         objset_t *mos;
 412 
 413         ASSERT(nvsnaps_props != NULL && *nvsnaps_props == NULL);
 414         ASSERT(dsl_dataset_long_held(snap_ds));
 415         ASSERT(snap_ds->ds_is_snapshot);
 416 
 417         props = fnvlist_alloc();
 418         mos = snap_ds->ds_dir->dd_pool->dp_meta_objset;
 419         zapobj = dsl_dataset_phys(snap_ds)->ds_props_obj;
 420         err = zfs_collect_props(mos, zapobj, props);
 421         if (err == 0)
 422                 *nvsnaps_props = props;
 423         else
 424                 fnvlist_free(props);
 425 
 426         return (err);
 427 }
 428 
 429 static int
 430 zfs_collect_fs_props(dsl_dataset_t *fs_ds, nvlist_t *nvfs)
 431 {
 432         int err = 0;
 433         uint64_t zapobj;
 434         objset_t *mos;
 435         nvlist_t *nvfsprops;
 436 
 437         ASSERT(dsl_dataset_long_held(fs_ds));
 438 
 439         nvfsprops = fnvlist_alloc();
 440         mos = fs_ds->ds_dir->dd_pool->dp_meta_objset;
 441         zapobj = dsl_dir_phys(fs_ds->ds_dir)->dd_props_zapobj;
 442         err = zfs_collect_props(mos, zapobj, nvfsprops);
 443         if (err == 0)
 444                 fnvlist_add_nvlist(nvfs, "props", nvfsprops);
 445 
 446         fnvlist_free(nvfsprops);
 447 
 448         return (err);
 449 }
 450 
 451 /* AVL compare function for snapshots */
 452 static int
 453 zfs_snapshot_txg_compare(const void *arg1, const void *arg2)
 454 {
 455         const zfs_snap_avl_node_t *s1 = arg1;
 456         const zfs_snap_avl_node_t *s2 = arg2;
 457 
 458         if (s1->txg > s2->txg) {
 459                 return (+1);
 460         } else if (s1->txg < s2->txg) {
 461                 return (-1);
 462         } else {
 463                 return (0);
 464         }
 465 }
 466 
 467 static zfs_snap_avl_node_t *
 468 zfs_construct_snap_node(dsl_dataset_t *snap_ds, char *full_snap_name)
 469 {
 470         zfs_snap_avl_node_t *snap_el;
 471 
 472         snap_el = kmem_zalloc(sizeof (zfs_snap_avl_node_t), KM_SLEEP);
 473 
 474         (void) strlcpy(snap_el->name, full_snap_name,
 475             sizeof (snap_el->name));
 476         snap_el->guid = dsl_dataset_phys(snap_ds)->ds_guid;
 477         snap_el->txg = dsl_dataset_phys(snap_ds)->ds_creation_txg;
 478         snap_el->ds = snap_ds;
 479 
 480         return (snap_el);
 481 }
 482 
 483 /*
 484  * This function is used to make decision about include
 485  * the given snap_ds into stream or not.
 486  *
 487  * Returns B_TRUE if the given snapshot has the given
 488  * prop_name and its value is not equal to the given prop_val,
 489  * otherwise returns B_FALSE
 490  */
 491 static boolean_t
 492 zfs_skip_check(dsl_dataset_t *snap_ds,
 493     const char *prop_name, const char *prop_val)
 494 {
 495         uint64_t zapobj;
 496         objset_t *mos;
 497         char val[ZAP_MAXVALUELEN];
 498         uint64_t cnt = 0, el = 0;
 499 
 500         if (prop_name == NULL || prop_val == NULL)
 501                 return (B_FALSE);
 502 
 503         mos = snap_ds->ds_dir->dd_pool->dp_meta_objset;
 504         zapobj = dsl_dataset_phys(snap_ds)->ds_props_obj;
 505 
 506         if (zap_length(mos, zapobj, prop_name, &el, &cnt) == 0) {
 507                 if (zap_lookup(mos, zapobj, prop_name,
 508                         STRING_PROP_EL_SIZE, cnt, val) != 0)
 509                         return (B_FALSE);
 510 
 511                 if (strcmp(prop_val, val) != 0)
 512                         return (B_TRUE);
 513         }
 514 
 515         return (B_FALSE);
 516 }
 517 
 518 /*
 519  * Collects all snapshots (txg_first < Creation TXG < txg_last)
 520  * for the given FS and adds them to the resulting AVL-tree
 521  */
 522 static int
 523 zfs_collect_interim_snaps(dmu_krrp_task_t *krrp_task,
 524     zfs_ds_node_t *fs_el, uint64_t txg_first,
 525     uint64_t txg_last)
 526 {
 527         int err;
 528         uint64_t ds_creation_txg;
 529         avl_tree_t *snapshots = &fs_el->snapshots;
 530         zfs_snap_avl_node_t *snap_el;
 531         char full_snap_name[ZFS_MAX_DATASET_NAME_LEN];
 532         char *snap_name;
 533         objset_t *os = NULL;
 534         dsl_dataset_t *snap_ds = NULL;
 535         dsl_dataset_t *ds = fs_el->ds;
 536         dsl_pool_t *dp = ds->ds_dir->dd_pool;
 537         uint64_t offp = 0, obj = 0;
 538 
 539         dsl_pool_config_enter(dp, FTAG);
 540 
 541         err = dmu_objset_from_ds(ds, &os);
 542         if (err != 0) {
 543                 dsl_pool_config_exit(dp, FTAG);
 544                 return (err);
 545         }
 546 
 547         (void) snprintf(full_snap_name, sizeof (full_snap_name),
 548             "%s@", fs_el->name);
 549         snap_name = strchr(full_snap_name, '@') + 1;
 550 
 551         /* walk over snapshots and add them to the tree to sort */
 552         for (;;) {
 553                 snap_ds = NULL;
 554                 snap_name[0] = '\0';
 555                 err = dmu_snapshot_list_next(os,
 556                     sizeof (full_snap_name) - strlen(full_snap_name),
 557                     full_snap_name + strlen(full_snap_name),
 558                     &obj, &offp, NULL);
 559                 if (err != 0) {
 560                         if (err == ENOENT) {
 561                                 /*
 562                                  * ENOENT in this case means no more
 563                                  * snapshots, that is not an error
 564                                  */
 565                                 err = 0;
 566                         }
 567 
 568                         break;
 569                 }
 570 
 571                 /* We do not want intermediate autosnapshots */
 572                 if (autosnap_check_name(snap_name))
 573                         continue;
 574 
 575                 err = dsl_dataset_hold(dp, full_snap_name, krrp_task, &snap_ds);
 576                 if (err != 0) {
 577                         ASSERT(err != ENOENT);
 578                         break;
 579                 }
 580 
 581                 ds_creation_txg =
 582                     dsl_dataset_phys(snap_ds)->ds_creation_txg;
 583 
 584                 /*
 585                  * We want only snapshots that are inside of
 586                  * our boundaries
 587                  * boundary snap_el already added to avl
 588                  */
 589                 if (ds_creation_txg <= txg_first ||
 590                     ds_creation_txg >= txg_last) {
 591                         dsl_dataset_rele(snap_ds, krrp_task);
 592                         continue;
 593                 }
 594 
 595                 if (zfs_skip_check(snap_ds,
 596                     krrp_task->buffer_args.skip_snaps_prop_name,
 597                     krrp_task->buffer_args.skip_snaps_prop_val)) {
 598                         dsl_dataset_rele(snap_ds, krrp_task);
 599                         continue;
 600                 }
 601 
 602                 snap_el = zfs_construct_snap_node(snap_ds,
 603                     full_snap_name);
 604                 dsl_dataset_long_hold(snap_ds, krrp_task);
 605                 avl_add(snapshots, snap_el);
 606         }
 607 
 608         dsl_pool_config_exit(dp, FTAG);
 609 
 610         return (err);
 611 }
 612 
 613 /*
 614  * Collect snapshots of a given dataset in a given range, where
 615  *     'to_snap'   - the right boundary
 616  *     'from_snap' - the left boundary
 617  * Collects interim snapshots if incl_interim_snaps == B_TRUE
 618  */
 619 static int
 620 zfs_collect_snaps(dmu_krrp_task_t *krrp_task,
 621     zfs_ds_node_t *fs_el, char *from_snap,
 622     char *to_snap, boolean_t incl_interim_snaps)
 623 {
 624         int err = 0;
 625         dsl_dataset_t *snap_ds = NULL;
 626         dsl_dataset_t *fs_ds = fs_el->ds;
 627         dsl_pool_t *dp = fs_ds->ds_dir->dd_pool;
 628         uint64_t txg_first = 0, txg_last = UINT64_MAX;
 629         char full_snap_name[ZFS_MAX_DATASET_NAME_LEN];
 630         char *snap_name;
 631         boolean_t no_from_snap = B_TRUE;
 632 
 633         zfs_snap_avl_node_t *from_snap_el = NULL;
 634         zfs_snap_avl_node_t *to_snap_el = NULL;
 635 
 636         /* the right boundary snapshot should be exist */
 637         if (to_snap == NULL || to_snap[0] == '\0')
 638                 return (SET_ERROR(EINVAL));
 639 
 640         dsl_pool_config_enter(dp, FTAG);
 641 
 642         /*
 643          * Snapshots must be sorted in the ascending order by birth_txg
 644          */
 645         avl_create(&fs_el->snapshots, zfs_snapshot_txg_compare,
 646             sizeof (zfs_snap_avl_node_t),
 647             offsetof(zfs_snap_avl_node_t, avl_node));
 648 
 649         (void) snprintf(full_snap_name, sizeof (full_snap_name),
 650             "%s@%s", fs_el->name, to_snap);
 651         snap_name = strchr(full_snap_name, '@') + 1;
 652 
 653         err = dsl_dataset_hold(dp, full_snap_name, krrp_task, &snap_ds);
 654         if (err != 0) {
 655                 dsl_pool_config_exit(dp, FTAG);
 656 
 657                 /*
 658                  * This FS was created after 'to_snap',
 659                  * so skip it at this time
 660                  */
 661                 if (err == ENOENT)
 662                         err = 0;
 663 
 664                 return (err);
 665         }
 666 
 667         to_snap_el = zfs_construct_snap_node(snap_ds,
 668             full_snap_name);
 669         txg_last = dsl_dataset_phys(snap_ds)->ds_creation_txg;
 670         dsl_dataset_long_hold(to_snap_el->ds, krrp_task);
 671         avl_add(&fs_el->snapshots, to_snap_el);
 672 
 673         /* check left boundary */
 674         if (from_snap != NULL && from_snap[0] != '\0') {
 675                 snap_ds = NULL;
 676                 snap_name[0] = '\0';
 677                 (void) strcat(full_snap_name, from_snap);
 678                 err = dsl_dataset_hold(dp, full_snap_name,
 679                     krrp_task, &snap_ds);
 680 
 681                 if (err == 0) {
 682                         txg_first =
 683                             dsl_dataset_phys(snap_ds)->ds_creation_txg;
 684                         from_snap_el =
 685                             zfs_construct_snap_node(snap_ds, full_snap_name);
 686                         dsl_dataset_long_hold(from_snap_el->ds, krrp_task);
 687                         avl_add(&fs_el->snapshots, from_snap_el);
 688                         no_from_snap = B_FALSE;
 689                 } else {
 690                         /*
 691                          * it is possible that from_snap does not exist
 692                          * for a child FS, because the FS was created
 693                          * after from_snap
 694                          */
 695                         if (err != ENOENT || fs_el->is_root) {
 696                                 dsl_pool_config_exit(dp, FTAG);
 697                                 return (err);
 698                         }
 699 
 700                         err = 0;
 701                 }
 702         }
 703 
 704         /*
 705          * For cloned DS that doesn't have from_snap
 706          * need to  igin_snap as from_snap
 707          * The owner of the held origin will be fs_el
 708          */
 709         if (no_from_snap && fs_el->origin_name[0] != '\0') {
 710                 snap_ds = NULL;
 711                 err = dsl_dataset_hold(dp, fs_el->origin_name,
 712                     fs_el, &snap_ds);
 713                 if (err != 0) {
 714                         dsl_pool_config_exit(dp, FTAG);
 715                         return (err);
 716                 }
 717 
 718                 /*
 719                  * Need to be sure that origin's name doesn't
 720                  * match the skip_mask. If origin was not/will
 721                  * not be replicated to the destination, then
 722                  * its clone will be replicated as a regular DS.
 723                  */
 724                 if (zfs_skip_check(snap_ds,
 725                     krrp_task->buffer_args.skip_snaps_prop_name,
 726                     krrp_task->buffer_args.skip_snaps_prop_val)) {
 727                         dsl_dataset_rele(snap_ds, fs_el);
 728                 } else {
 729                         dsl_dataset_long_hold(snap_ds, fs_el);
 730                         from_snap_el = zfs_construct_snap_node(snap_ds,
 731                             fs_el->origin_name);
 732                         from_snap_el->origin = B_TRUE;
 733                         avl_add(&fs_el->snapshots, from_snap_el);
 734                 }
 735         }
 736 
 737         dsl_pool_config_exit(dp, FTAG);
 738 
 739         /*
 740          * 'FROM' snapshot cannot be created before 'TO' snapshot
 741          * and
 742          * 'FROM' and 'TO' snapshots cannot be the same snapshot
 743          */
 744         if (txg_last <= txg_first)
 745                 return (SET_ERROR(EXDEV));
 746 
 747         /*
 748          * If 'incl_interim_snaps' flag isn't presented,
 749          * only 'from' and 'to' snapshots should be in list
 750          */
 751         if (!incl_interim_snaps)
 752                 return (0);
 753 
 754         err = zfs_collect_interim_snaps(krrp_task, fs_el,
 755             txg_first, txg_last);
 756 
 757         return (err);
 758 }
 759 
 760 static boolean_t
 761 zfs_is_snapshot_belong(const char *ds_name, const char *snap_name)
 762 {
 763         char *at;
 764 
 765         ASSERT(strchr(ds_name, '@') == NULL);
 766         VERIFY((at = strrchr(snap_name, '@')) != NULL);
 767 
 768         return (strncmp(ds_name, snap_name, at - snap_name + 1) == 0);
 769 }
 770 
 771 /*
 772  * AVL compare function for cloned datasets
 773  * To be sure that a cloned dataset will be replicated
 774  * after its origin this functions does 2-stage compare.
 775  * At the first stage it compares origin_name and name
 776  * of both nodes to check that either s1 is clone of s2
 777  * or vise versa.
 778  * If s1 and s2 don't have dependencies, then at the second
 779  * stage this function compares their TXG.
 780  */
 781 static int
 782 zfs_cloned_ds_compare(const void *arg1, const void *arg2)
 783 {
 784         const zfs_ds_node_t *s1 = arg1;
 785         const zfs_ds_node_t *s2 = arg2;
 786 
 787         /* s1 is clone of s2, so s1 needs to be placed after s2 */
 788         if (zfs_is_snapshot_belong(s2->name, s1->origin_name))
 789                 return (+1);
 790 
 791         /* s2 is clone of s1, so s2 needs to be placed after s1 */
 792         if (zfs_is_snapshot_belong(s1->name, s2->origin_name))
 793                 return (-1);
 794 
 795         if (s1->creation_txg > s2->creation_txg)
 796                 return (+1);
 797 
 798         if (s1->creation_txg < s2->creation_txg)
 799                 return (-1);
 800 
 801         return (0);
 802 }
 803 
 804 /*
 805  * This function retrieves the name and
 806  * the guid of origin snapshot for the given clone
 807  */
 808 static int
 809 zfs_populate_clone_info(zfs_ds_node_t *node)
 810 {
 811         int err;
 812         dsl_dataset_t *ds_origin = NULL;
 813         dsl_dir_t *ds_dir = node->ds->ds_dir;
 814         dsl_pool_t *dp = ds_dir->dd_pool;
 815 
 816         err = dsl_dataset_hold_obj(dp,
 817             dsl_dir_phys(ds_dir)->dd_origin_obj, FTAG, &ds_origin);
 818         if (err != 0)
 819                 return (err);
 820 
 821         ASSERT(ds_origin->ds_is_snapshot);
 822         dsl_dataset_name(ds_origin, node->origin_name);
 823         ASSERT(strchr(node->origin_name, '@') != NULL);
 824         node->origin_guid = dsl_dataset_phys(ds_origin)->ds_guid;
 825         dsl_dataset_rele(ds_origin, FTAG);
 826 
 827         return (0);
 828 }
 829 
 830 /*
 831  * This function is used to lookup a node in the given list,
 832  * that points to the parent of origin snapshot for the given
 833  * clone_node. The last one is not a part of the list.
 834  *
 835  * If the list doesn't have required node, then NULL is returned.
 836  */
 837 static zfs_ds_node_t *
 838 zfs_lookup_origin_node(list_t *ds_to_send, zfs_ds_node_t *clone_node)
 839 {
 840         char *at;
 841         zfs_ds_node_t *node;
 842 
 843         at = strchr(clone_node->origin_name, '@');
 844         *at = '\0';
 845 
 846         node = list_head(ds_to_send);
 847         while (node != NULL) {
 848                 if (strcmp(node->name, clone_node->origin_name) == 0)
 849                         break;
 850 
 851                 node = list_next(ds_to_send, node);
 852         }
 853 
 854         *at = '@';
 855         return (node);
 856 }
 857 
 858 /*
 859  * This function is used to lookup a node in the given list,
 860  * that points to the parent of the tnode. The last one
 861  * is not a part of the list.
 862  *
 863  * If the list doesn't have required node, then NULL is returned.
 864  */
 865 static zfs_ds_node_t *
 866 zfs_lookup_parent_node(list_t *ds_list, zfs_ds_node_t *tnode,
 867     zfs_ds_node_t  *start_node)
 868 {
 869         char *final_slash;
 870         zfs_ds_node_t *node;
 871 
 872         final_slash = strrchr(tnode->name, '/');
 873         if (final_slash == NULL)
 874                 return (NULL);
 875 
 876         *final_slash = '\0';
 877 
 878         node = (start_node == NULL) ? list_head(ds_list) : start_node;
 879         while (node != NULL) {
 880                 if (strcmp(node->name, tnode->name) == 0)
 881                         break;
 882 
 883                 node = list_next(ds_list, node);
 884         }
 885 
 886         *final_slash = '/';
 887         return (node);
 888 }
 889 
 890 static int
 891 zfs_construct_ds_node(dsl_pool_t *dp, uint64_t ds_object,
 892     void *owner, zfs_ds_node_t **result)
 893 {
 894         zfs_ds_node_t *node;
 895         int err;
 896 
 897         ASSERT(result != NULL && *result == NULL);
 898 
 899         node = kmem_zalloc(sizeof (zfs_ds_node_t), KM_SLEEP);
 900 
 901         /* We need our own "hold" on the dataset */
 902         err = dsl_dataset_hold_obj(dp, ds_object,
 903             owner, &node->ds);
 904         if (err != 0) {
 905                 kmem_free(node, sizeof (zfs_ds_node_t));
 906                 return (err);
 907         }
 908 
 909         dsl_dataset_long_hold(node->ds, owner);
 910 
 911         dsl_dataset_name(node->ds, node->name);
 912         node->creation_txg = dsl_dataset_phys(node->ds)->ds_creation_txg;
 913 
 914         node->is_clone = dsl_dir_is_clone(node->ds->ds_dir);
 915         if (node->is_clone) {
 916                 err = zfs_populate_clone_info(node);
 917                 if (err != 0) {
 918                         dsl_dataset_long_rele(node->ds, owner);
 919                         dsl_dataset_rele(node->ds, owner);
 920                         kmem_free(node, sizeof (zfs_ds_node_t));
 921                         return (err);
 922                 }
 923         }
 924 
 925         *result = node;
 926         return (0);
 927 }
 928 
 929 /*
 930  * This function walks only next-level children (depth = 1)
 931  * and puts them into the given list.
 932  * Clones also are placed into the given AVL.
 933  */
 934 static int
 935 zfs_collect_children(dmu_krrp_task_t *krrp_task, dsl_pool_t *dp,
 936     zfs_ds_node_t *parent_node, list_t *ds_list, avl_tree_t *clones)
 937 {
 938         zap_cursor_t zc;
 939         zap_attribute_t attr;
 940         int err;
 941         objset_t *mos = dp->dp_meta_objset;
 942         uint64_t dd_child_dir_zapobj =
 943             dsl_dir_phys(parent_node->ds->ds_dir)->dd_child_dir_zapobj;
 944 
 945         zap_cursor_init(&zc, mos, dd_child_dir_zapobj);
 946         while (zap_cursor_retrieve(&zc, &attr) == 0) {
 947                 dsl_dir_t *dd = NULL;
 948                 zfs_ds_node_t *node = NULL;
 949 
 950                 ASSERT3U(attr.za_integer_length, ==,
 951                         sizeof (uint64_t));
 952                 ASSERT3U(attr.za_num_integers, ==, 1);
 953 
 954                 err = dsl_dir_hold_obj(dp, attr.za_first_integer,
 955                     attr.za_name, FTAG, &dd);
 956                 if (err != 0)
 957                         break;
 958 
 959                 err = zfs_construct_ds_node(dp,
 960                     dsl_dir_phys(dd)->dd_head_dataset_obj,
 961                     krrp_task, &node);
 962                 dsl_dir_rele(dd, FTAG);
 963                 if (err != 0) {
 964                         break;
 965                 }
 966 
 967                 list_insert_tail(ds_list, node);
 968                 if (node->is_clone)
 969                         avl_add(clones, node);
 970 
 971                 (void) zap_cursor_advance(&zc);
 972         }
 973 
 974         zap_cursor_fini(&zc);
 975 
 976         return (err);
 977 }
 978 
 979 /*
 980  * Collect datasets and snapshots of each dataset.
 981  *
 982  * This function walks ZFS-tree of datasets by using
 983  * breadth-first search (BFS) method to avoid misordering
 984  * in case of existing cloned datasets.
 985  */
 986 static int
 987 zfs_collect_ds(dmu_krrp_task_t *krrp_task, spa_t *spa, list_t *ds_list)
 988 {
 989         int err = 0;
 990         dsl_pool_t *dp;
 991         dsl_dataset_t *ds = NULL;
 992         uint64_t root_ds_object;
 993         zfs_ds_node_t *clone_node, *node, *parent_node;
 994         void *cookie = NULL;
 995         avl_tree_t clones;
 996 
 997         char *from_ds = krrp_task->buffer_args.from_ds;
 998         char *from_snap = krrp_task->buffer_args.from_incr_base;
 999         char *to_snap = krrp_task->buffer_args.from_snap;
1000         boolean_t incl_interim_snaps = krrp_task->buffer_args.do_all;
1001         boolean_t recursive = krrp_task->buffer_args.recursive;
1002 
1003         dp = spa_get_dsl(spa);
1004 
1005         dsl_pool_config_enter(dp, FTAG);
1006 
1007         err = dsl_dataset_hold(dp, from_ds, FTAG, &ds);
1008         if (err != 0) {
1009                 dsl_pool_config_exit(dp, FTAG);
1010                 return (err);
1011         }
1012 
1013         root_ds_object = ds->ds_object;
1014         dsl_dataset_rele(ds, FTAG);
1015 
1016         node = NULL;
1017         err = zfs_construct_ds_node(dp, root_ds_object,
1018             krrp_task, &node);
1019         if (err != 0) {
1020                 dsl_pool_config_exit(dp, FTAG);
1021                 return (err);
1022         }
1023 
1024         node->is_root = B_TRUE;
1025         list_insert_head(ds_list, node);
1026 
1027         avl_create(&clones, zfs_cloned_ds_compare,
1028             sizeof (zfs_ds_node_t), offsetof(zfs_ds_node_t, avl_node));
1029 
1030         if (recursive) {
1031                 /*
1032                  * The following loop walk over the list,
1033                  * that is populated by zfs_collect_children(),
1034                  * that always puts new items to the tail.
1035                  *
1036                  */
1037                 while (node != NULL) {
1038                         err = zfs_collect_children(krrp_task,
1039                             dp, node, ds_list, &clones);
1040                         if (err != 0)
1041                                 break;
1042 
1043                         node = list_next(ds_list, node);
1044                 }
1045         }
1046 
1047         dsl_pool_config_exit(dp, FTAG);
1048 
1049         if (err != 0) {
1050                 while ((node = avl_destroy_nodes(&clones, &cookie)) != NULL);
1051                 avl_destroy(&clones);
1052                 return (err);
1053         }
1054 
1055         /*
1056          * We've collected all required datasets.
1057          *
1058          * Now need to do additional resort to place cloned datasets
1059          * to the correct position. And there are 2 cases:
1060          *  (1) parent is located before the origin DS
1061          *  (2) parent is located after the origin DS
1062          * In the first case need to place clone rigth after origin,
1063          * in the second after parent.
1064          *
1065          * avl_destroy_nodes() cannot be used here, because it
1066          * travels AVL from the end.
1067          */
1068         while ((clone_node = avl_first(&clones)) != NULL) {
1069                 avl_remove(&clones, clone_node);
1070                 list_remove(ds_list, clone_node);
1071 
1072                 clone_node->origin =
1073                     zfs_lookup_origin_node(ds_list, clone_node);
1074 
1075                 if (clone_node->origin == NULL) {
1076                         /*
1077                          * It seems the origin is outside
1078                          * of replication tree and in this case doesn't
1079                          * matter where this node will be in the list
1080                          */
1081 
1082                         list_insert_tail(ds_list, clone_node);
1083                         continue;
1084                 }
1085 
1086                 /*
1087                  * We are looking for parent starting from origin,
1088                  * because cannot place clone before its origin.
1089                  *
1090                  * parent_node == NULL means that it is located
1091                  * in the list before origin, so we can just put
1092                  * it rigth after the origin.
1093                  */
1094                 parent_node = zfs_lookup_parent_node(ds_list,
1095                     clone_node, clone_node->origin);
1096                 if (parent_node == NULL) {
1097                         list_insert_after(ds_list,
1098                             clone_node->origin, clone_node);
1099                 } else {
1100                         list_insert_after(ds_list,
1101                             parent_node, clone_node);
1102                 }
1103         }
1104 
1105         avl_destroy(&clones);
1106 
1107         node = list_head(ds_list);
1108         while (err == 0 && node != NULL) {
1109                 err = zfs_collect_snaps(krrp_task, node,
1110                     from_snap, to_snap, incl_interim_snaps);
1111                 node = list_next(ds_list, node);
1112         }
1113 
1114         return (err);
1115 }
1116 
1117 /* Send a single dataset, mostly mimic regular send */
1118 static int
1119 zfs_send_one_ds(dmu_krrp_task_t *krrp_task, zfs_snap_avl_node_t *snap_el,
1120     zfs_snap_avl_node_t *snap_el_prev)
1121 {
1122         int err = 0;
1123         offset_t off = 0;
1124         dsl_pool_t *dp = NULL;
1125         dsl_dataset_t *snap_ds = NULL;
1126         dsl_dataset_t *snap_ds_prev = NULL;
1127         boolean_t embedok = krrp_task->buffer_args.embedok;
1128         boolean_t compressok = krrp_task->buffer_args.compressok;
1129         boolean_t large_block_ok = krrp_task->buffer_args.large_block_ok;
1130         nvlist_t *resume_info = krrp_task->buffer_args.resume_info;
1131         uint64_t resumeobj = 0, resumeoff = 0;
1132 
1133         /*
1134          * 'ds' of snap_ds/snap_ds_prev alredy long-held
1135          * so we do not need to hold them again
1136          */
1137 
1138         snap_ds = snap_el->ds;
1139         if (snap_el_prev != NULL)
1140                 snap_ds_prev = snap_el_prev->ds;
1141 
1142         /*
1143          * dsl_pool_config_enter() cannot be used here because
1144          * dmu_send_impl() calls dsl_pool_rele()
1145          *
1146          * VERIFY0() is used because dsl_pool_hold() opens spa,
1147          * that already is opened in our case, so it cannot fail
1148          */
1149         VERIFY0(dsl_pool_hold(snap_el->name, FTAG, &dp));
1150 
1151         if (resume_info != NULL) {
1152                 err = nvlist_lookup_uint64(resume_info, "object", &resumeobj);
1153                 ASSERT3U(err, !=, ENOENT);
1154                 if (err != 0) {
1155                         dsl_pool_rele(dp, FTAG);
1156                         return (SET_ERROR(err));
1157                 }
1158 
1159                 err = nvlist_lookup_uint64(resume_info, "offset", &resumeoff);
1160                 ASSERT3U(err, !=, ENOENT);
1161                 if (err != 0) {
1162                         dsl_pool_rele(dp, FTAG);
1163                         return (SET_ERROR(err));
1164                 }
1165         }
1166 
1167         if (krrp_debug) {
1168                 cmn_err(CE_NOTE, "KRRP SEND INC_BASE: %s -- DS: "
1169                     "%s -- GUID: %llu",
1170                     snap_el_prev == NULL ? "<none>" : snap_el_prev->name,
1171                     snap_el->name,
1172                     (unsigned long long)dsl_dataset_phys(snap_ds)->ds_guid);
1173         }
1174 
1175         if (snap_ds_prev != NULL) {
1176                 zfs_bookmark_phys_t zb;
1177                 boolean_t is_clone;
1178 
1179                 if (!dsl_dataset_is_before(snap_ds, snap_ds_prev, 0)) {
1180                         dsl_pool_rele(dp, FTAG);
1181                         return (SET_ERROR(EXDEV));
1182                 }
1183 
1184                 zb.zbm_creation_time =
1185                     dsl_dataset_phys(snap_ds_prev)->ds_creation_time;
1186                 zb.zbm_creation_txg =
1187                     dsl_dataset_phys(snap_ds_prev)->ds_creation_txg;
1188                 zb.zbm_guid = dsl_dataset_phys(snap_ds_prev)->ds_guid;
1189                 is_clone = (snap_ds_prev->ds_dir != snap_ds->ds_dir);
1190 
1191                 err = dmu_send_impl(FTAG, dp, snap_ds, &zb, is_clone,
1192                     embedok, large_block_ok, compressok, -1, resumeobj, resumeoff, NULL,
1193                     &off, krrp_task);
1194         } else {
1195                 err = dmu_send_impl(FTAG, dp, snap_ds, NULL, B_FALSE,
1196                     embedok, large_block_ok, compressok, -1, resumeobj, resumeoff, NULL,
1197                     &off, krrp_task);
1198         }
1199 
1200         /*
1201          * dsl_pool_rele() is not required here
1202          * because dmu_send_impl() already did it
1203          */
1204 
1205         return (err);
1206 }
1207 
1208 /*
1209  * Here we iterate over all collected FSs and
1210  * their SNAPs to collect props
1211  */
1212 static int
1213 zfs_prepare_compound_data(list_t *fs_list, nvlist_t **fss)
1214 {
1215         zfs_ds_node_t *fs_el;
1216         int err = 0;
1217         nvlist_t *nvfss;
1218         uint64_t guid;
1219         char sguid[64];
1220 
1221         nvfss = fnvlist_alloc();
1222 
1223         /* Traverse the list of datasetss */
1224         fs_el = list_head(fs_list);
1225         while (fs_el != NULL) {
1226                 zfs_snap_avl_node_t *snap_el;
1227                 nvlist_t *nvfs, *nvsnaps, *nvsnaps_props;
1228                 char *at;
1229 
1230                 nvfs = fnvlist_alloc();
1231                 fnvlist_add_string(nvfs, "name", fs_el->name);
1232 
1233                 if (fs_el->origin_name[0] != '\0') {
1234                         fnvlist_add_uint64(nvfs,
1235                         "origin", fs_el->origin_guid);
1236                         VERIFY((at = strchr(fs_el->origin_name, '@')) != NULL);
1237                         *at = '\0';
1238                         fnvlist_add_string(nvfs,
1239                         "origin_fsname", fs_el->origin_name);
1240                         *at = '@';
1241                 }
1242 
1243                 err = zfs_collect_fs_props(fs_el->ds, nvfs);
1244                 if (err != 0) {
1245                         fnvlist_free(nvfs);
1246                         break;
1247                 }
1248 
1249                 nvsnaps = fnvlist_alloc();
1250                 nvsnaps_props = fnvlist_alloc();
1251 
1252                 snap_el = avl_first(&fs_el->snapshots);
1253                 while (snap_el != NULL) {
1254                         nvlist_t *nvsnap_props = NULL;
1255                         char *snapname, *at;
1256 
1257                         at = strrchr(snap_el->name, '@');
1258                         ASSERT(at != NULL);
1259                         if (at == NULL) {
1260                                 err = SET_ERROR(EILSEQ);
1261                                 break;
1262                         }
1263 
1264                         err = zfs_collect_snap_props(snap_el->ds,
1265                             &nvsnap_props);
1266                         if (err != 0)
1267                                 break;
1268 
1269                         snapname = at + 1;
1270                         fnvlist_add_uint64(nvsnaps, snapname, snap_el->guid);
1271                         fnvlist_add_nvlist(nvsnaps_props,
1272                             snapname, nvsnap_props);
1273                         fnvlist_free(nvsnap_props);
1274 
1275                         snap_el = AVL_NEXT(&fs_el->snapshots, snap_el);
1276                 }
1277 
1278                 if (err == 0) {
1279                         fnvlist_add_nvlist(nvfs, "snaps", nvsnaps);
1280                         fnvlist_add_nvlist(nvfs, "snapprops",
1281                             nvsnaps_props);
1282 
1283                         guid = dsl_dataset_phys(fs_el->ds)->ds_guid;
1284                         (void) sprintf(sguid, "0x%llx",
1285                             (unsigned long long)guid);
1286                         fnvlist_add_nvlist(nvfss, sguid, nvfs);
1287                 }
1288 
1289                 fnvlist_free(nvsnaps);
1290                 fnvlist_free(nvsnaps_props);
1291                 fnvlist_free(nvfs);
1292 
1293                 if (err != 0)
1294                         break;
1295 
1296                 fs_el = list_next(fs_list, fs_el);
1297         }
1298 
1299         if (err != 0)
1300                 fnvlist_free(nvfss);
1301         else
1302                 *fss = nvfss;
1303 
1304         return (err);
1305 }
1306 
1307 static void
1308 zfs_prepare_compound_hdr(dmu_krrp_task_t *krrp_task, nvlist_t **hdrnvl)
1309 {
1310         nvlist_t *nvl;
1311 
1312         nvl = fnvlist_alloc();
1313 
1314         if (krrp_task->buffer_args.from_incr_base[0] != '\0') {
1315                 fnvlist_add_string(nvl, "fromsnap",
1316                     krrp_task->buffer_args.from_incr_base);
1317         }
1318 
1319         fnvlist_add_string(nvl, "tosnap", krrp_task->buffer_args.from_snap);
1320 
1321         if (!krrp_task->buffer_args.recursive)
1322                 fnvlist_add_boolean(nvl, "not_recursive");
1323 
1324         *hdrnvl = nvl;
1325 }
1326 
1327 static int
1328 zfs_send_compound_stream_header(dmu_krrp_task_t *krrp_task, list_t *ds_to_send)
1329 {
1330         int err;
1331         nvlist_t *fss = NULL;
1332         nvlist_t *hdrnvl = NULL;
1333         dmu_replay_record_t drr;
1334         zio_cksum_t zc = { 0 };
1335         char *packbuf = NULL;
1336         size_t buflen = 0;
1337 
1338         zfs_prepare_compound_hdr(krrp_task, &hdrnvl);
1339 
1340         err = zfs_prepare_compound_data(ds_to_send, &fss);
1341         if (err != 0)
1342                 return (err);
1343 
1344         fnvlist_add_nvlist(hdrnvl, "fss", fss);
1345         fnvlist_free(fss);
1346 
1347         VERIFY0(nvlist_pack(hdrnvl, &packbuf, &buflen,
1348             NV_ENCODE_XDR, KM_SLEEP));
1349         fnvlist_free(hdrnvl);
1350 
1351         bzero(&drr, sizeof (drr));
1352         drr.drr_type = DRR_BEGIN;
1353         drr.drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
1354         DMU_SET_STREAM_HDRTYPE(drr.drr_u.drr_begin.drr_versioninfo,
1355             DMU_COMPOUNDSTREAM);
1356         (void) snprintf(drr.drr_u.drr_begin.drr_toname,
1357             sizeof (drr.drr_u.drr_begin.drr_toname),
1358             "%s@%s", krrp_task->buffer_args.from_ds,
1359             krrp_task->buffer_args.from_snap);
1360         drr.drr_payloadlen = buflen;
1361         if (krrp_task->buffer_args.force_cksum)
1362                 (void) fletcher_4_incremental_native(&drr, sizeof (drr), &zc);
1363 
1364         err = dmu_krrp_buffer_write(&drr, sizeof (drr), krrp_task);
1365         if (err != 0)
1366                 goto out;
1367 
1368         if (buflen != 0) {
1369                 if (krrp_task->buffer_args.force_cksum)
1370                         (void) fletcher_4_incremental_native(packbuf, buflen, &zc);
1371 
1372                 err = dmu_krrp_buffer_write(packbuf, buflen, krrp_task);
1373                 if (err != 0)
1374                         goto out;
1375         }
1376 
1377         bzero(&drr, sizeof (drr));
1378         drr.drr_type = DRR_END;
1379         drr.drr_u.drr_end.drr_checksum = zc;
1380 
1381         err = dmu_krrp_buffer_write(&drr, sizeof (drr), krrp_task);
1382 
1383 out:
1384         if (packbuf != NULL)
1385                 kmem_free(packbuf, buflen);
1386 
1387         return (err);
1388 }
1389 
1390 /*
1391  * For every dataset there is a chain of snapshots. It may start with
1392  * an empty record, which means it is a non-incremental snap, after
1393  * that this dataset is considered to be under an incremental stream.
1394  * In an incremental stream, first snapshot for every dataset is
1395  * an incremental base. After sending, currently sent snapshot
1396  * becomes a base for the next one unless the next belongs to
1397  * another dataset or is an empty record.
1398  */
1399 static int
1400 zfs_send_snapshots(dmu_krrp_task_t *krrp_task, avl_tree_t *snapshots,
1401     char *resume_snap_name)
1402 {
1403         int err = 0;
1404         char *incr_base = krrp_task->buffer_args.from_incr_base;
1405         zfs_snap_avl_node_t *snap_el, *snap_el_prev = NULL;
1406 
1407         snap_el = avl_first(snapshots);
1408 
1409         /*
1410          * It is possible that a new FS does not yet have snapshots,
1411          * because the FS was created after the right border snapshot
1412          */
1413         if (snap_el == NULL)
1414                 return (0);
1415 
1416         /*
1417          * For an incemental stream need to skip
1418          * the incremental base snapshot
1419          */
1420         if (incr_base[0] != '\0') {
1421                 char *short_snap_name = strrchr(snap_el->name, '@') + 1;
1422                 if (strcmp(incr_base, short_snap_name) == 0) {
1423                         snap_el_prev = snap_el;
1424                         snap_el = AVL_NEXT(snapshots, snap_el);
1425                 }
1426         }
1427 
1428         if (resume_snap_name != NULL) {
1429                 while (snap_el != NULL) {
1430                         if (strcmp(snap_el->name, resume_snap_name) == 0)
1431                                 break;
1432 
1433                         snap_el_prev = snap_el;
1434                         snap_el = AVL_NEXT(snapshots, snap_el);
1435                 }
1436         }
1437 
1438         /*
1439          * Origin snapshot is here not to sent it,
1440          * it is used to define start point
1441          */
1442         if (snap_el != NULL && snap_el->origin) {
1443                 snap_el_prev = snap_el;
1444                 snap_el = AVL_NEXT(snapshots, snap_el);
1445         }
1446 
1447         while (snap_el != NULL) {
1448                 err = zfs_send_one_ds(krrp_task, snap_el, snap_el_prev);
1449                 if (err != 0)
1450                         break;
1451 
1452                 /*
1453                  * We have sent resumed snap,
1454                  * so resume_info is not relevant anymore
1455                  */
1456                 if (krrp_task->buffer_args.resume_info != NULL) {
1457                         fnvlist_free(krrp_task->buffer_args.resume_info);
1458                         krrp_task->buffer_args.resume_info = NULL;
1459                 }
1460 
1461                 snap_el_prev = snap_el;
1462                 snap_el = AVL_NEXT(snapshots, snap_el);
1463         }
1464 
1465         return (err);
1466 }
1467 
1468 static int
1469 dmu_krrp_send_resume(char *resume_token, list_t *ds_to_send,
1470     char **resume_fs_name, char **resume_snap_name)
1471 {
1472         zfs_ds_node_t *fs_el;
1473         zfs_snap_avl_node_t *snap_el;
1474         char *at_ptr;
1475 
1476         at_ptr = strrchr(resume_token, '@');
1477         if (at_ptr == NULL) {
1478                 cmn_err(CE_WARN, "Invalid resume_token [%s]", resume_token);
1479                 return (SET_ERROR(ENOSR));
1480         }
1481 
1482         *at_ptr = '\0';
1483 
1484         /* First need to find FS that matches the given cookie */
1485         fs_el = list_head(ds_to_send);
1486         while (fs_el != NULL) {
1487                 if (strcmp(fs_el->name, resume_token) == 0)
1488                         break;
1489 
1490                 fs_el = list_next(ds_to_send, fs_el);
1491         }
1492 
1493         /* There is no target FS */
1494         if (fs_el == NULL) {
1495                 cmn_err(CE_WARN, "Unknown FS name [%s]", resume_token);
1496                 return (SET_ERROR(ENOSR));
1497         }
1498 
1499         *at_ptr = '@';
1500 
1501         /*
1502          * FS has been found, need to find SNAP that
1503          * matches the given cookie
1504          */
1505         snap_el = avl_first(&fs_el->snapshots);
1506         while (snap_el != NULL) {
1507                 if (strcmp(snap_el->name, resume_token) == 0)
1508                         break;
1509 
1510                 snap_el = AVL_NEXT(&fs_el->snapshots, snap_el);
1511         }
1512 
1513         /* There is no target snapshot */
1514         if (snap_el == NULL) {
1515                 cmn_err(CE_WARN, "Unknown SNAP name [%s]", resume_token);
1516                 return (SET_ERROR(ENOSR));
1517         }
1518 
1519         *resume_snap_name = snap_el->name;
1520         *resume_fs_name = fs_el->name;
1521 
1522         return (0);
1523 }
1524 
1525 static int
1526 zfs_send_ds(dmu_krrp_task_t *krrp_task, list_t *ds_to_send)
1527 {
1528         int err = 0;
1529         zfs_ds_node_t *fs_el;
1530         char *resume_fs_name = NULL;
1531         char *resume_snap_name = NULL;
1532 
1533         fs_el = list_head(ds_to_send);
1534 
1535         /* Resume logic */
1536         if (krrp_task->buffer_args.resume_info != NULL) {
1537                 char *toname = NULL;
1538 
1539                 err = nvlist_lookup_string(krrp_task->buffer_args.resume_info,
1540                     "toname", &toname);
1541                 ASSERT(err != ENOENT);
1542                 if (err != 0)
1543                         return (SET_ERROR(err));
1544 
1545                 err = dmu_krrp_send_resume(toname, ds_to_send,
1546                     &resume_fs_name, &resume_snap_name);
1547                 if (err != 0)
1548                         return (err);
1549 
1550                 while (fs_el != NULL) {
1551                         if (strcmp(fs_el->name, resume_fs_name) == 0)
1552                                 break;
1553 
1554                         fs_el = list_next(ds_to_send, fs_el);
1555                 }
1556         }
1557 
1558         while (fs_el != NULL) {
1559                 err = zfs_send_snapshots(krrp_task,
1560                     &fs_el->snapshots, resume_snap_name);
1561                 if (err != 0)
1562                         break;
1563 
1564                 /*
1565                  * resume_snap_name needs to be NULL for the datasets,
1566                  * that are on the "right" side of the resume-token,
1567                  * because need to process all their snapshots
1568                  */
1569                 if (resume_snap_name != NULL)
1570                         resume_snap_name = NULL;
1571 
1572                 fs_el = list_next(ds_to_send, fs_el);
1573         }
1574 
1575         return (err);
1576 }
1577 
1578 static void
1579 zfs_cleanup_send_list(dmu_krrp_task_t *krrp_task, list_t *ds_list)
1580 {
1581         zfs_ds_node_t *fs_el;
1582 
1583         /* Walk over all collected FSs and their SNAPs to cleanup */
1584         while ((fs_el = list_remove_head(ds_list)) != NULL) {
1585                 zfs_snap_avl_node_t *snap_el;
1586                 void *cookie = NULL;
1587 
1588                 while ((snap_el = avl_destroy_nodes(&fs_el->snapshots,
1589                     &cookie)) != NULL) {
1590                         if (snap_el->origin) {
1591                                 dsl_dataset_long_rele(snap_el->ds, fs_el);
1592                                 dsl_dataset_rele(snap_el->ds, fs_el);
1593                         } else {
1594                                 dsl_dataset_long_rele(snap_el->ds, krrp_task);
1595                                 dsl_dataset_rele(snap_el->ds, krrp_task);
1596                         }
1597 
1598                         kmem_free(snap_el, sizeof (zfs_snap_avl_node_t));
1599                 }
1600 
1601                 dsl_dataset_long_rele(fs_el->ds, krrp_task);
1602                 dsl_dataset_rele(fs_el->ds, krrp_task);
1603 
1604                 kmem_free(fs_el, sizeof (zfs_ds_node_t));
1605         }
1606 }
1607 
1608 /*
1609  * zfs_send_thread
1610  * executes ONE iteration, initial or incremental, on the sender side
1611  * 1) validates versus WBC
1612  * 2) collects source datasets and its to-be-sent snapshots
1613  *    2.1) each source dataset is an element of list, that contains
1614  *    - name of dataset
1615  *    - avl-tree of snapshots
1616  *    - its guid
1617  *    - the corresponding long held dsl_datasets_t
1618  *    2.2) each snapshot is an element of avl-tree, that contains
1619  *    - name of snapshot
1620  *    - its guid
1621  *    - creation TXG
1622  *    - the corresponding long held dsl_datasets_t
1623  * 3) initiate send stream
1624  * 4) send in order, one snapshot at a time
1625  */
1626 static void
1627 zfs_send_thread(void *krrp_task_void)
1628 {
1629         dmu_replay_record_t drr = { 0 };
1630         dmu_krrp_task_t *krrp_task = krrp_task_void;
1631         kreplication_zfs_args_t *buffer_args = &krrp_task->buffer_args;
1632         list_t ds_to_send;
1633         int err = 0;
1634         spa_t *spa = NULL;
1635 
1636         boolean_t compound_stream = buffer_args->recursive ||
1637             buffer_args->properties || buffer_args->do_all;
1638 
1639         ASSERT(krrp_task != NULL);
1640 
1641         err = spa_open(krrp_task->buffer_args.from_ds, &spa, krrp_task);
1642         if (err != 0)
1643                 goto early_error;
1644 
1645         if (buffer_args->resume_info != NULL) {
1646                 err = dmu_krrp_validate_resume_info(buffer_args->resume_info);
1647                 if (err != 0)
1648                         goto early_error;
1649         }
1650 
1651         list_create(&ds_to_send, sizeof (zfs_ds_node_t),
1652             offsetof(zfs_ds_node_t, list_node));
1653 
1654         /*
1655          * Source cannot be a writecached child if
1656          * the from_snapshot is an autosnap
1657          */
1658         err = wbc_check_dataset(buffer_args->from_ds);
1659         if (err != 0 && err != ENOTACTIVE) {
1660                 boolean_t from_snap_is_autosnap =
1661                     autosnap_check_name(buffer_args->from_snap);
1662                 if (err != EOPNOTSUPP || from_snap_is_autosnap) {
1663                         if (err == EOPNOTSUPP)
1664                                 err = SET_ERROR(ENOTDIR);
1665 
1666                         goto final;
1667                 }
1668         }
1669 
1670         err = autosnap_lock(spa, RW_READER);
1671         if (err != 0)
1672                 goto final;
1673 
1674         err = zfs_collect_ds(krrp_task, spa, &ds_to_send);
1675 
1676         autosnap_unlock(spa);
1677 
1678         if (err != 0)
1679                 goto final;
1680 
1681         /*
1682          * Recursive stream, stream with properties, or complete-incremental
1683          * stream have special header (DMU_COMPOUNDSTREAM)
1684          */
1685         if (compound_stream) {
1686                 err = zfs_send_compound_stream_header(krrp_task, &ds_to_send);
1687                 if (err != 0)
1688                         goto final;
1689         }
1690 
1691         err = zfs_send_ds(krrp_task, &ds_to_send);
1692 
1693 final:
1694 
1695         zfs_cleanup_send_list(krrp_task, &ds_to_send);
1696 
1697         list_destroy(&ds_to_send);
1698 
1699         if (err == 0 && compound_stream) {
1700                 bzero(&drr, sizeof (drr));
1701                 drr.drr_type = DRR_END;
1702                 err = dmu_krrp_buffer_write(&drr, sizeof (drr), krrp_task);
1703         }
1704 
1705         if (err == 0)
1706                 err = dmu_krrp_put_buffer(krrp_task);
1707 
1708 early_error:
1709         if (err != 0) {
1710                 dmu_set_send_recv_error(krrp_task, err);
1711                 cmn_err(CE_WARN, "Send thread exited with error code %d", err);
1712         }
1713 
1714         if (spa != NULL)
1715                 spa_close(spa, krrp_task);
1716 
1717         (void) dmu_krrp_fini_task(krrp_task);
1718 }
1719 
1720 /* KRRP-RECV routines */
1721 
/*
 * Alter props from the received stream.
 * Walk over all props from incoming nvlist "props" and
 * - replace each that is contained in nvlist "replace"
 * - remove each that is contained in nvlist "exclude"
 */
1728 static void
1729 zfs_recv_alter_props(nvlist_t *props, nvlist_t *exclude, nvlist_t *replace)
1730 {
1731         nvpair_t *element = NULL;
1732 
1733         if (props != NULL && exclude != NULL) {
1734                 while (
1735                     (element = nvlist_next_nvpair(exclude, element)) != NULL) {
1736                         nvpair_t *pair;
1737                         char *prop = nvpair_name(element);
1738                         char *prop_recv;
1739                         char *prop_inher;
1740 
1741                         prop_recv =
1742                             kmem_asprintf("%s%s", prop, ZPROP_RECVD_SUFFIX);
1743                         prop_inher =
1744                             kmem_asprintf("%s%s", prop, ZPROP_INHERIT_SUFFIX);
1745 
1746                         pair = NULL;
1747                         (void) nvlist_lookup_nvpair(props, prop, &pair);
1748                         if (pair)
1749                                 fnvlist_remove_nvpair(props, pair);
1750 
1751                         pair = NULL;
1752                         (void) nvlist_lookup_nvpair(props, prop_recv, &pair);
1753                         if (pair)
1754                                 fnvlist_remove_nvpair(props, pair);
1755 
1756                         pair = NULL;
1757                         (void) nvlist_lookup_nvpair(props, prop_inher, &pair);
1758                         if (pair)
1759                                 fnvlist_remove_nvpair(props, pair);
1760 
1761                         strfree(prop_recv);
1762                         strfree(prop_inher);
1763                 }
1764         }
1765 
1766         if (props != NULL && replace != NULL) {
1767                 while (
1768                     (element = nvlist_next_nvpair(replace, element)) != NULL) {
1769                         nvpair_t *pair;
1770                         char *prop = nvpair_name(element);
1771                         char *prop_recv;
1772                         char *prop_inher;
1773 
1774                         prop_recv =
1775                             kmem_asprintf("%s%s", prop, ZPROP_RECVD_SUFFIX);
1776                         prop_inher =
1777                             kmem_asprintf("%s%s", prop, ZPROP_INHERIT_SUFFIX);
1778 
1779                         pair = NULL;
1780                         (void) nvlist_lookup_nvpair(props, prop, &pair);
1781                         if (pair)
1782                                 fnvlist_remove_nvpair(props, pair);
1783 
1784                         pair = NULL;
1785                         (void) nvlist_lookup_nvpair(props, prop_recv, &pair);
1786                         if (pair)
1787                                 fnvlist_remove_nvpair(props, pair);
1788 
1789                         pair = NULL;
1790                         (void) nvlist_lookup_nvpair(props, prop_inher, &pair);
1791                         if (pair)
1792                                 fnvlist_remove_nvpair(props, pair);
1793 
1794                         strfree(prop_recv);
1795                         strfree(prop_inher);
1796 
1797                         fnvlist_add_nvpair(props, element);
1798                 }
1799         }
1800 }
1801 
1802 /*
1803  * Callback for dmu_objset_find_dp()
1804  * Checks only snapshots. If a snapshot is matched 'guid',
1805  * that is passed over the cb_arg, then the snapshot is
1806  * our target origin. So that we store its name and return
1807  * EINTR to speed up finalization of dmu_objset_find_dp()
1808  */
1809 /* ARGSUSED */
1810 static int
1811 zfs_lookup_origin_snapshot_cb(dsl_pool_t *dp,
1812     dsl_dataset_t *ds, void *arg)
1813 {
1814         zfs_los_cb_arg_t *cb_arg = arg;
1815 
1816         if (!ds->ds_is_snapshot)
1817                 return (0);
1818 
1819 
1820         if (dsl_dataset_phys(ds)->ds_guid == cb_arg->guid) {
1821                 dsl_dataset_name(ds, cb_arg->origin_name);
1822                 return (EINTR);
1823         }
1824 
1825         return (0);
1826 }
1827 
1828 /*
1829  * FIXME: needs to be optimized
1830  * TODO: no reason to walk over the already walked datasets
1831  */
1832 static int
1833 zfs_lookup_origin_snapshot(spa_t *spa, const char *clone_name,
1834     uint64_t guid, char *result)
1835 {
1836         char start[ZFS_MAX_DATASET_NAME_LEN];
1837         char *cp;
1838         dsl_pool_t *dp = spa_get_dsl(spa);
1839         zfs_los_cb_arg_t cb_arg = {result, guid};
1840 
1841         ASSERT(result != NULL);
1842 
1843         *result = '\0';
1844 
1845         (void) strlcpy(start, clone_name, sizeof (start));
1846         cp = strrchr(start, '/');
1847         if (cp == NULL)
1848                 cp = strchr(start, '\0');
1849 
1850         for (; cp != NULL; cp = strrchr(start, '/')) {
1851                 dsl_dataset_t *ds = NULL;
1852                 int err;
1853                 uint64_t dd_object;
1854 
1855                 *cp = '\0';
1856 
1857                 dsl_pool_config_enter(dp, FTAG);
1858 
1859                 err = dsl_dataset_hold(dp, start, FTAG, &ds);
1860                 if (err != 0) {
1861                         dsl_pool_config_exit(dp, FTAG);
1862                         break;
1863                 }
1864 
1865                 dd_object = ds->ds_dir->dd_object;
1866                 dsl_dataset_rele(ds, FTAG);
1867 
1868                 err = dmu_objset_find_dp(dp, dd_object,
1869                     zfs_lookup_origin_snapshot_cb, &cb_arg,
1870                     DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);
1871 
1872                 dsl_pool_config_exit(dp, FTAG);
1873 
1874                 if (*result != '\0' || err != 0)
1875                         break;
1876         }
1877 
1878         return ((*result == '\0') ? -1 : 0);
1879 }
1880 
1881 /* Recv a single snapshot. It is a simplified version of recv */
1882 static int
1883 zfs_recv_one_ds(spa_t *spa, char *ds, dmu_replay_record_t *drr,
1884     nvlist_t *fs_props, nvlist_t *snap_props, dmu_krrp_task_t *krrp_task)
1885 {
1886         int err = 0;
1887         uint64_t errf = 0;
1888         uint64_t ahdl = 0;
1889         uint64_t sz = 0;
1890         char *tosnap;
1891         char origin[ZFS_MAX_DATASET_NAME_LEN];
1892         char *originp = NULL;
1893         struct drr_begin *drrb = &drr->drr_u.drr_begin;
1894 
1895         if (krrp_task->buffer_args.to_snap[0]) {
1896                 tosnap = krrp_task->buffer_args.to_snap;
1897         } else {
1898                 tosnap = strchr(drrb->drr_toname, '@') + 1;
1899         }
1900 
1901         /* To recv cloned DS need to find its origin snapshot */
1902         if ((drrb->drr_flags & DRR_FLAG_CLONE) != 0) {
1903                 err = zfs_lookup_origin_snapshot(spa, ds,
1904                     drrb->drr_fromguid, origin);
1905                 if (err != 0) {
1906                         if (krrp_debug) {
1907                                 cmn_err(CE_WARN, "Origin snapshot "
1908                                     "(guid: %llu) does not exist",
1909                                     (unsigned long long)drrb->drr_fromguid);
1910                         }
1911 
1912                         return (SET_ERROR(ENOLINK));
1913                 }
1914 
1915                 originp = origin;
1916         }
1917 
1918         zfs_recv_alter_props(fs_props,
1919             krrp_task->buffer_args.ignore_list,
1920             krrp_task->buffer_args.replace_list);
1921 
1922         if (krrp_debug) {
1923                 cmn_err(CE_NOTE, "KRRP RECV INC_BASE: "
1924                     "%llu -- DS: %s -- TO_SNAP:%s",
1925                     (unsigned long long)drr->drr_u.drr_begin.drr_fromguid,
1926                     ds, tosnap);
1927         }
1928 
1929         /* hack to avoid adding the symnol to the libzpool export list */
1930 #ifdef _KERNEL
1931         err = dmu_recv_impl(NULL, ds, tosnap, originp, drr, B_TRUE, fs_props,
1932             NULL, &errf, -1, &ahdl, &sz, krrp_task->buffer_args.force,
1933             krrp_task);
1934 
1935         /*
1936          * If receive has been successfully finished
1937          * we can apply received snapshot properties
1938          */
1939         if (err == 0 && snap_props != NULL) {
1940                 char *full_snap_name;
1941 
1942                 full_snap_name = kmem_asprintf("%s@%s", ds, tosnap);
1943                 err = zfs_ioc_set_prop_impl(full_snap_name,
1944                     snap_props, B_TRUE, NULL);
1945                 if (err != 0 && krrp_debug) {
1946                         cmn_err(CE_NOTE, "KRRP RECV: failed to apply "
1947                             "received snapshot properties [%d]", err);
1948                 }
1949 
1950                 strfree(full_snap_name);
1951         }
1952 #endif
1953 
1954         return (err);
1955 }
1956 
1957 /*
1958  * Recv one stream
1959  * 1) validates versus WBC
1960  * 2) prepares receiving paths according to the given
1961  * flags ('leave_tail' or 'strip_head')
1962  * 3) recv stream
1963  * 4) apply snapshot properties if they
1964  * are part of received stream
1965  * 5) To support resume-recv save to ZAP the name
1966  * of complettly received snapshot. After merge with illumos
1967  * the resume-logic need to be replaced by the more intelegent
1968  * logic from illumos
1969  *
1970  * The implemented "recv" supports most of userspace-recv
1971  * functionality.
1972  *
1973  * Dedup-stream is not supported
1974  */
1975 static void
1976 zfs_recv_thread(void *krrp_task_void)
1977 {
1978         dmu_krrp_task_t *krrp_task = krrp_task_void;
1979         dmu_replay_record_t drr = { 0 };
1980         struct drr_begin *drrb = &drr.drr_u.drr_begin;
1981         zio_cksum_t zcksum = { 0 };
1982         int err;
1983         int baselen;
1984         spa_t *spa = NULL;
1985         char latest_snap[ZFS_MAX_DATASET_NAME_LEN] = { 0 };
1986         char to_ds[ZFS_MAX_DATASET_NAME_LEN];
1987         int hdrtype;
1988         uint64_t featureflags;
1989 
1990         ASSERT(krrp_task != NULL);
1991 
1992         err = spa_open(krrp_task->buffer_args.to_ds, &spa, krrp_task);
1993         if (err != NULL)
1994                 goto out;
1995 
1996         /*
1997          * This option requires a functionality (similar to
1998          * create_parents() from libzfs_dataset.c), that is not
1999          * implemented yet
2000          */
2001         if (krrp_task->buffer_args.strip_head) {
2002                 err = SET_ERROR(ENOTSUP);
2003                 goto out;
2004         }
2005 
2006         (void) strlcpy(to_ds, krrp_task->buffer_args.to_ds, sizeof (to_ds));
2007         if (dsl_dataset_creation_txg(to_ds) == UINT64_MAX) {
2008                 char *p;
2009 
2010                 /*
2011                  * If 'leave_tail' or 'strip_head' are define,
2012                  * then 'to_ds' just a prefix and must exist
2013                  */
2014                 if (krrp_task->buffer_args.leave_tail ||
2015                     krrp_task->buffer_args.strip_head) {
2016                         err = SET_ERROR(ENOENT);
2017                         goto out;
2018                 }
2019 
2020                 /*
2021                  * spa found, '/' must be, becase the above
2022                  * check returns UINT64_MAX
2023                  */
2024                 VERIFY((p = strrchr(to_ds, '/')) != NULL);
2025                 *p = '\0';
2026 
2027                 /*
2028                  * It is OK that destination does not exist,
2029                  * but its parent must be here
2030                  */
2031                 if (dsl_dataset_creation_txg(to_ds) == UINT64_MAX) {
2032                         err = SET_ERROR(ENOENT);
2033                         goto out;
2034                 }
2035         }
2036 
2037         /* destination cannot be writecached */
2038         err = wbc_check_dataset(to_ds);
2039         if (err == 0 || err == EOPNOTSUPP) {
2040                 err = SET_ERROR(ENOTDIR);
2041                 goto out;
2042         }
2043 
2044         /*
2045          * ENOTACTIVE means WBC is not active for the DS
2046          * If some another error just return
2047          */
2048         if (err != ENOTACTIVE)
2049                 goto out;
2050 
2051         /* Read leading block */
2052         err = dmu_krrp_buffer_read(&drr, sizeof (drr), krrp_task);
2053         if (err != 0)
2054                 goto out;
2055 
2056         if (drr.drr_type != DRR_BEGIN ||
2057             (drrb->drr_magic != DMU_BACKUP_MAGIC &&
2058             drrb->drr_magic != BSWAP_64(DMU_BACKUP_MAGIC))) {
2059                 err = SET_ERROR(EBADMSG);
2060                 goto out;
2061         }
2062 
2063         baselen = strchr(drrb->drr_toname, '@') - drrb->drr_toname;
2064 
2065         /* Process passed arguments */
2066         if (krrp_task->buffer_args.strip_head) {
2067                 char *pos = strchr(drrb->drr_toname, '/');
2068                 if (pos)
2069                         baselen = pos - drrb->drr_toname;
2070         }
2071 
2072         if (krrp_task->buffer_args.leave_tail) {
2073                 char *pos = strrchr(drrb->drr_toname, '/');
2074                 if (pos)
2075                         baselen = pos - drrb->drr_toname;
2076         }
2077 
2078         featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
2079         hdrtype = DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo);
2080         if (!DMU_STREAM_SUPPORTED(featureflags) ||
2081             (hdrtype != DMU_SUBSTREAM && hdrtype != DMU_COMPOUNDSTREAM)) {
2082                 err = SET_ERROR(EBADMSG);
2083                 goto out;
2084         }
2085 
2086         if (hdrtype == DMU_SUBSTREAM) {
2087                 /* recv a simple single snapshot */
2088                 char full_ds[ZFS_MAX_DATASET_NAME_LEN];
2089 
2090                 (void) strlcpy(full_ds, krrp_task->buffer_args.to_ds,
2091                     sizeof (full_ds));
2092                 if (krrp_task->buffer_args.strip_head ||
2093                     krrp_task->buffer_args.leave_tail) {
2094                         char *pos;
2095                         int len = strlen(full_ds) +
2096                             strlen(drrb->drr_toname + baselen) + 1;
2097                         if (len < sizeof (full_ds)) {
2098                                 (void) strlcat(full_ds, "/", sizeof (full_ds));
2099                                 (void) strlcat(full_ds,
2100                                     drrb->drr_toname + baselen,
2101                                     sizeof (full_ds));
2102                                 pos = strchr(full_ds, '@');
2103                                 *pos = '\0';
2104                         } else {
2105                                 err = SET_ERROR(ENAMETOOLONG);
2106                                 goto out;
2107                         }
2108                 }
2109 
2110                 (void) snprintf(latest_snap, sizeof (latest_snap),
2111                     "%s%s", full_ds, strchr(drrb->drr_toname, '@'));
2112                 err = zfs_recv_one_ds(spa, full_ds, &drr, NULL, NULL, krrp_task);
2113         } else {
2114                 nvlist_t *nvl = NULL, *nvfs = NULL;
2115                 avl_tree_t *fsavl = NULL;
2116 
2117                 if (krrp_task->buffer_args.force_cksum) {
2118                         (void) fletcher_4_incremental_native(&drr,
2119                             sizeof (drr), &zcksum);
2120                 }
2121 
2122                 /* Recv COMPOUND PAYLOAD */
2123                 if (drr.drr_payloadlen > 0) {
2124                         char *buf = kmem_alloc(drr.drr_payloadlen, KM_SLEEP);
2125                         err = dmu_krrp_buffer_read(
2126                             buf, drr.drr_payloadlen, krrp_task);
2127                         if (err != 0) {
2128                                 kmem_free(buf, drr.drr_payloadlen);
2129                                 goto out;
2130                         }
2131 
2132                         if (krrp_task->buffer_args.force_cksum) {
2133                                 (void) fletcher_4_incremental_native(buf,
2134                                     drr.drr_payloadlen, &zcksum);
2135                         }
2136 
2137                         err = nvlist_unpack(buf, drr.drr_payloadlen,
2138                             &nvl, KM_SLEEP);
2139                         kmem_free(buf, drr.drr_payloadlen);
2140 
2141                         if (err != 0) {
2142                                 err = SET_ERROR(EBADMSG);
2143                                 goto out;
2144                         }
2145 
2146                         err = nvlist_lookup_nvlist(nvl, "fss", &nvfs);
2147                         if (err != 0) {
2148                                 err = SET_ERROR(EBADMSG);
2149                                 goto out_nvl;
2150                         }
2151 
2152                         err = fsavl_create(nvfs, &fsavl);
2153                         if (err != 0) {
2154                                 err = SET_ERROR(EBADMSG);
2155                                 goto out_nvl;
2156                         }
2157                 }
2158 
2159                 /* Check end of stream marker */
2160                 err = dmu_krrp_buffer_read(&drr, sizeof (drr), krrp_task);
2161                 if (drr.drr_type != DRR_END &&
2162                     drr.drr_type != BSWAP_32(DRR_END)) {
2163                         err = SET_ERROR(EBADMSG);
2164                         goto out_nvl;
2165                 }
2166 
2167                 if (err == 0 && krrp_task->buffer_args.force_cksum &&
2168                     !ZIO_CHECKSUM_EQUAL(drr.drr_u.drr_end.drr_checksum,
2169                     zcksum)) {
2170                         err = SET_ERROR(ECKSUM);
2171                         goto out_nvl;
2172                 }
2173 
2174                 /* process all substeams from stream */
2175                 for (;;) {
2176                         nvlist_t *fs_props = NULL, *snap_props = NULL;
2177                         boolean_t free_fs_props = B_FALSE;
2178                         char ds[ZFS_MAX_DATASET_NAME_LEN];
2179                         char *at;
2180 
2181                         err = dmu_krrp_buffer_read(&drr,
2182                             sizeof (drr), krrp_task);
2183                         if (err != 0)
2184                                 break;
2185 
2186                         if (drr.drr_type == DRR_END ||
2187                             drr.drr_type == BSWAP_32(DRR_END))
2188                                 break;
2189 
2190                         if (drr.drr_type != DRR_BEGIN ||
2191                             (drrb->drr_magic != DMU_BACKUP_MAGIC &&
2192                             drrb->drr_magic != BSWAP_64(DMU_BACKUP_MAGIC))) {
2193                                 err = SET_ERROR(EBADMSG);
2194                                 break;
2195                         }
2196 
2197                         if (strlen(krrp_task->buffer_args.to_ds) +
2198                             strlen(drrb->drr_toname + baselen) >= sizeof (ds)) {
2199                                 err = SET_ERROR(ENAMETOOLONG);
2200                                 break;
2201                         }
2202 
2203                         (void) snprintf(ds, sizeof (ds), "%s%s",
2204                             krrp_task->buffer_args.to_ds,
2205                             drrb->drr_toname + baselen);
2206                         if (nvfs != NULL) {
2207                                 char *snapname;
2208                                 nvlist_t *snapprops;
2209                                 nvlist_t *fs;
2210 
2211                                 fs = fsavl_find(fsavl, drrb->drr_toguid,
2212                                     &snapname);
2213                                 err = nvlist_lookup_nvlist(fs,
2214                                     "props", &fs_props);
2215                                 if (err != 0) {
2216                                         if (err != ENOENT) {
2217                                                 err = SET_ERROR(err);
2218                                                 break;
2219                                         }
2220 
2221                                         err = 0;
2222                                         fs_props = fnvlist_alloc();
2223                                         free_fs_props = B_TRUE;
2224                                 }
2225 
2226                                 if (nvlist_lookup_nvlist(fs,
2227                                     "snapprops", &snapprops) == 0) {
2228                                         err = nvlist_lookup_nvlist(snapprops,
2229                                             snapname, &snap_props);
2230                                         if (err != 0) {
2231                                                 err = SET_ERROR(err);
2232                                                 break;
2233                                         }
2234                                 }
2235                         }
2236 
2237                         (void) strlcpy(latest_snap, ds, sizeof (latest_snap));
2238                         at = strrchr(ds, '@');
2239                         *at = '\0';
2240                         (void) strlcpy(krrp_task->cookie, drrb->drr_toname,
2241                             sizeof (krrp_task->cookie));
2242                         err = zfs_recv_one_ds(spa, ds, &drr, fs_props,
2243                             snap_props, krrp_task);
2244                         if (free_fs_props)
2245                                 fnvlist_free(fs_props);
2246 
2247                         if (err != 0)
2248                                 break;
2249                 }
2250 
2251 out_nvl:
2252                 if (nvl != NULL) {
2253                         fsavl_destroy(fsavl);
2254                         fnvlist_free(nvl);
2255                 }
2256         }
2257 
2258         /* Put final block */
2259         if (err == 0)
2260                 (void) dmu_krrp_put_buffer(krrp_task);
2261 
2262 out:
2263         dmu_set_send_recv_error(krrp_task_void, err);
2264         if (err != 0) {
2265                 cmn_err(CE_WARN, "Recv thread exited with "
2266                     "error code %d", err);
2267         }
2268 
2269         if (spa != NULL)
2270                 spa_close(spa, krrp_task);
2271 
2272         (void) dmu_krrp_fini_task(krrp_task);
2273 }
2274 
2275 /* Common send/recv entry point */
2276 static void *
2277 dmu_krrp_init_send_recv(void (*func)(void *), kreplication_zfs_args_t *args)
2278 {
2279         dmu_krrp_task_t *krrp_task =
2280             kmem_zalloc(sizeof (dmu_krrp_task_t), KM_SLEEP);
2281         dmu_krrp_stream_t *stream = args->stream_handler;
2282 
2283         krrp_task->stream_handler = stream;
2284         krrp_task->buffer_args = *args;
2285         cv_init(&krrp_task->buffer_state_cv, NULL, CV_DEFAULT, NULL);
2286         cv_init(&krrp_task->buffer_destroy_cv, NULL, CV_DEFAULT, NULL);
2287         mutex_init(&krrp_task->buffer_state_lock, NULL,
2288             MUTEX_DEFAULT, NULL);
2289 
2290         mutex_enter(&stream->mtx);
2291         if (!stream->running) {
2292                 cmn_err(CE_WARN, "Cannot dispatch send/recv task");
2293                 mutex_destroy(&krrp_task->buffer_state_lock);
2294                 cv_destroy(&krrp_task->buffer_state_cv);
2295                 cv_destroy(&krrp_task->buffer_destroy_cv);
2296                 kmem_free(krrp_task, sizeof (dmu_krrp_task_t));
2297 
2298                 mutex_exit(&stream->mtx);
2299                 return (NULL);
2300         }
2301 
2302         stream->task = krrp_task;
2303         stream->task_executor = func;
2304         cv_broadcast(&stream->cv);
2305         mutex_exit(&stream->mtx);
2306 
2307         return (krrp_task);
2308 }
2309 
2310 void *
2311 dmu_krrp_init_send_task(void *args)
2312 {
2313         kreplication_zfs_args_t *zfs_args = args;
2314         ASSERT(zfs_args != NULL);
2315         *zfs_args->to_ds = '\0';
2316         return (dmu_krrp_init_send_recv(zfs_send_thread, zfs_args));
2317 }
2318 
2319 void *
2320 dmu_krrp_init_recv_task(void *args)
2321 {
2322         kreplication_zfs_args_t *zfs_args = args;
2323         ASSERT(zfs_args != NULL);
2324         *zfs_args->from_ds = '\0';
2325         return (dmu_krrp_init_send_recv(zfs_recv_thread, zfs_args));
2326 }
2327 
2328 static void
2329 dmu_set_send_recv_error(void *krrp_task_void, int err)
2330 {
2331         dmu_krrp_task_t *krrp_task = krrp_task_void;
2332 
2333         ASSERT(krrp_task != NULL);
2334 
2335         mutex_enter(&krrp_task->buffer_state_lock);
2336         krrp_task->buffer_error = err;
2337         mutex_exit(&krrp_task->buffer_state_lock);
2338 }
2339 
2340 /*
2341  * Finalize send/recv task
2342  * Finalization is two step process, both sides should finalize stream in order
2343  * to proceed. Finalization is an execution barier - a thread which ends first
2344  * will wait for another
2345  */
2346 int
2347 dmu_krrp_fini_task(void *krrp_task_void)
2348 {
2349         dmu_krrp_task_t *krrp_task = krrp_task_void;
2350         int error;
2351 
2352         ASSERT(krrp_task != NULL);
2353 
2354         mutex_enter(&krrp_task->buffer_state_lock);
2355         if (krrp_task->buffer_state == SBS_DESTROYED) {
2356                 cv_signal(&krrp_task->buffer_destroy_cv);
2357                 error = krrp_task->buffer_error;
2358                 mutex_exit(&krrp_task->buffer_state_lock);
2359         } else {
2360                 krrp_task->buffer_state = SBS_DESTROYED;
2361                 cv_signal(&krrp_task->buffer_state_cv);
2362                 cv_wait(&krrp_task->buffer_destroy_cv,
2363                     &krrp_task->buffer_state_lock);
2364                 error = krrp_task->buffer_error;
2365                 mutex_exit(&krrp_task->buffer_state_lock);
2366                 mutex_destroy(&krrp_task->buffer_state_lock);
2367                 cv_destroy(&krrp_task->buffer_state_cv);
2368                 cv_destroy(&krrp_task->buffer_destroy_cv);
2369                 if (krrp_task->buffer_args.resume_info != NULL)
2370                         fnvlist_free(krrp_task->buffer_args.resume_info);
2371 
2372                 kmem_free(krrp_task, sizeof (dmu_krrp_task_t));
2373         }
2374 
2375         return (error);
2376 }
2377 
2378 /* Wait for a lent buffer */
2379 static int
2380 dmu_krrp_get_buffer(void *krrp_task_void)
2381 {
2382         dmu_krrp_task_t *krrp_task = krrp_task_void;
2383 
2384         ASSERT(krrp_task != NULL);
2385 
2386         mutex_enter(&krrp_task->buffer_state_lock);
2387         while (krrp_task->buffer_state != SBS_AVAIL) {
2388                 if (krrp_task->buffer_state == SBS_DESTROYED) {
2389                         mutex_exit(&krrp_task->buffer_state_lock);
2390                         return (SET_ERROR(ENOMEM));
2391                 }
2392                 DTRACE_PROBE(wait_for_buffer);
2393                 (void) cv_timedwait(&krrp_task->buffer_state_cv,
2394                     &krrp_task->buffer_state_lock,
2395                     ddi_get_lbolt() + zfs_send_timeout * hz);
2396                 DTRACE_PROBE(wait_for_buffer_end);
2397         }
2398         krrp_task->buffer_state = SBS_USED;
2399         mutex_exit(&krrp_task->buffer_state_lock);
2400 
2401         return (0);
2402 }
2403 
2404 /* Return buffer to transport */
2405 static int
2406 dmu_krrp_put_buffer(void *krrp_task_void)
2407 {
2408         dmu_krrp_task_t *krrp_task = krrp_task_void;
2409 
2410         ASSERT(krrp_task != NULL);
2411 
2412         mutex_enter(&krrp_task->buffer_state_lock);
2413         if (krrp_task->buffer_state != SBS_USED) {
2414                 mutex_exit(&krrp_task->buffer_state_lock);
2415                 return (0);
2416         }
2417         krrp_task->buffer_state = SBS_DONE;
2418         krrp_task->is_full = (krrp_task->buffer == NULL);
2419         krrp_task->buffer = NULL;
2420         cv_signal(&krrp_task->buffer_state_cv);
2421         mutex_exit(&krrp_task->buffer_state_lock);
2422 
2423         return (0);
2424 }
2425 
2426 /* Common entry point for lending buffer */
2427 static int
2428 dmu_krrp_lend_buffer(void *krrp_task_void,
2429     kreplication_buffer_t *buffer, boolean_t recv)
2430 {
2431         dmu_krrp_task_t *krrp_task = krrp_task_void;
2432         boolean_t full;
2433 
2434         ASSERT(krrp_task != NULL);
2435         ASSERT(buffer != NULL);
2436         ASSERT(krrp_task->buffer == NULL);
2437 
2438         mutex_enter(&krrp_task->buffer_state_lock);
2439         if (krrp_task->buffer_state == SBS_DESTROYED) {
2440                 int error = krrp_task->buffer_error;
2441                 mutex_exit(&krrp_task->buffer_state_lock);
2442                 if (error)
2443                         return (error);
2444                 if (recv)
2445                         return (E2BIG);
2446                 return (ENODATA);
2447         }
2448         krrp_task->buffer = buffer;
2449         krrp_task->buffer_state = SBS_AVAIL;
2450         krrp_task->buffer_bytes_read = 0;
2451         krrp_task->is_read = B_FALSE;
2452         krrp_task->is_full = B_FALSE;
2453         cv_signal(&krrp_task->buffer_state_cv);
2454         while (krrp_task->buffer_state != SBS_DONE) {
2455                 if (krrp_task->buffer_state == SBS_DESTROYED) {
2456                         int error = krrp_task->buffer_error;
2457                         full = krrp_task->is_full;
2458                         mutex_exit(&krrp_task->buffer_state_lock);
2459                         if (error)
2460                                 return (error);
2461                         if (recv && !krrp_task->is_read)
2462                                 return (E2BIG);
2463                         return ((recv || full) ? 0 : ENODATA);
2464                 }
2465                 DTRACE_PROBE(wait_for_data);
2466                 (void) cv_timedwait(&krrp_task->buffer_state_cv,
2467                     &krrp_task->buffer_state_lock,
2468                     ddi_get_lbolt() + zfs_send_timeout * hz);
2469                 DTRACE_PROBE(wait_for_data_end);
2470         }
2471         krrp_task->buffer = NULL;
2472         full = krrp_task->is_full;
2473         mutex_exit(&krrp_task->buffer_state_lock);
2474 
2475         return ((recv || full) ? 0 : ENODATA);
2476 }
2477 
2478 int
2479 dmu_krrp_lend_send_buffer(void *krrp_task_void, kreplication_buffer_t *buffer)
2480 {
2481         ASSERT(buffer != NULL);
2482         kreplication_buffer_t *iter;
2483         for (iter = buffer; iter != NULL; iter = iter->next)
2484                 iter->data_size = 0;
2485         return (dmu_krrp_lend_buffer(krrp_task_void, buffer, B_FALSE));
2486 }
2487 
2488 int
2489 dmu_krrp_lend_recv_buffer(void *krrp_task_void, kreplication_buffer_t *buffer)
2490 {
2491         ASSERT(buffer != NULL);
2492         return (dmu_krrp_lend_buffer(krrp_task_void, buffer, B_TRUE));
2493 }
2494 
2495 /*
2496  * FIXME: Temporary disabled because this logic
2497  * needs to be adjusted according to ARC-Compression changes
2498  */
2499 /* ARGSUSED */
2500 int
2501 dmu_krrp_direct_arc_read(spa_t *spa, dmu_krrp_task_t *krrp_task,
2502     zio_cksum_t *zc, const blkptr_t *bp)
2503 {
2504         return (ENODATA);
2505 
2506 #if 0
2507         int error;
2508         dmu_krrp_arc_bypass_t bypass = {
2509             .krrp_task = krrp_task,
2510             .zc = zc,
2511             .cb = dmu_krrp_buffer_write,
2512         };
2513 
2514         error = arc_io_bypass(spa, bp, dmu_krrp_arc_bypass, &bypass);
2515         if (error == 0) {
2516                 DTRACE_PROBE(krrp_send_arc_bypass);
2517         } else if (error == ENODATA) {
2518                 DTRACE_PROBE(krrp_send_disk_read);
2519                 return (error);
2520         }
2521 
2522         if (error != 0) {
2523                 DTRACE_PROBE1(orig_error, int, error);
2524                 error = SET_ERROR(EINTR);
2525         }
2526 
2527         return (error);
2528 #endif
2529 }
2530 
2531 static int
2532 dmu_krrp_validate_resume_info(nvlist_t *resume_info)
2533 {
2534         char *toname = NULL;
2535         uint64_t resumeobj = 0, resumeoff = 0, bytes = 0, toguid = 0;
2536 
2537         if (nvlist_lookup_string(resume_info, "toname", &toname) != 0 ||
2538             nvlist_lookup_uint64(resume_info, "object", &resumeobj) != 0 ||
2539             nvlist_lookup_uint64(resume_info, "offset", &resumeoff) != 0 ||
2540             nvlist_lookup_uint64(resume_info, "bytes", &bytes) != 0 ||
2541             nvlist_lookup_uint64(resume_info, "toguid", &toguid) != 0)
2542                 return (SET_ERROR(EINVAL));
2543 
2544         return (0);
2545 }
2546 
2547 int
2548 dmu_krrp_decode_resume_token(const char *resume_token, nvlist_t **resume_info)
2549 {
2550         nvlist_t *nvl = NULL;
2551         int err;
2552 
2553         err = zfs_send_resume_token_to_nvlist_impl(resume_token, &nvl);
2554         if (err != 0)
2555                 return (err);
2556 
2557         err = dmu_krrp_validate_resume_info(nvl);
2558         if (err != 0)
2559                 return (err);
2560 
2561         ASSERT(resume_info != NULL && *resume_info == NULL);
2562         *resume_info = nvl;
2563         return (0);
2564 }