1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2017 Nexenta Systems, Inc. All rights reserved.
14 */
15 #include <sys/autosnap.h>
16 #include <sys/dmu_objset.h>
17 #include <sys/dmu_send.h>
18 #include <sys/dmu_tx.h>
19 #include <sys/dsl_dir.h>
20 #include <sys/dsl_pool.h>
21 #include <sys/dsl_prop.h>
22 #include <sys/spa.h>
23 #include <zfs_fletcher.h>
24 #include <sys/zap.h>
25
26 #include <zfs_sendrecv.h>
27
28 #define STRING_PROP_EL_SIZE 1
29 #define UINT64_PROP_EL_SIZE 8
30
31 #define RECV_BUFFER_SIZE (1 << 20)
32
33 extern int wbc_check_dataset(const char *name);
34
35 int zfs_send_timeout = 5;
36 uint64_t krrp_debug = 0;
37
38 static void dmu_krrp_work_thread(void *arg);
39 static void dmu_set_send_recv_error(void *krrp_task_void, int err);
40 static int dmu_krrp_get_buffer(void *krrp_task_void);
41 static int dmu_krrp_put_buffer(void *krrp_task_void);
42 static int dmu_krrp_validate_resume_info(nvlist_t *resume_info);
43
/*
 * Used by zfs_lookup_origin_snapshot()
 * NOTE(review): the callback itself is not in this chunk; presumably
 * 'guid' is the guid being searched for and 'origin_name' receives
 * the name of the matching origin snapshot - confirm against the
 * callback implementation.
 */
typedef struct {
	char *origin_name;
	uint64_t guid;
} zfs_los_cb_arg_t;
49
/* An element of snapshots AVL-tree of zfs_ds_node_t */
typedef struct {
	char name[ZFS_MAX_DATASET_NAME_LEN];	/* full "fs@snap" name */
	uint64_t txg;		/* ds_creation_txg; the AVL sort key */
	uint64_t guid;		/* snapshot's ds_guid */
	dsl_dataset_t *ds;	/* held (and long-held) snapshot dataset */
	avl_node_t avl_node;	/* linkage in zfs_ds_node_t.snapshots */
	boolean_t origin;	/* B_TRUE if this is a clone's origin snap */
} zfs_snap_avl_node_t;
59
typedef struct zfs_ds_node zfs_ds_node_t;

/* One dataset (FS) participating in the send */
struct zfs_ds_node {
	char name[ZFS_MAX_DATASET_NAME_LEN];		/* full FS name */
	/* "fs@snap" name of the origin; empty string if not a clone */
	char origin_name[ZFS_MAX_DATASET_NAME_LEN];
	uint64_t origin_guid;	/* ds_guid of the origin snapshot */
	uint64_t creation_txg;	/* this dataset's ds_creation_txg */
	boolean_t is_root;	/* root of the replication tree */
	boolean_t is_clone;	/* dsl_dir_is_clone() result */

	/* origin FS's node, if found in the send list (else NULL) */
	zfs_ds_node_t *origin;
	dsl_dataset_t *ds;	/* held (and long-held) dataset */

	list_node_t list_node;	/* linkage in the ordered send list */
	avl_node_t avl_node;	/* linkage in the temporary clones AVL */

	/* zfs_snap_avl_node_t entries, ordered by creation TXG */
	avl_tree_t snapshots;
};
77
/*
 * NOTE(review): not referenced anywhere in this chunk; presumably
 * an argument bundle for a dataset-collection callback elsewhere
 * in the file - confirm before relying on field semantics.
 */
typedef struct {
	list_t *datasets;
	avl_tree_t clones_avl;
	void *owner;
	uint64_t root_ds_object;
} zfs_collect_cb_arg_t;
84
85
86 /*
87 * Stream is a sequence of snapshots considered to be related
88 * init/fini initialize and deinitialize structures which are
89 * persistent for a stream.
90 * Here we initialize a work-thread and all required locks.
91 * The work-thread is used to execute stream-tasks, that are
92 * used to process one ZFS-stream.
93 */
94 void *
95 dmu_krrp_stream_init()
96 {
97 dmu_krrp_stream_t *stream =
98 kmem_zalloc(sizeof (dmu_krrp_stream_t), KM_SLEEP);
99
100 mutex_init(&stream->mtx, NULL, MUTEX_DEFAULT, NULL);
101 cv_init(&stream->cv, NULL, CV_DEFAULT, NULL);
102
103 mutex_enter(&stream->mtx);
104 stream->work_thread = thread_create(NULL, 32 << 10,
105 dmu_krrp_work_thread, stream, 0, &p0, TS_RUN, minclsyspri);
106
107 while (!stream->running)
108 cv_wait(&stream->cv, &stream->mtx);
109
110 mutex_exit(&stream->mtx);
111
112 return (stream);
113 }
114
115 void
116 dmu_krrp_stream_fini(void *handler)
117 {
118 dmu_krrp_stream_t *stream = handler;
119
120 if (stream == NULL)
121 return;
122
123 mutex_enter(&stream->mtx);
124 stream->running = B_FALSE;
125 cv_broadcast(&stream->cv);
126 while (stream->work_thread != NULL)
127 cv_wait(&stream->cv, &stream->mtx);
128
129 mutex_exit(&stream->mtx);
130
131 mutex_destroy(&stream->mtx);
132 cv_destroy(&stream->cv);
133 kmem_free(stream, sizeof (dmu_krrp_stream_t));
134 }
135
/*
 * Work-thread executes stream-tasks.
 *
 * The thread parks on stream->cv until either a task (plus its
 * executor) is posted or dmu_krrp_stream_fini() clears 'running'.
 * The stream mutex is dropped for the duration of the executor so
 * a long-running task does not block producers.
 */
static void
dmu_krrp_work_thread(void *arg)
{
	dmu_krrp_stream_t *stream = arg;
	dmu_krrp_task_t *task;
	void (*task_executor)(void *);

	mutex_enter(&stream->mtx);

	/* Handshake with dmu_krrp_stream_init(): we are alive */
	stream->running = B_TRUE;
	cv_broadcast(&stream->cv);

	while (stream->running) {
		if (stream->task == NULL) {
			cv_wait(&stream->cv, &stream->mtx);
			continue;
		}

		ASSERT(stream->task_executor != NULL);

		/* Detach the task so a new one can be posted meanwhile */
		task = stream->task;
		task_executor = stream->task_executor;
		stream->task = NULL;
		stream->task_executor = NULL;

		mutex_exit(&stream->mtx);

		/* Run the task without holding the stream lock */
		task_executor(task);

		mutex_enter(&stream->mtx);
	}

	/* Handshake with dmu_krrp_stream_fini(): the thread is gone */
	stream->work_thread = NULL;
	cv_broadcast(&stream->cv);
	mutex_exit(&stream->mtx);
	thread_exit();
}
175
176 /*
177 * Arc bypass is supposed to reduce amount of copying inside memory
178 * Here os the main callback for krrp usage of arc bypass
179 */
180 int
181 dmu_krrp_arc_bypass(void *buf, int len, void *arg)
182 {
183 dmu_krrp_arc_bypass_t *bypass = arg;
184 dmu_krrp_task_t *task = bypass->krrp_task;
185 kreplication_zfs_args_t *buffer_args = &task->buffer_args;
186
187 if (buffer_args->mem_check_cb != NULL) {
188 /*
189 * ARC holds the target buffer while
190 * we read it, so to exclude deadlock need
191 * to be sure that we have enough memory to
192 * completely read the buffer without waiting
193 * for free of required memory space
194 */
195 boolean_t zero_copy_ready =
196 buffer_args->mem_check_cb(len,
197 buffer_args->mem_check_cb_arg);
198 if (!zero_copy_ready)
199 return (ENODATA);
200 }
201
202 if (buffer_args->force_cksum)
203 (void) fletcher_4_incremental_native(buf, len, bypass->zc);
204 DTRACE_PROBE(arc_bypass_send);
205 return (bypass->cb(buf, len, task));
206 }
207
208 /*
209 * KRRP-SR-INV
210 * Functions used in send/recv functions to pass data to the KRRP transport
211 */
212 int
213 dmu_krrp_buffer_write(void *buf, int len,
214 dmu_krrp_task_t *krrp_task)
215 {
216 int count = 0;
217 int err = 0;
218
219 while ((!err) && (count < len)) {
220 if (krrp_task->buffer_state == SBS_USED) {
221 kreplication_buffer_t *buffer = krrp_task->buffer;
222 size_t buf_rem = buffer->buffer_size -
223 buffer->data_size;
224 size_t rem = len - count;
225 size_t size = MIN(rem, buf_rem);
226
227 (void) memcpy((char *)buffer->data + buffer->data_size,
228 (char *)buf + count, size);
229 count += size;
230 buffer->data_size += size;
231
232 if (buffer->data_size == buffer->buffer_size) {
233 krrp_task->buffer = buffer->next;
234 if (!krrp_task->buffer) {
235 err = dmu_krrp_put_buffer(
236 krrp_task);
237 }
238 }
239 } else {
240 err = dmu_krrp_get_buffer(krrp_task);
241 }
242 }
243
244 return (err);
245 }
246
247 int
248 dmu_krrp_buffer_read(void *buf, int len,
249 dmu_krrp_task_t *krrp_task)
250 {
251 int done = 0;
252 int err = 0;
253
254 while (!err && (done < len)) {
255 if (krrp_task->buffer_state == SBS_USED) {
256 kreplication_buffer_t *buffer = krrp_task->buffer;
257 size_t rem = len - done;
258 size_t buf_rem = buffer->data_size -
259 krrp_task->buffer_bytes_read;
260 size_t size = MIN(rem, buf_rem);
261
262 (void) memcpy((char *)buf + done,
263 (char *)buffer->data +
264 krrp_task->buffer_bytes_read, size);
265 krrp_task->buffer_bytes_read += size;
266 done += size;
267 krrp_task->is_read = B_TRUE;
268
269 if (krrp_task->buffer_bytes_read ==
270 buffer->data_size) {
271 krrp_task->buffer = buffer->next;
272 krrp_task->buffer_bytes_read = 0;
273 if (!krrp_task->buffer) {
274 err = dmu_krrp_put_buffer(
275 krrp_task);
276 }
277 }
278 } else {
279 err = dmu_krrp_get_buffer(krrp_task);
280 }
281 }
282
283 return (err);
284 }
285
286 /*
287 * KRRP-SEND routines
288 */
289
290 /*
291 * The common function that is called from
292 * zfs_collect_snap_props and zfs_collect_fs_props
293 * iterates over the given zap-object and adds zfs props
294 * to the resulting nvlist
295 */
296 static int
297 zfs_collect_props(objset_t *mos, uint64_t zapobj, nvlist_t *props)
298 {
299 int err = 0;
300 zap_cursor_t zc;
301 zap_attribute_t za;
302
303 ASSERT(nvlist_empty(props));
304
305 zap_cursor_init(&zc, mos, zapobj);
306
307 /* walk over properties' zap */
308 while (zap_cursor_retrieve(&zc, &za) == 0) {
309 uint64_t cnt, el;
310 zfs_prop_t prop;
311 const char *suffix, *prop_name;
312 char buf[ZAP_MAXNAMELEN];
313
314 suffix = strchr(za.za_name, '$');
315 prop_name = za.za_name;
316 if (suffix != NULL) {
317 char *valstr;
318
319 /*
320 * The following logic is similar to
321 * dsl_prop_get_all_impl()
322 * Skip props that have:
323 * - suffix ZPROP_INHERIT_SUFFIX
324 * - all unknown suffixes to be backward compatible
325 */
326 if (strcmp(suffix, ZPROP_INHERIT_SUFFIX) == 0 ||
327 strcmp(suffix, ZPROP_RECVD_SUFFIX) != 0) {
328 zap_cursor_advance(&zc);
329 continue;
330 }
331
332 (void) strncpy(buf, za.za_name, (suffix - za.za_name));
333 buf[suffix - za.za_name] = '\0';
334 prop_name = buf;
335
336 /* Skip if locally overridden. */
337 err = zap_contains(mos, zapobj, prop_name);
338 if (err == 0) {
339 zap_cursor_advance(&zc);
340 continue;
341 }
342
343 if (err != ENOENT)
344 break;
345
346 /* Skip if explicitly inherited. */
347 valstr = kmem_asprintf("%s%s", prop_name,
348 ZPROP_INHERIT_SUFFIX);
349 err = zap_contains(mos, zapobj, valstr);
350 strfree(valstr);
351 if (err == 0) {
352 zap_cursor_advance(&zc);
353 continue;
354 }
355
356 if (err != ENOENT)
357 break;
358
359 /*
360 * zero out to make sure ENOENT is not returned
361 * if the loop breaks in this iteration
362 */
363 err = 0;
364 }
365
366 prop = zfs_name_to_prop(prop_name);
367
368 /*
369 * This property make sense only to this dataset,
370 * so no reasons to include it into stream
371 */
372 if (prop == ZFS_PROP_WBC_MODE) {
373 zap_cursor_advance(&zc);
374 continue;
375 }
376
377 (void) zap_length(mos, zapobj, za.za_name, &el, &cnt);
378
379 if (el == STRING_PROP_EL_SIZE) {
380 char val[ZAP_MAXVALUELEN];
381
382 err = zap_lookup(mos, zapobj, za.za_name,
383 STRING_PROP_EL_SIZE, cnt, val);
384 if (err != 0) {
385 cmn_err(CE_WARN,
386 "Error while looking up a prop"
387 "zap : %d", err);
388 break;
389 }
390
391 fnvlist_add_string(props, prop_name, val);
392 } else if (el == UINT64_PROP_EL_SIZE) {
393 fnvlist_add_uint64(props, prop_name,
394 za.za_first_integer);
395 }
396
397 zap_cursor_advance(&zc);
398 }
399
400 zap_cursor_fini(&zc);
401
402 return (err);
403 }
404
405 static int
406 zfs_collect_snap_props(dsl_dataset_t *snap_ds, nvlist_t **nvsnaps_props)
407 {
408 int err;
409 nvlist_t *props;
410 uint64_t zapobj;
411 objset_t *mos;
412
413 ASSERT(nvsnaps_props != NULL && *nvsnaps_props == NULL);
414 ASSERT(dsl_dataset_long_held(snap_ds));
415 ASSERT(snap_ds->ds_is_snapshot);
416
417 props = fnvlist_alloc();
418 mos = snap_ds->ds_dir->dd_pool->dp_meta_objset;
419 zapobj = dsl_dataset_phys(snap_ds)->ds_props_obj;
420 err = zfs_collect_props(mos, zapobj, props);
421 if (err == 0)
422 *nvsnaps_props = props;
423 else
424 fnvlist_free(props);
425
426 return (err);
427 }
428
429 static int
430 zfs_collect_fs_props(dsl_dataset_t *fs_ds, nvlist_t *nvfs)
431 {
432 int err = 0;
433 uint64_t zapobj;
434 objset_t *mos;
435 nvlist_t *nvfsprops;
436
437 ASSERT(dsl_dataset_long_held(fs_ds));
438
439 nvfsprops = fnvlist_alloc();
440 mos = fs_ds->ds_dir->dd_pool->dp_meta_objset;
441 zapobj = dsl_dir_phys(fs_ds->ds_dir)->dd_props_zapobj;
442 err = zfs_collect_props(mos, zapobj, nvfsprops);
443 if (err == 0)
444 fnvlist_add_nvlist(nvfs, "props", nvfsprops);
445
446 fnvlist_free(nvfsprops);
447
448 return (err);
449 }
450
451 /* AVL compare function for snapshots */
452 static int
453 zfs_snapshot_txg_compare(const void *arg1, const void *arg2)
454 {
455 const zfs_snap_avl_node_t *s1 = arg1;
456 const zfs_snap_avl_node_t *s2 = arg2;
457
458 if (s1->txg > s2->txg) {
459 return (+1);
460 } else if (s1->txg < s2->txg) {
461 return (-1);
462 } else {
463 return (0);
464 }
465 }
466
467 static zfs_snap_avl_node_t *
468 zfs_construct_snap_node(dsl_dataset_t *snap_ds, char *full_snap_name)
469 {
470 zfs_snap_avl_node_t *snap_el;
471
472 snap_el = kmem_zalloc(sizeof (zfs_snap_avl_node_t), KM_SLEEP);
473
474 (void) strlcpy(snap_el->name, full_snap_name,
475 sizeof (snap_el->name));
476 snap_el->guid = dsl_dataset_phys(snap_ds)->ds_guid;
477 snap_el->txg = dsl_dataset_phys(snap_ds)->ds_creation_txg;
478 snap_el->ds = snap_ds;
479
480 return (snap_el);
481 }
482
483 /*
484 * This function is used to make decision about include
485 * the given snap_ds into stream or not.
486 *
487 * Returns B_TRUE if the given snapshot has the given
488 * prop_name and its value is not equal to the given prop_val,
489 * otherwise returns B_FALSE
490 */
491 static boolean_t
492 zfs_skip_check(dsl_dataset_t *snap_ds,
493 const char *prop_name, const char *prop_val)
494 {
495 uint64_t zapobj;
496 objset_t *mos;
497 char val[ZAP_MAXVALUELEN];
498 uint64_t cnt = 0, el = 0;
499
500 if (prop_name == NULL || prop_val == NULL)
501 return (B_FALSE);
502
503 mos = snap_ds->ds_dir->dd_pool->dp_meta_objset;
504 zapobj = dsl_dataset_phys(snap_ds)->ds_props_obj;
505
506 if (zap_length(mos, zapobj, prop_name, &el, &cnt) == 0) {
507 if (zap_lookup(mos, zapobj, prop_name,
508 STRING_PROP_EL_SIZE, cnt, val) != 0)
509 return (B_FALSE);
510
511 if (strcmp(prop_val, val) != 0)
512 return (B_TRUE);
513 }
514
515 return (B_FALSE);
516 }
517
/*
 * Collects all snapshots (txg_first < Creation TXG < txg_last)
 * for the given FS and adds them to the resulting AVL-tree
 * (fs_el->snapshots, ordered by creation TXG).
 *
 * The boundary snapshots themselves are NOT added here - the
 * caller (zfs_collect_snaps) has already placed them into the
 * tree.  Autosnapshots and snapshots matching the configured
 * skip-property are excluded.  Every snapshot that is added gets
 * held and long-held with 'krrp_task' as the owner tag.
 */
static int
zfs_collect_interim_snaps(dmu_krrp_task_t *krrp_task,
    zfs_ds_node_t *fs_el, uint64_t txg_first,
    uint64_t txg_last)
{
	int err;
	uint64_t ds_creation_txg;
	avl_tree_t *snapshots = &fs_el->snapshots;
	zfs_snap_avl_node_t *snap_el;
	char full_snap_name[ZFS_MAX_DATASET_NAME_LEN];
	char *snap_name;
	objset_t *os = NULL;
	dsl_dataset_t *snap_ds = NULL;
	dsl_dataset_t *ds = fs_el->ds;
	dsl_pool_t *dp = ds->ds_dir->dd_pool;
	uint64_t offp = 0, obj = 0;

	dsl_pool_config_enter(dp, FTAG);

	err = dmu_objset_from_ds(ds, &os);
	if (err != 0) {
		dsl_pool_config_exit(dp, FTAG);
		return (err);
	}

	/*
	 * Pre-build "<fs_name>@" once; 'snap_name' points just past
	 * the '@', so every iteration only appends the snapshot name.
	 */
	(void) snprintf(full_snap_name, sizeof (full_snap_name),
	    "%s@", fs_el->name);
	snap_name = strchr(full_snap_name, '@') + 1;

	/* walk over snapshots and add them to the tree to sort */
	for (;;) {
		snap_ds = NULL;
		snap_name[0] = '\0';
		err = dmu_snapshot_list_next(os,
		    sizeof (full_snap_name) - strlen(full_snap_name),
		    full_snap_name + strlen(full_snap_name),
		    &obj, &offp, NULL);
		if (err != 0) {
			if (err == ENOENT) {
				/*
				 * ENOENT in this case means no more
				 * snapshots, that is not an error
				 */
				err = 0;
			}

			break;
		}

		/* We do not want intermediate autosnapshots */
		if (autosnap_check_name(snap_name))
			continue;

		err = dsl_dataset_hold(dp, full_snap_name, krrp_task, &snap_ds);
		if (err != 0) {
			/* The snapshot was just listed, so it must exist */
			ASSERT(err != ENOENT);
			break;
		}

		ds_creation_txg =
		    dsl_dataset_phys(snap_ds)->ds_creation_txg;

		/*
		 * We want only snapshots that are inside of
		 * our boundaries
		 * boundary snap_el already added to avl
		 */
		if (ds_creation_txg <= txg_first ||
		    ds_creation_txg >= txg_last) {
			dsl_dataset_rele(snap_ds, krrp_task);
			continue;
		}

		/* Honor the user-configured "skip this snapshot" prop */
		if (zfs_skip_check(snap_ds,
		    krrp_task->buffer_args.skip_snaps_prop_name,
		    krrp_task->buffer_args.skip_snaps_prop_val)) {
			dsl_dataset_rele(snap_ds, krrp_task);
			continue;
		}

		snap_el = zfs_construct_snap_node(snap_ds,
		    full_snap_name);
		/* long-hold: the snapshot must survive until sent */
		dsl_dataset_long_hold(snap_ds, krrp_task);
		avl_add(snapshots, snap_el);
	}

	dsl_pool_config_exit(dp, FTAG);

	return (err);
}
612
/*
 * Collect snapshots of a given dataset in a given range, where
 * 'to_snap' - the right boundary (must exist)
 * 'from_snap' - the left boundary (may be NULL/empty => full send)
 * Collects interim snapshots if incl_interim_snaps == B_TRUE
 *
 * The result is accumulated in fs_el->snapshots (AVL ordered by
 * creation TXG); every collected snapshot is held and long-held.
 */
static int
zfs_collect_snaps(dmu_krrp_task_t *krrp_task,
    zfs_ds_node_t *fs_el, char *from_snap,
    char *to_snap, boolean_t incl_interim_snaps)
{
	int err = 0;
	dsl_dataset_t *snap_ds = NULL;
	dsl_dataset_t *fs_ds = fs_el->ds;
	dsl_pool_t *dp = fs_ds->ds_dir->dd_pool;
	uint64_t txg_first = 0, txg_last = UINT64_MAX;
	char full_snap_name[ZFS_MAX_DATASET_NAME_LEN];
	char *snap_name;
	boolean_t no_from_snap = B_TRUE;

	zfs_snap_avl_node_t *from_snap_el = NULL;
	zfs_snap_avl_node_t *to_snap_el = NULL;

	/* the right boundary snapshot should be exist */
	if (to_snap == NULL || to_snap[0] == '\0')
		return (SET_ERROR(EINVAL));

	dsl_pool_config_enter(dp, FTAG);

	/*
	 * Snapshots must be sorted in the ascending order by birth_txg
	 */
	avl_create(&fs_el->snapshots, zfs_snapshot_txg_compare,
	    sizeof (zfs_snap_avl_node_t),
	    offsetof(zfs_snap_avl_node_t, avl_node));

	/* 'snap_name' points right after the '@' separator */
	(void) snprintf(full_snap_name, sizeof (full_snap_name),
	    "%s@%s", fs_el->name, to_snap);
	snap_name = strchr(full_snap_name, '@') + 1;

	err = dsl_dataset_hold(dp, full_snap_name, krrp_task, &snap_ds);
	if (err != 0) {
		dsl_pool_config_exit(dp, FTAG);

		/*
		 * This FS was created after 'to_snap',
		 * so skip it at this time
		 */
		if (err == ENOENT)
			err = 0;

		return (err);
	}

	to_snap_el = zfs_construct_snap_node(snap_ds,
	    full_snap_name);
	txg_last = dsl_dataset_phys(snap_ds)->ds_creation_txg;
	dsl_dataset_long_hold(to_snap_el->ds, krrp_task);
	avl_add(&fs_el->snapshots, to_snap_el);

	/* check left boundary */
	if (from_snap != NULL && from_snap[0] != '\0') {
		snap_ds = NULL;
		/* Reuse the buffer: replace to_snap with from_snap */
		snap_name[0] = '\0';
		(void) strcat(full_snap_name, from_snap);
		err = dsl_dataset_hold(dp, full_snap_name,
		    krrp_task, &snap_ds);

		if (err == 0) {
			txg_first =
			    dsl_dataset_phys(snap_ds)->ds_creation_txg;
			from_snap_el =
			    zfs_construct_snap_node(snap_ds, full_snap_name);
			dsl_dataset_long_hold(from_snap_el->ds, krrp_task);
			avl_add(&fs_el->snapshots, from_snap_el);
			no_from_snap = B_FALSE;
		} else {
			/*
			 * it is possible that from_snap does not exist
			 * for a child FS, because the FS was created
			 * after from_snap
			 */
			if (err != ENOENT || fs_el->is_root) {
				dsl_pool_config_exit(dp, FTAG);
				return (err);
			}

			err = 0;
		}
	}

	/*
	 * For cloned DS that doesn't have from_snap
	 * need to use its origin_snap as from_snap
	 * The owner of the held origin will be fs_el
	 */
	if (no_from_snap && fs_el->origin_name[0] != '\0') {
		snap_ds = NULL;
		err = dsl_dataset_hold(dp, fs_el->origin_name,
		    fs_el, &snap_ds);
		if (err != 0) {
			dsl_pool_config_exit(dp, FTAG);
			return (err);
		}

		/*
		 * Need to be sure that origin's name doesn't
		 * match the skip_mask. If origin was not/will
		 * not be replicated to the destination, then
		 * its clone will be replicated as a regular DS.
		 */
		if (zfs_skip_check(snap_ds,
		    krrp_task->buffer_args.skip_snaps_prop_name,
		    krrp_task->buffer_args.skip_snaps_prop_val)) {
			dsl_dataset_rele(snap_ds, fs_el);
		} else {
			dsl_dataset_long_hold(snap_ds, fs_el);
			from_snap_el = zfs_construct_snap_node(snap_ds,
			    fs_el->origin_name);
			from_snap_el->origin = B_TRUE;
			avl_add(&fs_el->snapshots, from_snap_el);
		}
	}

	dsl_pool_config_exit(dp, FTAG);

	/*
	 * 'FROM' snapshot cannot be created before 'TO' snapshot
	 * and
	 * 'FROM' and 'TO' snapshots cannot be the same snapshot
	 */
	if (txg_last <= txg_first)
		return (SET_ERROR(EXDEV));

	/*
	 * If 'incl_interim_snaps' flag isn't presented,
	 * only 'from' and 'to' snapshots should be in list
	 */
	if (!incl_interim_snaps)
		return (0);

	err = zfs_collect_interim_snaps(krrp_task, fs_el,
	    txg_first, txg_last);

	return (err);
}
759
760 static boolean_t
761 zfs_is_snapshot_belong(const char *ds_name, const char *snap_name)
762 {
763 char *at;
764
765 ASSERT(strchr(ds_name, '@') == NULL);
766 VERIFY((at = strrchr(snap_name, '@')) != NULL);
767
768 return (strncmp(ds_name, snap_name, at - snap_name + 1) == 0);
769 }
770
771 /*
772 * AVL compare function for cloned datasets
773 * To be sure that a cloned dataset will be replicated
774 * after its origin this functions does 2-stage compare.
775 * At the first stage it compares origin_name and name
776 * of both nodes to check that either s1 is clone of s2
777 * or vise versa.
778 * If s1 and s2 don't have dependencies, then at the second
779 * stage this function compares their TXG.
780 */
781 static int
782 zfs_cloned_ds_compare(const void *arg1, const void *arg2)
783 {
784 const zfs_ds_node_t *s1 = arg1;
785 const zfs_ds_node_t *s2 = arg2;
786
787 /* s1 is clone of s2, so s1 needs to be placed after s2 */
788 if (zfs_is_snapshot_belong(s2->name, s1->origin_name))
789 return (+1);
790
791 /* s2 is clone of s1, so s2 needs to be placed after s1 */
792 if (zfs_is_snapshot_belong(s1->name, s2->origin_name))
793 return (-1);
794
795 if (s1->creation_txg > s2->creation_txg)
796 return (+1);
797
798 if (s1->creation_txg < s2->creation_txg)
799 return (-1);
800
801 return (0);
802 }
803
804 /*
805 * This function retrieves the name and
806 * the guid of origin snapshot for the given clone
807 */
808 static int
809 zfs_populate_clone_info(zfs_ds_node_t *node)
810 {
811 int err;
812 dsl_dataset_t *ds_origin = NULL;
813 dsl_dir_t *ds_dir = node->ds->ds_dir;
814 dsl_pool_t *dp = ds_dir->dd_pool;
815
816 err = dsl_dataset_hold_obj(dp,
817 dsl_dir_phys(ds_dir)->dd_origin_obj, FTAG, &ds_origin);
818 if (err != 0)
819 return (err);
820
821 ASSERT(ds_origin->ds_is_snapshot);
822 dsl_dataset_name(ds_origin, node->origin_name);
823 ASSERT(strchr(node->origin_name, '@') != NULL);
824 node->origin_guid = dsl_dataset_phys(ds_origin)->ds_guid;
825 dsl_dataset_rele(ds_origin, FTAG);
826
827 return (0);
828 }
829
830 /*
831 * This function is used to lookup a node in the given list,
832 * that points to the parent of origin snapshot for the given
833 * clone_node. The last one is not a part of the list.
834 *
835 * If the list doesn't have required node, then NULL is returned.
836 */
837 static zfs_ds_node_t *
838 zfs_lookup_origin_node(list_t *ds_to_send, zfs_ds_node_t *clone_node)
839 {
840 char *at;
841 zfs_ds_node_t *node;
842
843 at = strchr(clone_node->origin_name, '@');
844 *at = '\0';
845
846 node = list_head(ds_to_send);
847 while (node != NULL) {
848 if (strcmp(node->name, clone_node->origin_name) == 0)
849 break;
850
851 node = list_next(ds_to_send, node);
852 }
853
854 *at = '@';
855 return (node);
856 }
857
858 /*
859 * This function is used to lookup a node in the given list,
860 * that points to the parent of the tnode. The last one
861 * is not a part of the list.
862 *
863 * If the list doesn't have required node, then NULL is returned.
864 */
865 static zfs_ds_node_t *
866 zfs_lookup_parent_node(list_t *ds_list, zfs_ds_node_t *tnode,
867 zfs_ds_node_t *start_node)
868 {
869 char *final_slash;
870 zfs_ds_node_t *node;
871
872 final_slash = strrchr(tnode->name, '/');
873 if (final_slash == NULL)
874 return (NULL);
875
876 *final_slash = '\0';
877
878 node = (start_node == NULL) ? list_head(ds_list) : start_node;
879 while (node != NULL) {
880 if (strcmp(node->name, tnode->name) == 0)
881 break;
882
883 node = list_next(ds_list, node);
884 }
885
886 *final_slash = '/';
887 return (node);
888 }
889
890 static int
891 zfs_construct_ds_node(dsl_pool_t *dp, uint64_t ds_object,
892 void *owner, zfs_ds_node_t **result)
893 {
894 zfs_ds_node_t *node;
895 int err;
896
897 ASSERT(result != NULL && *result == NULL);
898
899 node = kmem_zalloc(sizeof (zfs_ds_node_t), KM_SLEEP);
900
901 /* We need our own "hold" on the dataset */
902 err = dsl_dataset_hold_obj(dp, ds_object,
903 owner, &node->ds);
904 if (err != 0) {
905 kmem_free(node, sizeof (zfs_ds_node_t));
906 return (err);
907 }
908
909 dsl_dataset_long_hold(node->ds, owner);
910
911 dsl_dataset_name(node->ds, node->name);
912 node->creation_txg = dsl_dataset_phys(node->ds)->ds_creation_txg;
913
914 node->is_clone = dsl_dir_is_clone(node->ds->ds_dir);
915 if (node->is_clone) {
916 err = zfs_populate_clone_info(node);
917 if (err != 0) {
918 dsl_dataset_long_rele(node->ds, owner);
919 dsl_dataset_rele(node->ds, owner);
920 kmem_free(node, sizeof (zfs_ds_node_t));
921 return (err);
922 }
923 }
924
925 *result = node;
926 return (0);
927 }
928
929 /*
930 * This function walks only next-level children (depth = 1)
931 * and puts them into the given list.
932 * Clones also are placed into the given AVL.
933 */
934 static int
935 zfs_collect_children(dmu_krrp_task_t *krrp_task, dsl_pool_t *dp,
936 zfs_ds_node_t *parent_node, list_t *ds_list, avl_tree_t *clones)
937 {
938 zap_cursor_t zc;
939 zap_attribute_t attr;
940 int err;
941 objset_t *mos = dp->dp_meta_objset;
942 uint64_t dd_child_dir_zapobj =
943 dsl_dir_phys(parent_node->ds->ds_dir)->dd_child_dir_zapobj;
944
945 zap_cursor_init(&zc, mos, dd_child_dir_zapobj);
946 while (zap_cursor_retrieve(&zc, &attr) == 0) {
947 dsl_dir_t *dd = NULL;
948 zfs_ds_node_t *node = NULL;
949
950 ASSERT3U(attr.za_integer_length, ==,
951 sizeof (uint64_t));
952 ASSERT3U(attr.za_num_integers, ==, 1);
953
954 err = dsl_dir_hold_obj(dp, attr.za_first_integer,
955 attr.za_name, FTAG, &dd);
956 if (err != 0)
957 break;
958
959 err = zfs_construct_ds_node(dp,
960 dsl_dir_phys(dd)->dd_head_dataset_obj,
961 krrp_task, &node);
962 dsl_dir_rele(dd, FTAG);
963 if (err != 0) {
964 break;
965 }
966
967 list_insert_tail(ds_list, node);
968 if (node->is_clone)
969 avl_add(clones, node);
970
971 (void) zap_cursor_advance(&zc);
972 }
973
974 zap_cursor_fini(&zc);
975
976 return (err);
977 }
978
/*
 * Collect datasets and snapshots of each dataset.
 *
 * This function walks ZFS-tree of datasets by using
 * breadth-first search (BFS) method to avoid misordering
 * in case of existing cloned datasets.
 *
 * On success 'ds_list' contains the root dataset and (for a
 * recursive send) all its descendants, reordered so that every
 * clone appears after both its origin and its parent, and each
 * node's snapshot AVL-tree is populated.
 */
static int
zfs_collect_ds(dmu_krrp_task_t *krrp_task, spa_t *spa, list_t *ds_list)
{
	int err = 0;
	dsl_pool_t *dp;
	dsl_dataset_t *ds = NULL;
	uint64_t root_ds_object;
	zfs_ds_node_t *clone_node, *node, *parent_node;
	void *cookie = NULL;
	avl_tree_t clones;

	char *from_ds = krrp_task->buffer_args.from_ds;
	char *from_snap = krrp_task->buffer_args.from_incr_base;
	char *to_snap = krrp_task->buffer_args.from_snap;
	boolean_t incl_interim_snaps = krrp_task->buffer_args.do_all;
	boolean_t recursive = krrp_task->buffer_args.recursive;

	dp = spa_get_dsl(spa);

	dsl_pool_config_enter(dp, FTAG);

	/* Resolve the root DS name to its object number */
	err = dsl_dataset_hold(dp, from_ds, FTAG, &ds);
	if (err != 0) {
		dsl_pool_config_exit(dp, FTAG);
		return (err);
	}

	root_ds_object = ds->ds_object;
	dsl_dataset_rele(ds, FTAG);

	node = NULL;
	err = zfs_construct_ds_node(dp, root_ds_object,
	    krrp_task, &node);
	if (err != 0) {
		dsl_pool_config_exit(dp, FTAG);
		return (err);
	}

	node->is_root = B_TRUE;
	list_insert_head(ds_list, node);

	avl_create(&clones, zfs_cloned_ds_compare,
	    sizeof (zfs_ds_node_t), offsetof(zfs_ds_node_t, avl_node));

	if (recursive) {
		/*
		 * The following loop walk over the list,
		 * that is populated by zfs_collect_children(),
		 * that always puts new items to the tail.
		 * This gives BFS order without a separate queue.
		 */
		while (node != NULL) {
			err = zfs_collect_children(krrp_task,
			    dp, node, ds_list, &clones);
			if (err != 0)
				break;

			node = list_next(ds_list, node);
		}
	}

	dsl_pool_config_exit(dp, FTAG);

	if (err != 0) {
		/* Empty the AVL; the nodes remain on ds_list */
		while ((node = avl_destroy_nodes(&clones, &cookie)) != NULL)
			;
		avl_destroy(&clones);
		return (err);
	}

	/*
	 * We've collected all required datasets.
	 *
	 * Now need to do additional resort to place cloned datasets
	 * to the correct position. And there are 2 cases:
	 * (1) parent is located before the origin DS
	 * (2) parent is located after the origin DS
	 * In the first case need to place clone right after origin,
	 * in the second after parent.
	 *
	 * avl_destroy_nodes() cannot be used here, because it
	 * travels AVL from the end.
	 */
	while ((clone_node = avl_first(&clones)) != NULL) {
		avl_remove(&clones, clone_node);
		list_remove(ds_list, clone_node);

		clone_node->origin =
		    zfs_lookup_origin_node(ds_list, clone_node);

		if (clone_node->origin == NULL) {
			/*
			 * It seems the origin is outside
			 * of replication tree and in this case doesn't
			 * matter where this node will be in the list
			 */

			list_insert_tail(ds_list, clone_node);
			continue;
		}

		/*
		 * We are looking for parent starting from origin,
		 * because cannot place clone before its origin.
		 *
		 * parent_node == NULL means that it is located
		 * in the list before origin, so we can just put
		 * it right after the origin.
		 */
		parent_node = zfs_lookup_parent_node(ds_list,
		    clone_node, clone_node->origin);
		if (parent_node == NULL) {
			list_insert_after(ds_list,
			    clone_node->origin, clone_node);
		} else {
			list_insert_after(ds_list,
			    parent_node, clone_node);
		}
	}

	avl_destroy(&clones);

	/* Finally, collect the snapshot range of every dataset */
	node = list_head(ds_list);
	while (err == 0 && node != NULL) {
		err = zfs_collect_snaps(krrp_task, node,
		    from_snap, to_snap, incl_interim_snaps);
		node = list_next(ds_list, node);
	}

	return (err);
}
1116
/*
 * Send a single dataset, mostly mimic regular send.
 *
 * Sends the snapshot 'snap_el', incrementally from 'snap_el_prev'
 * when the latter is not NULL.  Both snapshots are expected to be
 * long-held by the caller.  If resume_info is present, its
 * "object"/"offset" pair is forwarded to dmu_send_impl() to
 * continue an interrupted stream.
 */
static int
zfs_send_one_ds(dmu_krrp_task_t *krrp_task, zfs_snap_avl_node_t *snap_el,
    zfs_snap_avl_node_t *snap_el_prev)
{
	int err = 0;
	offset_t off = 0;
	dsl_pool_t *dp = NULL;
	dsl_dataset_t *snap_ds = NULL;
	dsl_dataset_t *snap_ds_prev = NULL;
	boolean_t embedok = krrp_task->buffer_args.embedok;
	boolean_t compressok = krrp_task->buffer_args.compressok;
	boolean_t large_block_ok = krrp_task->buffer_args.large_block_ok;
	nvlist_t *resume_info = krrp_task->buffer_args.resume_info;
	uint64_t resumeobj = 0, resumeoff = 0;

	/*
	 * 'ds' of snap_ds/snap_ds_prev already long-held
	 * so we do not need to hold them again
	 */

	snap_ds = snap_el->ds;
	if (snap_el_prev != NULL)
		snap_ds_prev = snap_el_prev->ds;

	/*
	 * dsl_pool_config_enter() cannot be used here because
	 * dmu_send_impl() calls dsl_pool_rele()
	 *
	 * VERIFY0() is used because dsl_pool_hold() opens spa,
	 * that already is opened in our case, so it cannot fail
	 */
	VERIFY0(dsl_pool_hold(snap_el->name, FTAG, &dp));

	if (resume_info != NULL) {
		err = nvlist_lookup_uint64(resume_info, "object", &resumeobj);
		ASSERT3U(err, !=, ENOENT);
		if (err != 0) {
			dsl_pool_rele(dp, FTAG);
			return (SET_ERROR(err));
		}

		err = nvlist_lookup_uint64(resume_info, "offset", &resumeoff);
		ASSERT3U(err, !=, ENOENT);
		if (err != 0) {
			dsl_pool_rele(dp, FTAG);
			return (SET_ERROR(err));
		}
	}

	if (krrp_debug) {
		cmn_err(CE_NOTE, "KRRP SEND INC_BASE: %s -- DS: "
		    "%s -- GUID: %llu",
		    snap_el_prev == NULL ? "<none>" : snap_el_prev->name,
		    snap_el->name,
		    (unsigned long long)dsl_dataset_phys(snap_ds)->ds_guid);
	}

	if (snap_ds_prev != NULL) {
		zfs_bookmark_phys_t zb;
		boolean_t is_clone;

		/* The incremental base must be older than the target */
		if (!dsl_dataset_is_before(snap_ds, snap_ds_prev, 0)) {
			dsl_pool_rele(dp, FTAG);
			return (SET_ERROR(EXDEV));
		}

		/* Describe the incremental base as a bookmark */
		zb.zbm_creation_time =
		    dsl_dataset_phys(snap_ds_prev)->ds_creation_time;
		zb.zbm_creation_txg =
		    dsl_dataset_phys(snap_ds_prev)->ds_creation_txg;
		zb.zbm_guid = dsl_dataset_phys(snap_ds_prev)->ds_guid;
		is_clone = (snap_ds_prev->ds_dir != snap_ds->ds_dir);

		err = dmu_send_impl(FTAG, dp, snap_ds, &zb, is_clone,
		    embedok, large_block_ok, compressok, -1, resumeobj,
		    resumeoff, NULL, &off, krrp_task);
	} else {
		/* No incremental base: full send of the snapshot */
		err = dmu_send_impl(FTAG, dp, snap_ds, NULL, B_FALSE,
		    embedok, large_block_ok, compressok, -1, resumeobj,
		    resumeoff, NULL, &off, krrp_task);
	}

	/*
	 * dsl_pool_rele() is not required here
	 * because dmu_send_impl() already did it
	 */

	return (err);
}
1207
/*
 * Build the "fss" nvlist for a DMU_COMPOUNDSTREAM header:
 * iterate over all collected filesystems and their snapshots
 * and gather their properties.
 *
 * On success, *fss receives the newly allocated nvlist (keyed by
 * "0x%llx" dataset guid); the caller must free it.  On failure an
 * errno is returned and *fss is left untouched.
 */
static int
zfs_prepare_compound_data(list_t *fs_list, nvlist_t **fss)
{
	zfs_ds_node_t *fs_el;
	int err = 0;
	nvlist_t *nvfss;
	uint64_t guid;
	char sguid[64];	/* "0x%llx" key; ample room for a 64-bit guid */

	nvfss = fnvlist_alloc();

	/* Traverse the list of datasets */
	fs_el = list_head(fs_list);
	while (fs_el != NULL) {
		zfs_snap_avl_node_t *snap_el;
		nvlist_t *nvfs, *nvsnaps, *nvsnaps_props;
		char *at;

		nvfs = fnvlist_alloc();
		fnvlist_add_string(nvfs, "name", fs_el->name);

		/* For a clone also record its origin guid and FS name */
		if (fs_el->origin_name[0] != '\0') {
			fnvlist_add_uint64(nvfs,
			    "origin", fs_el->origin_guid);
			/*
			 * Temporarily cut off the "@snap" part to store
			 * only the filesystem component of the origin
			 */
			VERIFY((at = strchr(fs_el->origin_name, '@')) != NULL);
			*at = '\0';
			fnvlist_add_string(nvfs,
			    "origin_fsname", fs_el->origin_name);
			*at = '@';
		}

		err = zfs_collect_fs_props(fs_el->ds, nvfs);
		if (err != 0) {
			fnvlist_free(nvfs);
			break;
		}

		nvsnaps = fnvlist_alloc();
		nvsnaps_props = fnvlist_alloc();

		/* Walk this FS's snapshots (AVL order) */
		snap_el = avl_first(&fs_el->snapshots);
		while (snap_el != NULL) {
			nvlist_t *nvsnap_props = NULL;
			char *snapname, *at;

			at = strrchr(snap_el->name, '@');
			ASSERT(at != NULL);
			if (at == NULL) {
				/* Malformed snapshot name */
				err = SET_ERROR(EILSEQ);
				break;
			}

			err = zfs_collect_snap_props(snap_el->ds,
			    &nvsnap_props);
			if (err != 0)
				break;

			/* Both lists are keyed by the short snap name */
			snapname = at + 1;
			fnvlist_add_uint64(nvsnaps, snapname, snap_el->guid);
			fnvlist_add_nvlist(nvsnaps_props,
			    snapname, nvsnap_props);
			fnvlist_free(nvsnap_props);

			snap_el = AVL_NEXT(&fs_el->snapshots, snap_el);
		}

		if (err == 0) {
			fnvlist_add_nvlist(nvfs, "snaps", nvsnaps);
			fnvlist_add_nvlist(nvfs, "snapprops",
			    nvsnaps_props);

			guid = dsl_dataset_phys(fs_el->ds)->ds_guid;
			(void) sprintf(sguid, "0x%llx",
			    (unsigned long long)guid);
			fnvlist_add_nvlist(nvfss, sguid, nvfs);
		}

		/* fnvlist_add_nvlist() copies, so the locals go away */
		fnvlist_free(nvsnaps);
		fnvlist_free(nvsnaps_props);
		fnvlist_free(nvfs);

		if (err != 0)
			break;

		fs_el = list_next(fs_list, fs_el);
	}

	if (err != 0)
		fnvlist_free(nvfss);
	else
		*fss = nvfss;

	return (err);
}
1306
1307 static void
1308 zfs_prepare_compound_hdr(dmu_krrp_task_t *krrp_task, nvlist_t **hdrnvl)
1309 {
1310 nvlist_t *nvl;
1311
1312 nvl = fnvlist_alloc();
1313
1314 if (krrp_task->buffer_args.from_incr_base[0] != '\0') {
1315 fnvlist_add_string(nvl, "fromsnap",
1316 krrp_task->buffer_args.from_incr_base);
1317 }
1318
1319 fnvlist_add_string(nvl, "tosnap", krrp_task->buffer_args.from_snap);
1320
1321 if (!krrp_task->buffer_args.recursive)
1322 fnvlist_add_boolean(nvl, "not_recursive");
1323
1324 *hdrnvl = nvl;
1325 }
1326
1327 static int
1328 zfs_send_compound_stream_header(dmu_krrp_task_t *krrp_task, list_t *ds_to_send)
1329 {
1330 int err;
1331 nvlist_t *fss = NULL;
1332 nvlist_t *hdrnvl = NULL;
1333 dmu_replay_record_t drr;
1334 zio_cksum_t zc = { 0 };
1335 char *packbuf = NULL;
1336 size_t buflen = 0;
1337
1338 zfs_prepare_compound_hdr(krrp_task, &hdrnvl);
1339
1340 err = zfs_prepare_compound_data(ds_to_send, &fss);
1341 if (err != 0)
1342 return (err);
1343
1344 fnvlist_add_nvlist(hdrnvl, "fss", fss);
1345 fnvlist_free(fss);
1346
1347 VERIFY0(nvlist_pack(hdrnvl, &packbuf, &buflen,
1348 NV_ENCODE_XDR, KM_SLEEP));
1349 fnvlist_free(hdrnvl);
1350
1351 bzero(&drr, sizeof (drr));
1352 drr.drr_type = DRR_BEGIN;
1353 drr.drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
1354 DMU_SET_STREAM_HDRTYPE(drr.drr_u.drr_begin.drr_versioninfo,
1355 DMU_COMPOUNDSTREAM);
1356 (void) snprintf(drr.drr_u.drr_begin.drr_toname,
1357 sizeof (drr.drr_u.drr_begin.drr_toname),
1358 "%s@%s", krrp_task->buffer_args.from_ds,
1359 krrp_task->buffer_args.from_snap);
1360 drr.drr_payloadlen = buflen;
1361 if (krrp_task->buffer_args.force_cksum)
1362 (void) fletcher_4_incremental_native(&drr, sizeof (drr), &zc);
1363
1364 err = dmu_krrp_buffer_write(&drr, sizeof (drr), krrp_task);
1365 if (err != 0)
1366 goto out;
1367
1368 if (buflen != 0) {
1369 if (krrp_task->buffer_args.force_cksum)
1370 (void) fletcher_4_incremental_native(packbuf, buflen, &zc);
1371
1372 err = dmu_krrp_buffer_write(packbuf, buflen, krrp_task);
1373 if (err != 0)
1374 goto out;
1375 }
1376
1377 bzero(&drr, sizeof (drr));
1378 drr.drr_type = DRR_END;
1379 drr.drr_u.drr_end.drr_checksum = zc;
1380
1381 err = dmu_krrp_buffer_write(&drr, sizeof (drr), krrp_task);
1382
1383 out:
1384 if (packbuf != NULL)
1385 kmem_free(packbuf, buflen);
1386
1387 return (err);
1388 }
1389
/*
 * For every dataset there is a chain of snapshots. It may start with
 * an empty record, which means it is a non-incremental snap, after
 * that this dataset is considered to be under an incremental stream.
 * In an incremental stream, first snapshot for every dataset is
 * an incremental base. After sending, currently sent snapshot
 * becomes a base for the next one unless the next belongs to
 * another dataset or is an empty record.
 */
static int
zfs_send_snapshots(dmu_krrp_task_t *krrp_task, avl_tree_t *snapshots,
    char *resume_snap_name)
{
	int err = 0;
	char *incr_base = krrp_task->buffer_args.from_incr_base;
	zfs_snap_avl_node_t *snap_el, *snap_el_prev = NULL;

	snap_el = avl_first(snapshots);

	/*
	 * It is possible that a new FS does not yet have snapshots,
	 * because the FS was created after the right border snapshot
	 */
	if (snap_el == NULL)
		return (0);

	/*
	 * For an incremental stream need to skip
	 * the incremental base snapshot.
	 * NOTE(review): assumes snap_el->name always contains '@'
	 * (strrchr() would return NULL otherwise) — confirm with
	 * the collection code
	 */
	if (incr_base[0] != '\0') {
		char *short_snap_name = strrchr(snap_el->name, '@') + 1;
		if (strcmp(incr_base, short_snap_name) == 0) {
			snap_el_prev = snap_el;
			snap_el = AVL_NEXT(snapshots, snap_el);
		}
	}

	/* When resuming, fast-forward to the resume snapshot */
	if (resume_snap_name != NULL) {
		while (snap_el != NULL) {
			if (strcmp(snap_el->name, resume_snap_name) == 0)
				break;

			snap_el_prev = snap_el;
			snap_el = AVL_NEXT(snapshots, snap_el);
		}
	}

	/*
	 * Origin snapshot is here not to send it,
	 * it is used to define start point
	 */
	if (snap_el != NULL && snap_el->origin) {
		snap_el_prev = snap_el;
		snap_el = AVL_NEXT(snapshots, snap_el);
	}

	/* Send the remaining snapshots, each based on its predecessor */
	while (snap_el != NULL) {
		err = zfs_send_one_ds(krrp_task, snap_el, snap_el_prev);
		if (err != 0)
			break;

		/*
		 * We have sent resumed snap,
		 * so resume_info is not relevant anymore
		 */
		if (krrp_task->buffer_args.resume_info != NULL) {
			fnvlist_free(krrp_task->buffer_args.resume_info);
			krrp_task->buffer_args.resume_info = NULL;
		}

		snap_el_prev = snap_el;
		snap_el = AVL_NEXT(snapshots, snap_el);
	}

	return (err);
}
1467
/*
 * Locate the dataset and snapshot that match the resume token
 * ("fs@snap") in the collected send list.  On success,
 * *resume_fs_name and *resume_snap_name point directly into the
 * matched list elements (no copies are made).  Returns ENOSR when
 * the token is malformed or does not match anything.
 */
static int
dmu_krrp_send_resume(char *resume_token, list_t *ds_to_send,
    char **resume_fs_name, char **resume_snap_name)
{
	zfs_ds_node_t *fs_el;
	zfs_snap_avl_node_t *snap_el;
	char *at_ptr;

	at_ptr = strrchr(resume_token, '@');
	if (at_ptr == NULL) {
		cmn_err(CE_WARN, "Invalid resume_token [%s]", resume_token);
		return (SET_ERROR(ENOSR));
	}

	/* Temporarily terminate at '@' to compare the FS part only */
	*at_ptr = '\0';

	/* First need to find FS that matches the given cookie */
	fs_el = list_head(ds_to_send);
	while (fs_el != NULL) {
		if (strcmp(fs_el->name, resume_token) == 0)
			break;

		fs_el = list_next(ds_to_send, fs_el);
	}

	/*
	 * There is no target FS.
	 * (The token is still truncated here, so the message prints
	 * just the FS name; the caller's copy stays mutated on this
	 * error path.)
	 */
	if (fs_el == NULL) {
		cmn_err(CE_WARN, "Unknown FS name [%s]", resume_token);
		return (SET_ERROR(ENOSR));
	}

	/* Restore the full "fs@snap" form for the snapshot lookup */
	*at_ptr = '@';

	/*
	 * FS has been found, need to find SNAP that
	 * matches the given cookie
	 */
	snap_el = avl_first(&fs_el->snapshots);
	while (snap_el != NULL) {
		if (strcmp(snap_el->name, resume_token) == 0)
			break;

		snap_el = AVL_NEXT(&fs_el->snapshots, snap_el);
	}

	/* There is no target snapshot */
	if (snap_el == NULL) {
		cmn_err(CE_WARN, "Unknown SNAP name [%s]", resume_token);
		return (SET_ERROR(ENOSR));
	}

	*resume_snap_name = snap_el->name;
	*resume_fs_name = fs_el->name;

	return (0);
}
1524
/*
 * Send all datasets in 'ds_to_send', optionally fast-forwarding to
 * the position recorded in resume_info ("toname" = "fs@snap").
 */
static int
zfs_send_ds(dmu_krrp_task_t *krrp_task, list_t *ds_to_send)
{
	int err = 0;
	zfs_ds_node_t *fs_el;
	char *resume_fs_name = NULL;
	char *resume_snap_name = NULL;

	fs_el = list_head(ds_to_send);

	/* Resume logic */
	if (krrp_task->buffer_args.resume_info != NULL) {
		char *toname = NULL;

		/* resume_info was validated earlier, "toname" must exist */
		err = nvlist_lookup_string(krrp_task->buffer_args.resume_info,
		    "toname", &toname);
		ASSERT(err != ENOENT);
		if (err != 0)
			return (SET_ERROR(err));

		err = dmu_krrp_send_resume(toname, ds_to_send,
		    &resume_fs_name, &resume_snap_name);
		if (err != 0)
			return (err);

		/* Skip the datasets that precede the resume FS */
		while (fs_el != NULL) {
			if (strcmp(fs_el->name, resume_fs_name) == 0)
				break;

			fs_el = list_next(ds_to_send, fs_el);
		}
	}

	while (fs_el != NULL) {
		err = zfs_send_snapshots(krrp_task,
		    &fs_el->snapshots, resume_snap_name);
		if (err != 0)
			break;

		/*
		 * resume_snap_name needs to be NULL for the datasets,
		 * that are on the "right" side of the resume-token,
		 * because need to process all their snapshots
		 */
		if (resume_snap_name != NULL)
			resume_snap_name = NULL;

		fs_el = list_next(ds_to_send, fs_el);
	}

	return (err);
}
1577
/*
 * Release every long-held dataset collected for a send and free all
 * list/AVL elements.  The rele tag must match the tag used when the
 * hold was taken: origin snapshots use fs_el as the tag, everything
 * else uses krrp_task (NOTE(review): presumed from the rele calls
 * below — confirm against the collection code).
 */
static void
zfs_cleanup_send_list(dmu_krrp_task_t *krrp_task, list_t *ds_list)
{
	zfs_ds_node_t *fs_el;

	/* Walk over all collected FSs and their SNAPs to cleanup */
	while ((fs_el = list_remove_head(ds_list)) != NULL) {
		zfs_snap_avl_node_t *snap_el;
		void *cookie = NULL;

		while ((snap_el = avl_destroy_nodes(&fs_el->snapshots,
		    &cookie)) != NULL) {
			if (snap_el->origin) {
				dsl_dataset_long_rele(snap_el->ds, fs_el);
				dsl_dataset_rele(snap_el->ds, fs_el);
			} else {
				dsl_dataset_long_rele(snap_el->ds, krrp_task);
				dsl_dataset_rele(snap_el->ds, krrp_task);
			}

			kmem_free(snap_el, sizeof (zfs_snap_avl_node_t));
		}

		dsl_dataset_long_rele(fs_el->ds, krrp_task);
		dsl_dataset_rele(fs_el->ds, krrp_task);

		kmem_free(fs_el, sizeof (zfs_ds_node_t));
	}
}
1607
1608 /*
1609 * zfs_send_thread
1610 * executes ONE iteration, initial or incremental, on the sender side
1611 * 1) validates versus WBC
1612 * 2) collects source datasets and its to-be-sent snapshots
1613 * 2.1) each source dataset is an element of list, that contains
1614 * - name of dataset
1615 * - avl-tree of snapshots
1616 * - its guid
1617 * - the corresponding long held dsl_datasets_t
1618 * 2.2) each snapshot is an element of avl-tree, that contains
1619 * - name of snapshot
1620 * - its guid
1621 * - creation TXG
1622 * - the corresponding long held dsl_datasets_t
1623 * 3) initiate send stream
1624 * 4) send in order, one snapshot at a time
1625 */
static void
zfs_send_thread(void *krrp_task_void)
{
	dmu_replay_record_t drr = { 0 };
	dmu_krrp_task_t *krrp_task = krrp_task_void;
	kreplication_zfs_args_t *buffer_args = &krrp_task->buffer_args;
	list_t ds_to_send;
	int err = 0;
	spa_t *spa = NULL;

	/*
	 * Recursive stream, stream with properties, or
	 * complete-incremental stream needs the DMU_COMPOUNDSTREAM
	 * header and trailer
	 */
	boolean_t compound_stream = buffer_args->recursive ||
	    buffer_args->properties || buffer_args->do_all;

	ASSERT(krrp_task != NULL);

	err = spa_open(krrp_task->buffer_args.from_ds, &spa, krrp_task);
	if (err != 0)
		goto early_error;

	if (buffer_args->resume_info != NULL) {
		err = dmu_krrp_validate_resume_info(buffer_args->resume_info);
		if (err != 0)
			goto early_error;
	}

	list_create(&ds_to_send, sizeof (zfs_ds_node_t),
	    offsetof(zfs_ds_node_t, list_node));

	/*
	 * Source cannot be a writecached child if
	 * the from_snapshot is an autosnap
	 */
	err = wbc_check_dataset(buffer_args->from_ds);
	if (err != 0 && err != ENOTACTIVE) {
		boolean_t from_snap_is_autosnap =
		    autosnap_check_name(buffer_args->from_snap);
		if (err != EOPNOTSUPP || from_snap_is_autosnap) {
			/* ENOTDIR signals the WBC restriction to callers */
			if (err == EOPNOTSUPP)
				err = SET_ERROR(ENOTDIR);

			goto final;
		}
	}

	/* Keep the autosnap state stable while collecting datasets */
	err = autosnap_lock(spa, RW_READER);
	if (err != 0)
		goto final;

	err = zfs_collect_ds(krrp_task, spa, &ds_to_send);

	autosnap_unlock(spa);

	if (err != 0)
		goto final;

	/*
	 * Recursive stream, stream with properties, or complete-incremental
	 * stream have special header (DMU_COMPOUNDSTREAM)
	 */
	if (compound_stream) {
		err = zfs_send_compound_stream_header(krrp_task, &ds_to_send);
		if (err != 0)
			goto final;
	}

	err = zfs_send_ds(krrp_task, &ds_to_send);

final:

	zfs_cleanup_send_list(krrp_task, &ds_to_send);

	list_destroy(&ds_to_send);

	/* A compound stream is terminated by an extra DRR_END record */
	if (err == 0 && compound_stream) {
		bzero(&drr, sizeof (drr));
		drr.drr_type = DRR_END;
		err = dmu_krrp_buffer_write(&drr, sizeof (drr), krrp_task);
	}

	/* Flush the partially filled buffer back to the transport */
	if (err == 0)
		err = dmu_krrp_put_buffer(krrp_task);

early_error:
	if (err != 0) {
		dmu_set_send_recv_error(krrp_task, err);
		cmn_err(CE_WARN, "Send thread exited with error code %d", err);
	}

	if (spa != NULL)
		spa_close(spa, krrp_task);

	(void) dmu_krrp_fini_task(krrp_task);
}
1719
1720 /* KRRP-RECV routines */
1721
1722 /*
1723 * Alternate props from the received steam
1724 * Walk over all props from incoming nvlist "props" and
1725 * - replace each that is contained in nvlist "replace"
1726 * - remove each that is contained in nvlist "exclude"
1727 */
1728 static void
1729 zfs_recv_alter_props(nvlist_t *props, nvlist_t *exclude, nvlist_t *replace)
1730 {
1731 nvpair_t *element = NULL;
1732
1733 if (props != NULL && exclude != NULL) {
1734 while (
1735 (element = nvlist_next_nvpair(exclude, element)) != NULL) {
1736 nvpair_t *pair;
1737 char *prop = nvpair_name(element);
1738 char *prop_recv;
1739 char *prop_inher;
1740
1741 prop_recv =
1742 kmem_asprintf("%s%s", prop, ZPROP_RECVD_SUFFIX);
1743 prop_inher =
1744 kmem_asprintf("%s%s", prop, ZPROP_INHERIT_SUFFIX);
1745
1746 pair = NULL;
1747 (void) nvlist_lookup_nvpair(props, prop, &pair);
1748 if (pair)
1749 fnvlist_remove_nvpair(props, pair);
1750
1751 pair = NULL;
1752 (void) nvlist_lookup_nvpair(props, prop_recv, &pair);
1753 if (pair)
1754 fnvlist_remove_nvpair(props, pair);
1755
1756 pair = NULL;
1757 (void) nvlist_lookup_nvpair(props, prop_inher, &pair);
1758 if (pair)
1759 fnvlist_remove_nvpair(props, pair);
1760
1761 strfree(prop_recv);
1762 strfree(prop_inher);
1763 }
1764 }
1765
1766 if (props != NULL && replace != NULL) {
1767 while (
1768 (element = nvlist_next_nvpair(replace, element)) != NULL) {
1769 nvpair_t *pair;
1770 char *prop = nvpair_name(element);
1771 char *prop_recv;
1772 char *prop_inher;
1773
1774 prop_recv =
1775 kmem_asprintf("%s%s", prop, ZPROP_RECVD_SUFFIX);
1776 prop_inher =
1777 kmem_asprintf("%s%s", prop, ZPROP_INHERIT_SUFFIX);
1778
1779 pair = NULL;
1780 (void) nvlist_lookup_nvpair(props, prop, &pair);
1781 if (pair)
1782 fnvlist_remove_nvpair(props, pair);
1783
1784 pair = NULL;
1785 (void) nvlist_lookup_nvpair(props, prop_recv, &pair);
1786 if (pair)
1787 fnvlist_remove_nvpair(props, pair);
1788
1789 pair = NULL;
1790 (void) nvlist_lookup_nvpair(props, prop_inher, &pair);
1791 if (pair)
1792 fnvlist_remove_nvpair(props, pair);
1793
1794 strfree(prop_recv);
1795 strfree(prop_inher);
1796
1797 fnvlist_add_nvpair(props, element);
1798 }
1799 }
1800 }
1801
1802 /*
1803 * Callback for dmu_objset_find_dp()
1804 * Checks only snapshots. If a snapshot is matched 'guid',
1805 * that is passed over the cb_arg, then the snapshot is
1806 * our target origin. So that we store its name and return
1807 * EINTR to speed up finalization of dmu_objset_find_dp()
1808 */
1809 /* ARGSUSED */
1810 static int
1811 zfs_lookup_origin_snapshot_cb(dsl_pool_t *dp,
1812 dsl_dataset_t *ds, void *arg)
1813 {
1814 zfs_los_cb_arg_t *cb_arg = arg;
1815
1816 if (!ds->ds_is_snapshot)
1817 return (0);
1818
1819
1820 if (dsl_dataset_phys(ds)->ds_guid == cb_arg->guid) {
1821 dsl_dataset_name(ds, cb_arg->origin_name);
1822 return (EINTR);
1823 }
1824
1825 return (0);
1826 }
1827
1828 /*
1829 * FIXME: needs to be optimized
1830 * TODO: no reason to walk over the already walked datasets
1831 */
/*
 * Search for the snapshot with the given guid, walking up the
 * clone's path one component at a time ("a/b/c" -> "a/b" -> "a")
 * and scanning each ancestor's subtree.  On success, the full
 * snapshot name is copied into 'result' and 0 is returned;
 * otherwise -1.
 */
static int
zfs_lookup_origin_snapshot(spa_t *spa, const char *clone_name,
    uint64_t guid, char *result)
{
	char start[ZFS_MAX_DATASET_NAME_LEN];
	char *cp;
	dsl_pool_t *dp = spa_get_dsl(spa);
	zfs_los_cb_arg_t cb_arg = {result, guid};

	ASSERT(result != NULL);

	*result = '\0';

	(void) strlcpy(start, clone_name, sizeof (start));
	cp = strrchr(start, '/');
	if (cp == NULL)
		cp = strchr(start, '\0');	/* single-component name */

	for (; cp != NULL; cp = strrchr(start, '/')) {
		dsl_dataset_t *ds = NULL;
		int err;
		uint64_t dd_object;

		/* Strip the last path component for this round */
		*cp = '\0';

		dsl_pool_config_enter(dp, FTAG);

		err = dsl_dataset_hold(dp, start, FTAG, &ds);
		if (err != 0) {
			dsl_pool_config_exit(dp, FTAG);
			break;
		}

		dd_object = ds->ds_dir->dd_object;
		dsl_dataset_rele(ds, FTAG);

		/* The callback fills 'result' and returns EINTR on match */
		err = dmu_objset_find_dp(dp, dd_object,
		    zfs_lookup_origin_snapshot_cb, &cb_arg,
		    DS_FIND_CHILDREN | DS_FIND_SNAPSHOTS);

		dsl_pool_config_exit(dp, FTAG);

		if (*result != '\0' || err != 0)
			break;
	}

	return ((*result == '\0') ? -1 : 0);
}
1880
/* Recv a single snapshot. It is a simplified version of recv */
static int
zfs_recv_one_ds(spa_t *spa, char *ds, dmu_replay_record_t *drr,
    nvlist_t *fs_props, nvlist_t *snap_props, dmu_krrp_task_t *krrp_task)
{
	int err = 0;
	uint64_t errf = 0;
	uint64_t ahdl = 0;
	uint64_t sz = 0;
	char *tosnap;
	char origin[ZFS_MAX_DATASET_NAME_LEN];
	char *originp = NULL;
	struct drr_begin *drrb = &drr->drr_u.drr_begin;

	/*
	 * An explicit to_snap overrides the snap name from the stream.
	 * NOTE(review): assumes drr_toname contains '@' — confirm with
	 * the callers that parse the BEGIN record
	 */
	if (krrp_task->buffer_args.to_snap[0]) {
		tosnap = krrp_task->buffer_args.to_snap;
	} else {
		tosnap = strchr(drrb->drr_toname, '@') + 1;
	}

	/* To recv cloned DS need to find its origin snapshot */
	if ((drrb->drr_flags & DRR_FLAG_CLONE) != 0) {
		err = zfs_lookup_origin_snapshot(spa, ds,
		    drrb->drr_fromguid, origin);
		if (err != 0) {
			if (krrp_debug) {
				cmn_err(CE_WARN, "Origin snapshot "
				    "(guid: %llu) does not exist",
				    (unsigned long long)drrb->drr_fromguid);
			}

			return (SET_ERROR(ENOLINK));
		}

		originp = origin;
	}

	/* Apply the receiver-side exclusions and overrides */
	zfs_recv_alter_props(fs_props,
	    krrp_task->buffer_args.ignore_list,
	    krrp_task->buffer_args.replace_list);

	if (krrp_debug) {
		cmn_err(CE_NOTE, "KRRP RECV INC_BASE: "
		    "%llu -- DS: %s -- TO_SNAP:%s",
		    (unsigned long long)drr->drr_u.drr_begin.drr_fromguid,
		    ds, tosnap);
	}

	/* hack to avoid adding the symbol to the libzpool export list */
#ifdef _KERNEL
	err = dmu_recv_impl(NULL, ds, tosnap, originp, drr, B_TRUE, fs_props,
	    NULL, &errf, -1, &ahdl, &sz, krrp_task->buffer_args.force,
	    krrp_task);

	/*
	 * If receive has been successfully finished
	 * we can apply received snapshot properties
	 */
	if (err == 0 && snap_props != NULL) {
		char *full_snap_name;

		full_snap_name = kmem_asprintf("%s@%s", ds, tosnap);
		err = zfs_ioc_set_prop_impl(full_snap_name,
		    snap_props, B_TRUE, NULL);
		if (err != 0 && krrp_debug) {
			cmn_err(CE_NOTE, "KRRP RECV: failed to apply "
			    "received snapshot properties [%d]", err);
		}

		strfree(full_snap_name);
	}
#endif

	return (err);
}
1956
1957 /*
1958 * Recv one stream
1959 * 1) validates versus WBC
1960 * 2) prepares receiving paths according to the given
1961 * flags ('leave_tail' or 'strip_head')
1962 * 3) recv stream
1963 * 4) apply snapshot properties if they
1964 * are part of received stream
 * 5) To support resume-recv, save to ZAP the name
 * of the completely received snapshot. After merge with illumos
 * the resume logic needs to be replaced by the more intelligent
 * logic from illumos
1969 *
1970 * The implemented "recv" supports most of userspace-recv
1971 * functionality.
1972 *
1973 * Dedup-stream is not supported
1974 */
1975 static void
1976 zfs_recv_thread(void *krrp_task_void)
1977 {
1978 dmu_krrp_task_t *krrp_task = krrp_task_void;
1979 dmu_replay_record_t drr = { 0 };
1980 struct drr_begin *drrb = &drr.drr_u.drr_begin;
1981 zio_cksum_t zcksum = { 0 };
1982 int err;
1983 int baselen;
1984 spa_t *spa = NULL;
1985 char latest_snap[ZFS_MAX_DATASET_NAME_LEN] = { 0 };
1986 char to_ds[ZFS_MAX_DATASET_NAME_LEN];
1987 int hdrtype;
1988 uint64_t featureflags;
1989
1990 ASSERT(krrp_task != NULL);
1991
1992 err = spa_open(krrp_task->buffer_args.to_ds, &spa, krrp_task);
1993 if (err != NULL)
1994 goto out;
1995
1996 /*
1997 * This option requires a functionality (similar to
1998 * create_parents() from libzfs_dataset.c), that is not
1999 * implemented yet
2000 */
2001 if (krrp_task->buffer_args.strip_head) {
2002 err = SET_ERROR(ENOTSUP);
2003 goto out;
2004 }
2005
2006 (void) strlcpy(to_ds, krrp_task->buffer_args.to_ds, sizeof (to_ds));
2007 if (dsl_dataset_creation_txg(to_ds) == UINT64_MAX) {
2008 char *p;
2009
2010 /*
2011 * If 'leave_tail' or 'strip_head' are define,
2012 * then 'to_ds' just a prefix and must exist
2013 */
2014 if (krrp_task->buffer_args.leave_tail ||
2015 krrp_task->buffer_args.strip_head) {
2016 err = SET_ERROR(ENOENT);
2017 goto out;
2018 }
2019
2020 /*
2021 * spa found, '/' must be, becase the above
2022 * check returns UINT64_MAX
2023 */
2024 VERIFY((p = strrchr(to_ds, '/')) != NULL);
2025 *p = '\0';
2026
2027 /*
2028 * It is OK that destination does not exist,
2029 * but its parent must be here
2030 */
2031 if (dsl_dataset_creation_txg(to_ds) == UINT64_MAX) {
2032 err = SET_ERROR(ENOENT);
2033 goto out;
2034 }
2035 }
2036
2037 /* destination cannot be writecached */
2038 err = wbc_check_dataset(to_ds);
2039 if (err == 0 || err == EOPNOTSUPP) {
2040 err = SET_ERROR(ENOTDIR);
2041 goto out;
2042 }
2043
2044 /*
2045 * ENOTACTIVE means WBC is not active for the DS
2046 * If some another error just return
2047 */
2048 if (err != ENOTACTIVE)
2049 goto out;
2050
2051 /* Read leading block */
2052 err = dmu_krrp_buffer_read(&drr, sizeof (drr), krrp_task);
2053 if (err != 0)
2054 goto out;
2055
2056 if (drr.drr_type != DRR_BEGIN ||
2057 (drrb->drr_magic != DMU_BACKUP_MAGIC &&
2058 drrb->drr_magic != BSWAP_64(DMU_BACKUP_MAGIC))) {
2059 err = SET_ERROR(EBADMSG);
2060 goto out;
2061 }
2062
2063 baselen = strchr(drrb->drr_toname, '@') - drrb->drr_toname;
2064
2065 /* Process passed arguments */
2066 if (krrp_task->buffer_args.strip_head) {
2067 char *pos = strchr(drrb->drr_toname, '/');
2068 if (pos)
2069 baselen = pos - drrb->drr_toname;
2070 }
2071
2072 if (krrp_task->buffer_args.leave_tail) {
2073 char *pos = strrchr(drrb->drr_toname, '/');
2074 if (pos)
2075 baselen = pos - drrb->drr_toname;
2076 }
2077
2078 featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
2079 hdrtype = DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo);
2080 if (!DMU_STREAM_SUPPORTED(featureflags) ||
2081 (hdrtype != DMU_SUBSTREAM && hdrtype != DMU_COMPOUNDSTREAM)) {
2082 err = SET_ERROR(EBADMSG);
2083 goto out;
2084 }
2085
2086 if (hdrtype == DMU_SUBSTREAM) {
2087 /* recv a simple single snapshot */
2088 char full_ds[ZFS_MAX_DATASET_NAME_LEN];
2089
2090 (void) strlcpy(full_ds, krrp_task->buffer_args.to_ds,
2091 sizeof (full_ds));
2092 if (krrp_task->buffer_args.strip_head ||
2093 krrp_task->buffer_args.leave_tail) {
2094 char *pos;
2095 int len = strlen(full_ds) +
2096 strlen(drrb->drr_toname + baselen) + 1;
2097 if (len < sizeof (full_ds)) {
2098 (void) strlcat(full_ds, "/", sizeof (full_ds));
2099 (void) strlcat(full_ds,
2100 drrb->drr_toname + baselen,
2101 sizeof (full_ds));
2102 pos = strchr(full_ds, '@');
2103 *pos = '\0';
2104 } else {
2105 err = SET_ERROR(ENAMETOOLONG);
2106 goto out;
2107 }
2108 }
2109
2110 (void) snprintf(latest_snap, sizeof (latest_snap),
2111 "%s%s", full_ds, strchr(drrb->drr_toname, '@'));
2112 err = zfs_recv_one_ds(spa, full_ds, &drr, NULL, NULL, krrp_task);
2113 } else {
2114 nvlist_t *nvl = NULL, *nvfs = NULL;
2115 avl_tree_t *fsavl = NULL;
2116
2117 if (krrp_task->buffer_args.force_cksum) {
2118 (void) fletcher_4_incremental_native(&drr,
2119 sizeof (drr), &zcksum);
2120 }
2121
2122 /* Recv COMPOUND PAYLOAD */
2123 if (drr.drr_payloadlen > 0) {
2124 char *buf = kmem_alloc(drr.drr_payloadlen, KM_SLEEP);
2125 err = dmu_krrp_buffer_read(
2126 buf, drr.drr_payloadlen, krrp_task);
2127 if (err != 0) {
2128 kmem_free(buf, drr.drr_payloadlen);
2129 goto out;
2130 }
2131
2132 if (krrp_task->buffer_args.force_cksum) {
2133 (void) fletcher_4_incremental_native(buf,
2134 drr.drr_payloadlen, &zcksum);
2135 }
2136
2137 err = nvlist_unpack(buf, drr.drr_payloadlen,
2138 &nvl, KM_SLEEP);
2139 kmem_free(buf, drr.drr_payloadlen);
2140
2141 if (err != 0) {
2142 err = SET_ERROR(EBADMSG);
2143 goto out;
2144 }
2145
2146 err = nvlist_lookup_nvlist(nvl, "fss", &nvfs);
2147 if (err != 0) {
2148 err = SET_ERROR(EBADMSG);
2149 goto out_nvl;
2150 }
2151
2152 err = fsavl_create(nvfs, &fsavl);
2153 if (err != 0) {
2154 err = SET_ERROR(EBADMSG);
2155 goto out_nvl;
2156 }
2157 }
2158
2159 /* Check end of stream marker */
2160 err = dmu_krrp_buffer_read(&drr, sizeof (drr), krrp_task);
2161 if (drr.drr_type != DRR_END &&
2162 drr.drr_type != BSWAP_32(DRR_END)) {
2163 err = SET_ERROR(EBADMSG);
2164 goto out_nvl;
2165 }
2166
2167 if (err == 0 && krrp_task->buffer_args.force_cksum &&
2168 !ZIO_CHECKSUM_EQUAL(drr.drr_u.drr_end.drr_checksum,
2169 zcksum)) {
2170 err = SET_ERROR(ECKSUM);
2171 goto out_nvl;
2172 }
2173
2174 /* process all substeams from stream */
2175 for (;;) {
2176 nvlist_t *fs_props = NULL, *snap_props = NULL;
2177 boolean_t free_fs_props = B_FALSE;
2178 char ds[ZFS_MAX_DATASET_NAME_LEN];
2179 char *at;
2180
2181 err = dmu_krrp_buffer_read(&drr,
2182 sizeof (drr), krrp_task);
2183 if (err != 0)
2184 break;
2185
2186 if (drr.drr_type == DRR_END ||
2187 drr.drr_type == BSWAP_32(DRR_END))
2188 break;
2189
2190 if (drr.drr_type != DRR_BEGIN ||
2191 (drrb->drr_magic != DMU_BACKUP_MAGIC &&
2192 drrb->drr_magic != BSWAP_64(DMU_BACKUP_MAGIC))) {
2193 err = SET_ERROR(EBADMSG);
2194 break;
2195 }
2196
2197 if (strlen(krrp_task->buffer_args.to_ds) +
2198 strlen(drrb->drr_toname + baselen) >= sizeof (ds)) {
2199 err = SET_ERROR(ENAMETOOLONG);
2200 break;
2201 }
2202
2203 (void) snprintf(ds, sizeof (ds), "%s%s",
2204 krrp_task->buffer_args.to_ds,
2205 drrb->drr_toname + baselen);
2206 if (nvfs != NULL) {
2207 char *snapname;
2208 nvlist_t *snapprops;
2209 nvlist_t *fs;
2210
2211 fs = fsavl_find(fsavl, drrb->drr_toguid,
2212 &snapname);
2213 err = nvlist_lookup_nvlist(fs,
2214 "props", &fs_props);
2215 if (err != 0) {
2216 if (err != ENOENT) {
2217 err = SET_ERROR(err);
2218 break;
2219 }
2220
2221 err = 0;
2222 fs_props = fnvlist_alloc();
2223 free_fs_props = B_TRUE;
2224 }
2225
2226 if (nvlist_lookup_nvlist(fs,
2227 "snapprops", &snapprops) == 0) {
2228 err = nvlist_lookup_nvlist(snapprops,
2229 snapname, &snap_props);
2230 if (err != 0) {
2231 err = SET_ERROR(err);
2232 break;
2233 }
2234 }
2235 }
2236
2237 (void) strlcpy(latest_snap, ds, sizeof (latest_snap));
2238 at = strrchr(ds, '@');
2239 *at = '\0';
2240 (void) strlcpy(krrp_task->cookie, drrb->drr_toname,
2241 sizeof (krrp_task->cookie));
2242 err = zfs_recv_one_ds(spa, ds, &drr, fs_props,
2243 snap_props, krrp_task);
2244 if (free_fs_props)
2245 fnvlist_free(fs_props);
2246
2247 if (err != 0)
2248 break;
2249 }
2250
2251 out_nvl:
2252 if (nvl != NULL) {
2253 fsavl_destroy(fsavl);
2254 fnvlist_free(nvl);
2255 }
2256 }
2257
2258 /* Put final block */
2259 if (err == 0)
2260 (void) dmu_krrp_put_buffer(krrp_task);
2261
2262 out:
2263 dmu_set_send_recv_error(krrp_task_void, err);
2264 if (err != 0) {
2265 cmn_err(CE_WARN, "Recv thread exited with "
2266 "error code %d", err);
2267 }
2268
2269 if (spa != NULL)
2270 spa_close(spa, krrp_task);
2271
2272 (void) dmu_krrp_fini_task(krrp_task);
2273 }
2274
/*
 * Common send/recv entry point.
 * Allocates a task, copies the caller's arguments by value and hands
 * the task to the stream's worker thread.  Returns the task handle,
 * or NULL if the stream is no longer running.
 */
static void *
dmu_krrp_init_send_recv(void (*func)(void *), kreplication_zfs_args_t *args)
{
	dmu_krrp_task_t *krrp_task =
	    kmem_zalloc(sizeof (dmu_krrp_task_t), KM_SLEEP);
	dmu_krrp_stream_t *stream = args->stream_handler;

	krrp_task->stream_handler = stream;
	krrp_task->buffer_args = *args;
	cv_init(&krrp_task->buffer_state_cv, NULL, CV_DEFAULT, NULL);
	cv_init(&krrp_task->buffer_destroy_cv, NULL, CV_DEFAULT, NULL);
	mutex_init(&krrp_task->buffer_state_lock, NULL,
	    MUTEX_DEFAULT, NULL);

	mutex_enter(&stream->mtx);
	if (!stream->running) {
		/* Stream is shutting down; undo the initialization */
		cmn_err(CE_WARN, "Cannot dispatch send/recv task");
		mutex_destroy(&krrp_task->buffer_state_lock);
		cv_destroy(&krrp_task->buffer_state_cv);
		cv_destroy(&krrp_task->buffer_destroy_cv);
		kmem_free(krrp_task, sizeof (dmu_krrp_task_t));

		mutex_exit(&stream->mtx);
		return (NULL);
	}

	/* Publish the task and wake the stream's worker thread */
	stream->task = krrp_task;
	stream->task_executor = func;
	cv_broadcast(&stream->cv);
	mutex_exit(&stream->mtx);

	return (krrp_task);
}
2309
2310 void *
2311 dmu_krrp_init_send_task(void *args)
2312 {
2313 kreplication_zfs_args_t *zfs_args = args;
2314 ASSERT(zfs_args != NULL);
2315 *zfs_args->to_ds = '\0';
2316 return (dmu_krrp_init_send_recv(zfs_send_thread, zfs_args));
2317 }
2318
2319 void *
2320 dmu_krrp_init_recv_task(void *args)
2321 {
2322 kreplication_zfs_args_t *zfs_args = args;
2323 ASSERT(zfs_args != NULL);
2324 *zfs_args->from_ds = '\0';
2325 return (dmu_krrp_init_send_recv(zfs_recv_thread, zfs_args));
2326 }
2327
2328 static void
2329 dmu_set_send_recv_error(void *krrp_task_void, int err)
2330 {
2331 dmu_krrp_task_t *krrp_task = krrp_task_void;
2332
2333 ASSERT(krrp_task != NULL);
2334
2335 mutex_enter(&krrp_task->buffer_state_lock);
2336 krrp_task->buffer_error = err;
2337 mutex_exit(&krrp_task->buffer_state_lock);
2338 }
2339
/*
 * Finalize send/recv task
 * Finalization is a two-step process, both sides should finalize the
 * stream in order to proceed. Finalization is an execution barrier -
 * a thread which ends first will wait for another.
 * Returns the task's final error code.  The side that finalizes
 * first performs the actual teardown once the peer arrives.
 */
int
dmu_krrp_fini_task(void *krrp_task_void)
{
	dmu_krrp_task_t *krrp_task = krrp_task_void;
	int error;

	ASSERT(krrp_task != NULL);

	mutex_enter(&krrp_task->buffer_state_lock);
	if (krrp_task->buffer_state == SBS_DESTROYED) {
		/*
		 * We are the second finalizer: wake the waiting peer,
		 * which will free the task
		 */
		cv_signal(&krrp_task->buffer_destroy_cv);
		error = krrp_task->buffer_error;
		mutex_exit(&krrp_task->buffer_state_lock);
	} else {
		/*
		 * We are the first finalizer: mark the task destroyed,
		 * wait for the peer, then tear everything down
		 */
		krrp_task->buffer_state = SBS_DESTROYED;
		cv_signal(&krrp_task->buffer_state_cv);
		cv_wait(&krrp_task->buffer_destroy_cv,
		    &krrp_task->buffer_state_lock);
		error = krrp_task->buffer_error;
		mutex_exit(&krrp_task->buffer_state_lock);
		mutex_destroy(&krrp_task->buffer_state_lock);
		cv_destroy(&krrp_task->buffer_state_cv);
		cv_destroy(&krrp_task->buffer_destroy_cv);
		if (krrp_task->buffer_args.resume_info != NULL)
			fnvlist_free(krrp_task->buffer_args.resume_info);

		kmem_free(krrp_task, sizeof (dmu_krrp_task_t));
	}

	return (error);
}
2377
2378 /* Wait for a lent buffer */
2379 static int
2380 dmu_krrp_get_buffer(void *krrp_task_void)
2381 {
2382 dmu_krrp_task_t *krrp_task = krrp_task_void;
2383
2384 ASSERT(krrp_task != NULL);
2385
2386 mutex_enter(&krrp_task->buffer_state_lock);
2387 while (krrp_task->buffer_state != SBS_AVAIL) {
2388 if (krrp_task->buffer_state == SBS_DESTROYED) {
2389 mutex_exit(&krrp_task->buffer_state_lock);
2390 return (SET_ERROR(ENOMEM));
2391 }
2392 DTRACE_PROBE(wait_for_buffer);
2393 (void) cv_timedwait(&krrp_task->buffer_state_cv,
2394 &krrp_task->buffer_state_lock,
2395 ddi_get_lbolt() + zfs_send_timeout * hz);
2396 DTRACE_PROBE(wait_for_buffer_end);
2397 }
2398 krrp_task->buffer_state = SBS_USED;
2399 mutex_exit(&krrp_task->buffer_state_lock);
2400
2401 return (0);
2402 }
2403
2404 /* Return buffer to transport */
2405 static int
2406 dmu_krrp_put_buffer(void *krrp_task_void)
2407 {
2408 dmu_krrp_task_t *krrp_task = krrp_task_void;
2409
2410 ASSERT(krrp_task != NULL);
2411
2412 mutex_enter(&krrp_task->buffer_state_lock);
2413 if (krrp_task->buffer_state != SBS_USED) {
2414 mutex_exit(&krrp_task->buffer_state_lock);
2415 return (0);
2416 }
2417 krrp_task->buffer_state = SBS_DONE;
2418 krrp_task->is_full = (krrp_task->buffer == NULL);
2419 krrp_task->buffer = NULL;
2420 cv_signal(&krrp_task->buffer_state_cv);
2421 mutex_exit(&krrp_task->buffer_state_lock);
2422
2423 return (0);
2424 }
2425
2426 /* Common entry point for lending buffer */
2427 static int
2428 dmu_krrp_lend_buffer(void *krrp_task_void,
2429 kreplication_buffer_t *buffer, boolean_t recv)
2430 {
2431 dmu_krrp_task_t *krrp_task = krrp_task_void;
2432 boolean_t full;
2433
2434 ASSERT(krrp_task != NULL);
2435 ASSERT(buffer != NULL);
2436 ASSERT(krrp_task->buffer == NULL);
2437
2438 mutex_enter(&krrp_task->buffer_state_lock);
2439 if (krrp_task->buffer_state == SBS_DESTROYED) {
2440 int error = krrp_task->buffer_error;
2441 mutex_exit(&krrp_task->buffer_state_lock);
2442 if (error)
2443 return (error);
2444 if (recv)
2445 return (E2BIG);
2446 return (ENODATA);
2447 }
2448 krrp_task->buffer = buffer;
2449 krrp_task->buffer_state = SBS_AVAIL;
2450 krrp_task->buffer_bytes_read = 0;
2451 krrp_task->is_read = B_FALSE;
2452 krrp_task->is_full = B_FALSE;
2453 cv_signal(&krrp_task->buffer_state_cv);
2454 while (krrp_task->buffer_state != SBS_DONE) {
2455 if (krrp_task->buffer_state == SBS_DESTROYED) {
2456 int error = krrp_task->buffer_error;
2457 full = krrp_task->is_full;
2458 mutex_exit(&krrp_task->buffer_state_lock);
2459 if (error)
2460 return (error);
2461 if (recv && !krrp_task->is_read)
2462 return (E2BIG);
2463 return ((recv || full) ? 0 : ENODATA);
2464 }
2465 DTRACE_PROBE(wait_for_data);
2466 (void) cv_timedwait(&krrp_task->buffer_state_cv,
2467 &krrp_task->buffer_state_lock,
2468 ddi_get_lbolt() + zfs_send_timeout * hz);
2469 DTRACE_PROBE(wait_for_data_end);
2470 }
2471 krrp_task->buffer = NULL;
2472 full = krrp_task->is_full;
2473 mutex_exit(&krrp_task->buffer_state_lock);
2474
2475 return ((recv || full) ? 0 : ENODATA);
2476 }
2477
2478 int
2479 dmu_krrp_lend_send_buffer(void *krrp_task_void, kreplication_buffer_t *buffer)
2480 {
2481 ASSERT(buffer != NULL);
2482 kreplication_buffer_t *iter;
2483 for (iter = buffer; iter != NULL; iter = iter->next)
2484 iter->data_size = 0;
2485 return (dmu_krrp_lend_buffer(krrp_task_void, buffer, B_FALSE));
2486 }
2487
2488 int
2489 dmu_krrp_lend_recv_buffer(void *krrp_task_void, kreplication_buffer_t *buffer)
2490 {
2491 ASSERT(buffer != NULL);
2492 return (dmu_krrp_lend_buffer(krrp_task_void, buffer, B_TRUE));
2493 }
2494
2495 /*
2496 * FIXME: Temporary disabled because this logic
2497 * needs to be adjusted according to ARC-Compression changes
2498 */
2499 /* ARGSUSED */
2500 int
2501 dmu_krrp_direct_arc_read(spa_t *spa, dmu_krrp_task_t *krrp_task,
2502 zio_cksum_t *zc, const blkptr_t *bp)
2503 {
2504 return (ENODATA);
2505
2506 #if 0
2507 int error;
2508 dmu_krrp_arc_bypass_t bypass = {
2509 .krrp_task = krrp_task,
2510 .zc = zc,
2511 .cb = dmu_krrp_buffer_write,
2512 };
2513
2514 error = arc_io_bypass(spa, bp, dmu_krrp_arc_bypass, &bypass);
2515 if (error == 0) {
2516 DTRACE_PROBE(krrp_send_arc_bypass);
2517 } else if (error == ENODATA) {
2518 DTRACE_PROBE(krrp_send_disk_read);
2519 return (error);
2520 }
2521
2522 if (error != 0) {
2523 DTRACE_PROBE1(orig_error, int, error);
2524 error = SET_ERROR(EINTR);
2525 }
2526
2527 return (error);
2528 #endif
2529 }
2530
2531 static int
2532 dmu_krrp_validate_resume_info(nvlist_t *resume_info)
2533 {
2534 char *toname = NULL;
2535 uint64_t resumeobj = 0, resumeoff = 0, bytes = 0, toguid = 0;
2536
2537 if (nvlist_lookup_string(resume_info, "toname", &toname) != 0 ||
2538 nvlist_lookup_uint64(resume_info, "object", &resumeobj) != 0 ||
2539 nvlist_lookup_uint64(resume_info, "offset", &resumeoff) != 0 ||
2540 nvlist_lookup_uint64(resume_info, "bytes", &bytes) != 0 ||
2541 nvlist_lookup_uint64(resume_info, "toguid", &toguid) != 0)
2542 return (SET_ERROR(EINVAL));
2543
2544 return (0);
2545 }
2546
2547 int
2548 dmu_krrp_decode_resume_token(const char *resume_token, nvlist_t **resume_info)
2549 {
2550 nvlist_t *nvl = NULL;
2551 int err;
2552
2553 err = zfs_send_resume_token_to_nvlist_impl(resume_token, &nvl);
2554 if (err != 0)
2555 return (err);
2556
2557 err = dmu_krrp_validate_resume_info(nvl);
2558 if (err != 0)
2559 return (err);
2560
2561 ASSERT(resume_info != NULL && *resume_info == NULL);
2562 *resume_info = nvl;
2563 return (0);
2564 }