1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2018 Nexenta Systems, Inc. All rights reserved.
14 */
15
16 /*
17 * Stream module
18 */
19
20 #include <sys/types.h>
21 #include <sys/kmem.h>
22 #include <sys/ddi.h>
23 #include <sys/sunddi.h>
24 #include <sys/sunldi.h>
25 #include <sys/time.h>
26 #include <sys/sdt.h>
27 #include <sys/sysmacros.h>
28 #include <sys/modctl.h>
29 #include <sys/class.h>
30 #include <sys/cmn_err.h>
31
32 #include "krrp_stream.h"
33
/* Evaluates to non-zero when the NUL-terminated string `str' is empty. */
#define	is_str_empty(str)	((str)[0] == '\0')

/*
 * Copy `src_str' into the buffer `dst_str' whose size is `dst_str_max_sz'
 * bytes.  Evaluates to 0 when the whole source fit and to -1 when it was
 * truncated (strlcpy() semantics: the destination is still NUL-terminated).
 * `dst_str_max_sz' is evaluated twice; callers pass sizeof () expressions.
 */
#define	copy_str(dst_str, src_str, dst_str_max_sz) \
	((strlcpy((dst_str), (src_str), (dst_str_max_sz)) < \
	(dst_str_max_sz)) ? 0 : -1)
38
39 /* #define KRRP_STREAM_DEBUG 1 */
40
/*
 * Safeguard for continuous replication sessions.
 *
 * Each read-task of such a session corresponds to one autosnapshot.
 * This variable is the number of read-tasks a session may have in
 * addition to keep_snaps: once the total number of read-tasks reaches
 * keep_snaps + krrp_add_num_read_tasks, the session stops confirming
 * create-requests from Autosnap (unless it is waiting for the final
 * snapshot of a stop request).
 *
 * NOTE(review): non-static global, presumably so it can be tuned
 * externally; confirm before renaming or making it static.
 */
size_t krrp_add_num_read_tasks = 5;
50
51
52 /* These extern functions are part of ZFS sources */
53 extern int wbc_check_dataset(const char *name);
54 extern uint64_t dsl_dataset_creation_txg(const char *name);
55 extern int dmu_krrp_decode_resume_token(const char *resume_token,
56 nvlist_t **resume_info);
57
58
59 typedef void (krrp_stream_handler_t)(void *);
60
61 static krrp_stream_t *krrp_stream_common_create(void);
62 static void krrp_stream_task_done(krrp_stream_t *, krrp_stream_task_t *,
63 boolean_t);
64 static void krrp_stream_callback(krrp_stream_t *, krrp_stream_cb_ev_t,
65 uintptr_t);
66
67 static void krrp_stream_calc_avg_rpo(krrp_stream_t *, krrp_stream_task_t *,
68 boolean_t);
69 static void krrp_stream_read(void *);
70 static void krrp_stream_write(void *);
71
72 static int krrp_stream_activate_autosnap(krrp_stream_t *stream,
73 krrp_error_t *error);
74 static void krrp_stream_autosnap_restore_cb(void *void_stream,
75 const char *snap_name, uint64_t txg);
76
77 static int krrp_stream_validate_run(krrp_stream_t *stream,
78 krrp_error_t *error);
79
80 static uint64_t krrp_stream_get_snap_txg(krrp_stream_t *stream,
81 const char *short_snap_name);
82
83 static boolean_t
84 krrp_stream_check_mem(size_t required_mem, void *void_stream);
85
86 #if 0
87 static void krrp_stream_debug(const char *, void *, void *, void *, void *);
88 #endif
89
90 static void krrp_stream_snap_create_error_cb(const char *, int,
91 uint64_t, void *);
92 static boolean_t krrp_stream_read_snap_confirm_cb(const char *, boolean_t,
93 uint64_t, void *);
94 static boolean_t krrp_stream_write_snap_notify_cb(const char *, boolean_t,
95 boolean_t, uint64_t, uint64_t, void *);
96 static boolean_t krrp_stream_read_snap_notify_cb(const char *, boolean_t,
97 boolean_t, uint64_t, uint64_t, void *);
98
99 int
100 krrp_stream_read_create(krrp_stream_t **result_stream,
101 size_t keep_snaps, const char *dataset, const char *base_snap_name,
102 const char *incr_snap_name, const char *resume_token,
103 krrp_stream_read_flag_t flags, const char *skip_snaps_mask,
104 krrp_error_t *error)
105 {
106 krrp_stream_t *stream;
107 int rc;
108
109 VERIFY(result_stream != NULL && *result_stream == NULL);
110 VERIFY(dataset != NULL);
111
112 stream = krrp_stream_common_create();
113
114 stream->notify_txg = UINT64_MAX;
115 stream->mode = KRRP_STRMM_READ;
116 stream->recursive =
117 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_RECURSIVE);
118 stream->keep_snaps = keep_snaps;
119
120 rc = copy_str(stream->dataset, dataset, sizeof (stream->dataset));
121 if (rc != 0 || is_str_empty(dataset)) {
122 krrp_error_set(error, KRRP_ERRNO_SRCDS, EINVAL);
123 goto err;
124 }
125
126 /* Source and Common snapshots must not be equal */
127 if (base_snap_name != NULL && incr_snap_name != NULL &&
128 strcmp(base_snap_name, incr_snap_name) == 0) {
129 krrp_error_set(error, KRRP_ERRNO_STREAM, EINVAL);
130 goto err;
131 }
132
133 /*
134 * If base_snap_name is defined then the stream will be non_continuous,
135 * that means: only one task will be processed
136 */
137 if (base_snap_name != NULL) {
138 stream->non_continuous = B_TRUE;
139
140 rc = copy_str(stream->base_snap_name, base_snap_name,
141 sizeof (stream->base_snap_name));
142 if (rc != 0 || is_str_empty(base_snap_name)) {
143 krrp_error_set(error, KRRP_ERRNO_SRCSNAP, EINVAL);
144 goto err;
145 }
146
147 if (resume_token != NULL) {
148 rc = dmu_krrp_decode_resume_token(resume_token,
149 &stream->resume_info);
150 if (rc != 0) {
151 krrp_error_set(error,
152 KRRP_ERRNO_RESUMETOKEN, rc);
153 goto err;
154 }
155 }
156 }
157
158 if (incr_snap_name != NULL) {
159 rc = copy_str(stream->incr_snap_name, incr_snap_name,
160 sizeof (stream->incr_snap_name));
161 if (rc != 0 || is_str_empty(incr_snap_name)) {
162 krrp_error_set(error, KRRP_ERRNO_CMNSNAP, EINVAL);
163 goto err;
164 }
165 }
166
167 rc = krrp_stream_te_read_create(&stream->task_engine,
168 stream->dataset, flags, &krrp_stream_check_mem,
169 stream, skip_snaps_mask, error);
170 if (rc != 0)
171 goto err;
172
173 krrp_autosnap_rside_create(&stream->autosnap,
174 stream->keep_snaps, stream->dataset, stream->recursive);
175
176 *result_stream = stream;
177
178 return (0);
179
180 err:
181 krrp_stream_destroy(stream);
182
183 return (-1);
184 }
185
186 int
187 krrp_stream_write_create(krrp_stream_t **result_stream,
188 size_t keep_snaps, const char *dataset, const char *incr_snap_name,
189 const char *resume_token, krrp_stream_write_flag_t flags,
190 nvlist_t *ignore_props_list, nvlist_t *replace_props_list,
191 krrp_error_t *error)
192 {
193 krrp_stream_t *stream;
194 int rc;
195
196 VERIFY(result_stream != NULL && *result_stream == NULL);
197 VERIFY(dataset != NULL);
198
199 stream = krrp_stream_common_create();
200
201 stream->mode = KRRP_STRMM_WRITE;
202 stream->keep_snaps = keep_snaps;
203
204 rc = copy_str(stream->dataset, dataset, sizeof (stream->dataset));
205 if (rc != 0 || is_str_empty(dataset)) {
206 krrp_error_set(error, KRRP_ERRNO_DSTDS, EINVAL);
207 goto err;
208 }
209
210 if (resume_token != NULL) {
211 rc = dmu_krrp_decode_resume_token(resume_token,
212 &stream->resume_info);
213 if (rc != 0) {
214 krrp_error_set(error,
215 KRRP_ERRNO_RESUMETOKEN, rc);
216 goto err;
217 }
218 }
219
220 if (incr_snap_name != NULL) {
221 rc = copy_str(stream->incr_snap_name, incr_snap_name,
222 sizeof (stream->incr_snap_name));
223 if (rc != 0 || is_str_empty(incr_snap_name)) {
224 krrp_error_set(error, KRRP_ERRNO_CMNSNAP, EINVAL);
225 goto err;
226 }
227 }
228
229 rc = krrp_stream_te_write_create(&stream->task_engine,
230 stream->dataset, flags, ignore_props_list,
231 replace_props_list, error);
232 if (rc != 0)
233 goto err;
234
235 krrp_autosnap_wside_create(&stream->autosnap,
236 stream->keep_snaps, stream->dataset);
237
238 *result_stream = stream;
239
240 return (0);
241
242 err:
243 krrp_stream_destroy(stream);
244
245 return (-1);
246 }
247
248 int
249 krrp_stream_fake_read_create(krrp_stream_t **result_stream,
250 uint64_t fake_data_sz, krrp_error_t *error)
251 {
252 int rc = -1;
253 krrp_stream_t *stream;
254
255 VERIFY(result_stream != NULL && *result_stream == NULL);
256
257 if (fake_data_sz == 0) {
258 krrp_error_set(error, KRRP_ERRNO_FAKEDSZ, EINVAL);
259 goto out;
260 }
261
262 stream = krrp_stream_common_create();
263
264 stream->mode = KRRP_STRMM_READ;
265 stream->fake_mode = B_TRUE;
266 stream->non_continuous = B_TRUE;
267 stream->fake_data_sz = fake_data_sz;
268
269 /* Fake never fails */
270 VERIFY(krrp_stream_te_fake_read_create(&stream->task_engine,
271 error) == 0);
272
273 *result_stream = stream;
274 rc = 0;
275
276 out:
277 return (rc);
278 }
279
280 int
281 krrp_stream_fake_write_create(krrp_stream_t **result_stream,
282 krrp_error_t *error)
283 {
284 krrp_stream_t *stream;
285
286 VERIFY(result_stream != NULL && *result_stream == NULL);
287
288 stream = krrp_stream_common_create();
289
290 stream->mode = KRRP_STRMM_WRITE;
291 stream->fake_mode = B_TRUE;
292 stream->non_continuous = B_TRUE;
293
294 /* Fake never fails */
295 VERIFY(krrp_stream_te_fake_write_create(&stream->task_engine,
296 error) == 0);
297
298 *result_stream = stream;
299
300 return (0);
301 }
302
303 static krrp_stream_t *
304 krrp_stream_common_create(void)
305 {
306 krrp_stream_t *stream;
307
308 stream = kmem_zalloc(sizeof (krrp_stream_t), KM_SLEEP);
309
310 mutex_init(&stream->mtx, NULL, MUTEX_DEFAULT, NULL);
311 cv_init(&stream->cv, NULL, CV_DEFAULT, NULL);
312
313 stream->state = KRRP_STRMS_CREATED;
314
315 return (stream);
316 }
317
318 void
319 krrp_stream_destroy(krrp_stream_t *stream)
320 {
321 krrp_stream_lock(stream);
322
323 stream->state = KRRP_STRMS_STOPPED;
324 while (stream->work_thread != NULL)
325 krrp_stream_cv_wait(stream);
326
327 if (stream->autosnap != NULL)
328 krrp_autosnap_destroy(stream->autosnap);
329
330 if (stream->task_engine != NULL)
331 krrp_stream_te_destroy(stream->task_engine);
332
333 if (stream->resume_info != NULL)
334 fnvlist_free(stream->resume_info);
335
336 krrp_stream_unlock(stream);
337
338 cv_destroy(&stream->cv);
339 mutex_destroy(&stream->mtx);
340
341 kmem_free(stream, sizeof (krrp_stream_t));
342 }
343
344 static boolean_t
345 krrp_stream_check_mem(size_t required_mem, void *void_stream)
346 {
347 krrp_stream_t *stream = void_stream;
348 size_t available_mem =
349 krrp_pdu_engine_get_free_mem(stream->data_pdu_engine);
350
351 if (available_mem < required_mem)
352 return (B_FALSE);
353
354 return (B_TRUE);
355 }
356
357 void
358 krrp_stream_register_callback(krrp_stream_t *stream,
359 krrp_stream_cb_t *ev_cb, void *ev_cb_arg)
360 {
361 VERIFY(ev_cb != NULL);
362
363 krrp_stream_lock(stream);
364 VERIFY(stream->state == KRRP_STRMS_CREATED);
365
366 stream->state = KRRP_STRMS_READY_TO_RUN;
367 stream->callback = ev_cb;
368 stream->callback_arg = ev_cb_arg;
369
370 krrp_stream_unlock(stream);
371 }
372
373 int
374 krrp_stream_run(krrp_stream_t *stream, krrp_queue_t *write_data_queue,
375 krrp_pdu_engine_t *data_pdu_engine, krrp_error_t *error)
376 {
377 int rc = -1;
378
379 VERIFY(data_pdu_engine != NULL);
380 VERIFY(data_pdu_engine->type == KRRP_PET_DATA);
381 VERIFY(write_data_queue != NULL);
382
383 krrp_stream_lock(stream);
384 VERIFY(stream->state == KRRP_STRMS_READY_TO_RUN);
385
386 stream->data_pdu_engine = data_pdu_engine;
387 stream->write_data_queue = write_data_queue;
388
389 if (!stream->fake_mode) {
390 if (krrp_stream_validate_run(stream, error) != 0)
391 goto out;
392
393 if (krrp_stream_activate_autosnap(stream, error) != 0)
394 goto out;
395 }
396
397 /* thread_create never fails */
398 switch (stream->mode) {
399 case KRRP_STRMM_READ:
400 if (stream->fake_mode)
401 krrp_stream_fake_read_task_init(stream->task_engine,
402 stream->fake_data_sz);
403
404 stream->work_thread = thread_create(NULL, 0, &krrp_stream_read,
405 stream, 0, &p0, TS_RUN, minclsyspri);
406 break;
407 case KRRP_STRMM_WRITE:
408 stream->work_thread = thread_create(NULL, 0, &krrp_stream_write,
409 stream, 0, &p0, TS_RUN, minclsyspri);
410 break;
411 }
412
413 while (stream->state == KRRP_STRMS_READY_TO_RUN)
414 krrp_stream_cv_wait(stream);
415
416 rc = 0;
417
418 out:
419 krrp_stream_unlock(stream);
420 return (rc);
421 }
422
423 void
424 krrp_stream_stop(krrp_stream_t *stream)
425 {
426 krrp_stream_lock(stream);
427 VERIFY(stream->state == KRRP_STRMS_ACTIVE ||
428 stream->state == KRRP_STRMS_IN_ERROR);
429
430 stream->state = KRRP_STRMS_STOPPED;
431 krrp_stream_cv_broadcast(stream);
432
433 if (!stream->non_continuous)
434 krrp_autosnap_deactivate(stream->autosnap);
435
436 krrp_stream_unlock(stream);
437 }
438
439 int
440 krrp_stream_send_stop(krrp_stream_t *stream)
441 {
442 int rc = -1;
443
444 ASSERT(stream->mode == KRRP_STRMM_READ);
445 ASSERT(!stream->non_continuous);
446
447 if (stream->notify_txg == UINT64_MAX) {
448 krrp_autosnap_create_snapshot(stream->autosnap);
449
450 /*
451 * To deactivate autosnap-logic need to be sure
452 * that an autosnap has been created
453 *
454 * Autosnap-service may delay creation of snapshot,
455 * so here we may wait for some time (1-2 transactions)
456 */
457 krrp_stream_lock(stream);
458
459 stream->wait_for_snap = B_TRUE;
460 while (stream->wait_for_snap &&
461 stream->state == KRRP_STRMS_ACTIVE)
462 krrp_stream_cv_wait(stream);
463
464 krrp_stream_unlock(stream);
465
466 krrp_autosnap_deactivate(stream->autosnap);
467
468 rc = 0;
469 }
470
471 return (rc);
472 }
473
474 static int
475 krrp_stream_validate_run(krrp_stream_t *stream, krrp_error_t *error)
476 {
477 int rc = -1;
478
479 /*
480 * The SOURCE datasets must exist always
481 *
482 * The DESTINATION dataset may not exist, that is allowed,
483 * but if incr_snap_name is defined therefore this replication
484 * is incremental and destination dataset must exist, exclude
485 * the cases if the destination dataset is a ZFS pool (the name
486 * does not contain '/'). For all other cases the parent of
487 * the destination dataset must exist.
488 */
489 if (dsl_dataset_creation_txg(stream->dataset) == UINT64_MAX) {
490 if (stream->mode == KRRP_STRMM_READ) {
491 krrp_error_set(error, KRRP_ERRNO_SRCDS, ENOENT);
492 goto out;
493 } else {
494 char *p;
495 boolean_t parent_exists = B_TRUE;
496 boolean_t dst_is_pool = B_TRUE;
497
498 p = strrchr(stream->dataset, '/');
499 if (p != NULL) {
500 *p = '\0';
501 parent_exists =
502 dsl_dataset_creation_txg(stream->dataset) !=
503 UINT64_MAX;
504 *p = '/';
505 dst_is_pool = B_FALSE;
506 }
507
508 if (dst_is_pool || !parent_exists ||
509 strlen(stream->incr_snap_name) != 0) {
510 krrp_error_set(error, KRRP_ERRNO_DSTDS, ENOENT);
511 goto out;
512 }
513 }
514 }
515
516 rc = 0;
517 if (stream->mode == KRRP_STRMM_READ && !stream->non_continuous) {
518 rc = wbc_check_dataset(stream->dataset);
519 if (rc == 0 || rc == ENOTACTIVE)
520 rc = 0;
521 else
522 krrp_error_set(error, KRRP_ERRNO_STREAM, rc);
523 }
524
525 out:
526 return (rc);
527 }
528
529 static int
530 krrp_stream_activate_autosnap(krrp_stream_t *stream,
531 krrp_error_t *error)
532 {
533 uint64_t incr_snap_txg = UINT64_MAX;
534 int rc = -1;
535
536 if (strlen(stream->incr_snap_name) != 0) {
537 incr_snap_txg = krrp_stream_get_snap_txg(stream,
538 stream->incr_snap_name);
539 if (incr_snap_txg == UINT64_MAX) {
540 krrp_error_set(error, KRRP_ERRNO_CMNSNAP, ENOENT);
541 goto out;
542 }
543 }
544
545 switch (stream->mode) {
546 case KRRP_STRMM_READ:
547 if (!stream->non_continuous) {
548 rc = krrp_autosnap_activate(stream->autosnap, incr_snap_txg,
549 &krrp_stream_read_snap_confirm_cb,
550 &krrp_stream_read_snap_notify_cb,
551 &krrp_stream_snap_create_error_cb,
552 &krrp_stream_autosnap_restore_cb,
553 stream, error);
554 if (rc != 0)
555 goto out;
556
557 /*
558 * Autosnap does snapshots only in case of I/O
559 * to the dataset, so if user has some data on
560 * the dataset, but does not have I/O the available
561 * data will not be replicated. To exclude this case
562 * need to ask autosnap to create snapshot
563 */
564 krrp_autosnap_create_snapshot(stream->autosnap);
565 } else {
566 uint64_t base_snap_txg;
567
568 VERIFY(stream->base_snap_name[0] != '\0');
569
570 base_snap_txg = krrp_stream_get_snap_txg(stream,
571 stream->base_snap_name);
572 if (base_snap_txg == UINT64_MAX) {
573 krrp_error_set(error,
574 KRRP_ERRNO_SRCSNAP, ENOENT);
575 goto out;
576 }
577
578 krrp_stream_read_task_init(stream->task_engine,
579 base_snap_txg, stream->base_snap_name,
580 stream->incr_snap_name, stream->resume_info);
581
582 /*
583 * Non-continuous replication sends only one snapshot.
584 * We remember TXG of this snapshot and will notify
585 * userspace that the snapshot successfully received
586 */
587 stream->notify_txg = base_snap_txg;
588 }
589
590 break;
591 case KRRP_STRMM_WRITE:
592 rc = krrp_autosnap_activate(stream->autosnap, incr_snap_txg,
593 NULL, &krrp_stream_write_snap_notify_cb,
594 &krrp_stream_snap_create_error_cb, NULL, stream, error);
595 if (rc != 0)
596 goto out;
597
598 break;
599 }
600
601 rc = 0;
602
603 out:
604 return (rc);
605 }
606
607 static void
608 krrp_stream_autosnap_restore_cb(void *void_stream,
609 const char *snap_name, uint64_t txg)
610 {
611 krrp_stream_t *stream = void_stream;
612
613 VERIFY(stream->mode == KRRP_STRMM_READ);
614
615 krrp_stream_read_task_init(stream->task_engine, txg, snap_name,
616 stream->incr_snap_name, NULL);
617
618 (void) strlcpy(stream->incr_snap_name, snap_name,
619 sizeof (stream->incr_snap_name));
620 }
621
/*
 * Disabled debugging aid (not compiled): ignores its arguments and
 * panics the system via cmn_err(CE_PANIC, ...).
 */
#if 0
static void
krrp_stream_debug(const char *msg, void *arg1, void *arg2,
    void *arg3, void *arg4)
{
	cmn_err(CE_PANIC, "Debug");
}
#endif
630
631 /*
632 * This function is called after a TXG confirmation
633 * has been received from the receiver
634 *
635 * There are two stages of receiving:
636 * - complete recv into krrp-buffers (complete == B_FALSE)
637 * - complete recv into ZFS (complete == B_TRUE)
638 */
639 void
640 krrp_stream_txg_confirmed(krrp_stream_t *stream, uint64_t txg,
641 boolean_t complete)
642 {
643 krrp_stream_task_t *task;
644
645 if (complete) {
646 DTRACE_PROBE1(krrp_txg_ack2, uint64_t, txg);
647
648 /* autosnap is used only by CDP sender */
649 if (!stream->non_continuous)
650 krrp_autosnap_txg_rele(stream->autosnap,
651 txg, AUTOSNAP_NO_SNAP);
652
653 stream->last_full_ack_txg = txg;
654
655 task = krrp_queue_get(stream->task_engine->tasks_done2);
656 krrp_stream_calc_avg_rpo(stream, task, B_TRUE);
657 krrp_stream_task_fini(task);
658
659 if (stream->notify_txg == txg) {
660 krrp_stream_callback(stream,
661 KRRP_STREAM_SEND_DONE, NULL);
662 }
663
664 return;
665 }
666
667 stream->last_ack_txg = txg;
668
669 task = krrp_queue_get(stream->task_engine->tasks_done);
670 ASSERT(task != NULL);
671
672 krrp_stream_calc_avg_rpo(stream, task, B_FALSE);
673 krrp_queue_put(stream->task_engine->tasks_done2, task);
674 }
675
676 static void
677 krrp_stream_calc_avg_rpo(krrp_stream_t *stream, krrp_stream_task_t *task,
678 boolean_t complete)
679 {
680 size_t i, avg_cnt;
681 uint64_t sum;
682 krrp_txg_rpo_t *avg_rpo;
683
684 if (complete)
685 avg_rpo = &stream->avg_total_rpo;
686 else
687 avg_rpo = &stream->avg_rpo;
688
689 avg_rpo->buf[avg_rpo->cnt] = krrp_stream_task_calc_rpo(task);
690 avg_rpo->cnt++;
691 avg_rpo->cnt %= 10;
692
693 sum = 0;
694 avg_cnt = 10;
695 for (i = 0; i < 10; i++) {
696 sum += avg_rpo->buf[i];
697 if (avg_rpo->buf[i] == 0)
698 avg_cnt--;
699 }
700
701 /* Average value in ms */
702 avg_rpo->value = sum / avg_cnt / 1000 / 1000;
703 }
704
705 /* ARGSUSED */
706 static void
707 krrp_stream_snap_create_error_cb(const char *snap_name, int err,
708 uint64_t txg, void *void_stream)
709 {
710 krrp_stream_t *stream;
711 krrp_error_t error;
712 boolean_t just_return = B_FALSE;
713
714 stream = void_stream;
715
716 krrp_stream_lock(stream);
717
718 if (stream->state == KRRP_STRMS_ACTIVE)
719 stream->state = KRRP_STRMS_IN_ERROR;
720 else
721 just_return = B_TRUE;
722
723 if (stream->wait_for_snap) {
724 stream->wait_for_snap = B_FALSE;
725 krrp_stream_cv_signal(stream);
726 }
727
728 krrp_stream_unlock(stream);
729
730 if (just_return)
731 return;
732
733 krrp_error_set(&error, KRRP_ERRNO_SNAPFAIL, err);
734 krrp_stream_callback(stream, KRRP_STREAM_ERROR,
735 (uintptr_t)&error);
736 }
737
738 /* ARGSUSED */
739 static boolean_t
740 krrp_stream_read_snap_confirm_cb(const char *snap_name, boolean_t recursive,
741 uint64_t txg, void *void_stream)
742 {
743 krrp_stream_t *stream = void_stream;
744 boolean_t result = B_FALSE;
745 size_t krrp_max_num_read_tasks =
746 stream->keep_snaps + krrp_add_num_read_tasks;
747
748 if (krrp_autosnap_try_hold_to_confirm(stream->autosnap)) {
749 size_t tasks =
750 krrp_stream_te_total_num_tasks(stream->task_engine);
751 if (tasks < krrp_max_num_read_tasks || stream->wait_for_snap)
752 result = B_TRUE;
753 else
754 result = B_FALSE;
755
756 krrp_autosnap_unhold(stream->autosnap);
757 }
758
759 return (result);
760 }
761
762 /*
763 * Autosnap snap_created callback for the Receiver
764 */
765 /* ARGSUSED */
766 static boolean_t
767 krrp_stream_write_snap_notify_cb(const char *snap_name, boolean_t recursive,
768 boolean_t autosnap, uint64_t txg, uint64_t unused, void *void_stream)
769 {
770 krrp_stream_t *stream;
771
772 stream = void_stream;
773
774 krrp_stream_lock(stream);
775
776 if (stream->cur_task == NULL) {
777 krrp_stream_unlock(stream);
778 return (B_FALSE);
779 }
780
781 if (stream->cur_task->txg_start == UINT64_MAX) {
782 stream->cur_task->txg_start = txg;
783 stream->cur_task->txg_end = AUTOSNAP_NO_SNAP;
784 } else {
785 stream->cur_task->txg_end = txg;
786 }
787
788 krrp_stream_unlock(stream);
789
790 return (B_TRUE);
791 }
792
793 /*
794 * Autosnap snap_created callback for the Sender
795 */
796 /* ARGSUSED */
797 static boolean_t
798 krrp_stream_read_snap_notify_cb(const char *snap_name, boolean_t recursive,
799 boolean_t autosnap, uint64_t txg, uint64_t unused, void *void_stream)
800 {
801 krrp_stream_t *stream = void_stream;
802 boolean_t result = B_FALSE;
803 size_t krrp_max_num_read_tasks =
804 stream->keep_snaps + krrp_add_num_read_tasks;
805
806 if (krrp_autosnap_try_hold_to_confirm(stream->autosnap)) {
807 size_t tasks =
808 krrp_stream_te_total_num_tasks(stream->task_engine);
809 if (tasks < krrp_max_num_read_tasks || stream->wait_for_snap) {
810 uint64_t cur_snap_txg;
811 char *cur_snap_name;
812
813 cur_snap_name = strchr(snap_name, '@');
814 ASSERT(cur_snap_name != NULL);
815 cur_snap_name++;
816 cur_snap_txg = txg;
817
818 krrp_stream_read_task_init(stream->task_engine,
819 cur_snap_txg, cur_snap_name,
820 stream->incr_snap_name, NULL);
821
822 (void) strlcpy(stream->incr_snap_name, cur_snap_name,
823 sizeof (stream->incr_snap_name));
824
825 result = B_TRUE;
826
827 if (stream->wait_for_snap) {
828 stream->wait_for_snap = B_FALSE;
829
830 /*
831 * This snapshot is the last that we will send.
832 * We remember its TXG and will notify userspace
833 * that the snapshot successfully received
834 */
835 stream->notify_txg = txg;
836
837 krrp_stream_lock(stream);
838 krrp_stream_cv_signal(stream);
839 krrp_stream_unlock(stream);
840 }
841 }
842
843 krrp_autosnap_unhold(stream->autosnap);
844 }
845
846 return (result);
847 }
848
/*
 * The handler for READ STREAM
 *
 * Stream tasks are created by Autosnaper and pushed to tasks-queue
 *
 * Worker thread started by krrp_stream_run().  It switches the stream to
 * KRRP_STRMS_ACTIVE, wakes krrp_stream_run(), and then, while the stream
 * stays ACTIVE:
 *  - allocates a data PDU (retrying on the next iteration if none is
 *    returned);
 *  - takes the next task from the task engine (retrying if none);
 *  - lets the task fill the PDU through its process() method;
 *  - on a process() error, releases the PDU and, if the stream is still
 *    ACTIVE, moves it to KRRP_STRMS_IN_ERROR and reports
 *    KRRP_ERRNO_READFAIL through the stream callback, then leaves;
 *  - otherwise hands the PDU to the callback as KRRP_STREAM_DATA_PDU.
 *
 * On exit it releases any PDU and unfinished task it still holds, clears
 * work_thread and broadcasts (krrp_stream_destroy() waits for that).
 */
static void
krrp_stream_read(void *arg)
{
	krrp_stream_t *stream = arg;

	krrp_stream_task_t *stream_task = NULL;
	int rc;
	krrp_pdu_data_t *pdu = NULL;

	VERIFY(stream->data_pdu_engine != NULL);

	/* Tell krrp_stream_run() that the worker is up */
	krrp_stream_lock(stream);
	stream->state = KRRP_STRMS_ACTIVE;
	krrp_stream_cv_signal(stream);

	/* The state is re-checked with the stream lock held every pass */
	while (stream->state == KRRP_STRMS_ACTIVE) {
		krrp_stream_unlock(stream);

		/* A PDU is kept across passes until it is handed off */
		if (pdu == NULL) {
			DTRACE_PROBE(krrp_pdu_data_alloc_start);

			krrp_pdu_alloc(stream->data_pdu_engine,
			    (krrp_pdu_t **)&pdu, KRRP_PDU_WITH_HDR);

			DTRACE_PROBE(krrp_pdu_data_alloc_stop);

			if (pdu == NULL) {
				krrp_stream_lock(stream);
				continue;
			}
		}

		if (stream_task == NULL) {
			krrp_stream_task_engine_get_task(stream->task_engine,
			    &stream_task);
			if (stream_task == NULL) {
				krrp_stream_lock(stream);
				continue;
			}

			stream->cur_task = stream_task;
			stream->cur_send_txg = stream_task->txg;
			stream->cur_pdu = pdu;

			stream_task->start(stream_task);

			/* The first PDU of every task is marked initial */
			pdu->initial = B_TRUE;

			DTRACE_PROBE1(krrp_stream_task_io_start, uint64_t,
			    stream_task->txg);
		}

		rc = stream_task->process(stream_task, pdu);

		if (rc != 0) {
			krrp_pdu_rele((krrp_pdu_t *)pdu);
			pdu = NULL;
			stream->cur_pdu = NULL;

			krrp_stream_lock(stream);
			/* Report only if nobody changed the state first */
			if (stream->state == KRRP_STRMS_ACTIVE) {
				krrp_error_t error;

				stream->state = KRRP_STRMS_IN_ERROR;
				krrp_stream_unlock(stream);

				krrp_error_set(&error, KRRP_ERRNO_READFAIL, rc);
				krrp_stream_callback(stream, KRRP_STREAM_ERROR,
				    (uintptr_t)&error);

				krrp_stream_lock(stream);
			}

			/* Leave the loop with the stream lock held */
			break;
		}

		if (stream_task->done) {
			DTRACE_PROBE1(krrp_stream_task_io_stop, uint64_t,
			    stream_task->txg);

			krrp_stream_task_done(stream, stream_task, B_FALSE);
			stream_task = NULL;
			stream->cur_task = NULL;

			stream->last_send_txg = stream->cur_send_txg;
			stream->cur_send_txg = 0;
		}

		stream->bytes_processed += pdu->cur_data_sz;
		/*
		 * NOTE(review): the PDU is not released here, so the
		 * callback presumably takes ownership of it; confirm.
		 */
		krrp_stream_callback(stream, KRRP_STREAM_DATA_PDU,
		    (uintptr_t)pdu);

		pdu = NULL;
		stream->cur_pdu = NULL;
		krrp_stream_lock(stream);
	} /* while() loop */

	stream->cur_task = NULL;
	krrp_stream_unlock(stream);

	/* Drop whatever was still in flight when the loop ended */
	if (pdu != NULL)
		krrp_pdu_rele((krrp_pdu_t *)pdu);

	if (stream_task != NULL)
		krrp_stream_task_done(stream, stream_task, B_TRUE);

	krrp_stream_lock(stream);

	/* krrp_stream_destroy() waits for work_thread to become NULL */
	stream->work_thread = NULL;

	krrp_stream_cv_broadcast(stream);
	krrp_stream_unlock(stream);
	thread_exit();
}
968
969 /*
970 * The handler for WRITE STREAM
971 *
972 * Stream tasks are created on intial PDU
973 */
974 static void
975 krrp_stream_write(void *arg)
976 {
977 krrp_stream_t *stream = arg;
978
979 krrp_pdu_data_t *pdu = NULL;
980 krrp_stream_task_t *stream_task = NULL;
981 int rc;
982
983 VERIFY(stream->write_data_queue != NULL);
984
985 #ifdef KRRP_STREAM_DEBUG
986 krrp_queue_init(&stream->debug_pdu_queue, sizeof (krrp_pdu_t),
987 offsetof(krrp_pdu_t, node));
988 #endif
989
990 krrp_stream_lock(stream);
991 stream->state = KRRP_STRMS_ACTIVE;
992 krrp_stream_cv_signal(stream);
993
994 while (stream->state == KRRP_STRMS_ACTIVE) {
995 krrp_stream_unlock(stream);
996
997 if (pdu == NULL) {
998 pdu = krrp_queue_get(stream->write_data_queue);
999 if (pdu == NULL) {
1000 krrp_stream_lock(stream);
1001 continue;
1002 }
1003 }
1004
1005 if (pdu->initial) {
1006 /* Replace by ASSERT */
1007 VERIFY(stream_task == NULL);
1008
1009 krrp_stream_write_task_init(stream->task_engine,
1010 pdu->txg, &stream_task, stream->resume_info);
1011
1012 /*
1013 * Cookies are required only for
1014 * the first received task
1015 */
1016 if (stream->resume_info != NULL) {
1017 fnvlist_free(stream->resume_info);
1018 stream->resume_info = NULL;
1019 }
1020
1021 stream->cur_task = stream_task;
1022 stream->cur_recv_txg = pdu->txg;
1023
1024 DTRACE_PROBE1(krrp_stream_task_io_start, uint64_t,
1025 stream_task->txg);
1026 }
1027
1028 stream->cur_pdu = pdu;
1029
1030 rc = stream_task->process(stream_task, pdu);
1031 if (rc != 0) {
1032 krrp_stream_lock(stream);
1033 if (stream->state == KRRP_STRMS_ACTIVE) {
1034 krrp_error_t error;
1035
1036 stream->state = KRRP_STRMS_IN_ERROR;
1037 krrp_stream_unlock(stream);
1038
1039 krrp_error_set(&error,
1040 KRRP_ERRNO_WRITEFAIL, rc);
1041 krrp_stream_callback(stream, KRRP_STREAM_ERROR,
1042 (uintptr_t)&error);
1043 krrp_stream_lock(stream);
1044 }
1045
1046 break;
1047 }
1048
1049 stream->bytes_processed += pdu->cur_data_sz;
1050 if (stream_task->done) {
1051 VERIFY(pdu->final == B_TRUE);
1052
1053 DTRACE_PROBE1(krrp_stream_task_io_stop, uint64_t,
1054 stream_task->txg);
1055
1056 krrp_stream_task_done(stream, stream_task, B_FALSE);
1057 stream_task = NULL;
1058 stream->cur_task = NULL;
1059 stream->cur_recv_txg = 0;
1060 }
1061
1062 #ifdef KRRP_STREAM_DEBUG
1063 krrp_queue_put(stream->debug_pdu_queue, pdu);
1064
1065 if (krrp_queue_length(stream->debug_pdu_queue) > 1) {
1066 pdu = krrp_queue_get(stream->debug_pdu_queue);
1067 krrp_pdu_rele((krrp_pdu_t *)pdu);
1068 }
1069 #else
1070 krrp_pdu_rele((krrp_pdu_t *)pdu);
1071 #endif
1072
1073 pdu = NULL;
1074 krrp_stream_lock(stream);
1075 } /* while() loop */
1076
1077 stream->cur_task = NULL;
1078 krrp_stream_unlock(stream);
1079
1080 if (pdu != NULL)
1081 krrp_pdu_rele((krrp_pdu_t *)pdu);
1082
1083 #ifdef KRRP_STREAM_DEBUG
1084 /* Simple get, because this queue without locks */
1085 while ((pdu = krrp_queue_get(stream->debug_pdu_queue)) != NULL)
1086 krrp_pdu_rele((krrp_pdu_t *)pdu);
1087
1088 krrp_queue_fini(stream->debug_pdu_queue);
1089 #endif
1090
1091 if (stream_task != NULL)
1092 krrp_stream_task_done(stream, stream_task, B_TRUE);
1093
1094 krrp_stream_lock(stream);
1095
1096 stream->work_thread = NULL;
1097
1098 krrp_stream_cv_broadcast(stream);
1099 krrp_stream_unlock(stream);
1100 thread_exit();
1101 }
1102
1103 static void
1104 krrp_stream_task_done(krrp_stream_t *stream,
1105 krrp_stream_task_t *task, boolean_t only_fini)
1106 {
1107 task->shutdown(task);
1108
1109 if (only_fini) {
1110 krrp_stream_task_fini(task);
1111 return;
1112 }
1113
1114 switch (stream->mode) {
1115 case KRRP_STRMM_READ:
1116 if (krrp_stream_te_num_pending_tasks(stream->task_engine) == 0) {
1117 if (stream->do_ctrl_snap) {
1118 krrp_autosnap_create_snapshot(stream->autosnap);
1119 stream->do_ctrl_snap = B_FALSE;
1120 }
1121 } else {
1122 stream->do_ctrl_snap = B_TRUE;
1123 }
1124
1125 krrp_queue_put(stream->task_engine->tasks_done, task);
1126 break;
1127 case KRRP_STRMM_WRITE:
1128 krrp_autosnap_txg_rele(stream->autosnap,
1129 task->txg_start, task->txg_end);
1130
1131 krrp_stream_callback(stream, KRRP_STREAM_TXG_RECV_DONE,
1132 (uintptr_t)task->txg);
1133 krrp_stream_task_fini(task);
1134 break;
1135 }
1136 }
1137
1138 static void
1139 krrp_stream_callback(krrp_stream_t *stream,
1140 krrp_stream_cb_ev_t ev, uintptr_t ev_arg)
1141 {
1142 stream->callback(ev, ev_arg, stream->callback_arg);
1143 }
1144
1145 static uint64_t
1146 krrp_stream_get_snap_txg(krrp_stream_t *stream,
1147 const char *short_snap_name)
1148 {
1149 char full_ds_name[MAXNAMELEN];
1150
1151 (void) snprintf(full_ds_name, sizeof (full_ds_name), "%s@%s",
1152 stream->dataset, short_snap_name);
1153
1154 return (dsl_dataset_creation_txg(full_ds_name));
1155 }
1156
1157 boolean_t
1158 krrp_stream_is_write_flag_set(krrp_stream_write_flag_t flags,
1159 krrp_stream_write_flag_t flag)
1160 {
1161 return ((flags & flag) != 0);
1162 }
1163
1164 void
1165 krrp_stream_set_write_flag(krrp_stream_write_flag_t *flags,
1166 krrp_stream_write_flag_t flag)
1167 {
1168 *flags |= flag;
1169 }
1170
1171 boolean_t
1172 krrp_stream_is_read_flag_set(krrp_stream_read_flag_t flags,
1173 krrp_stream_read_flag_t flag)
1174 {
1175 return ((flags & flag) != 0);
1176 }
1177
1178 void
1179 krrp_stream_set_read_flag(krrp_stream_read_flag_t *flags,
1180 krrp_stream_read_flag_t flag)
1181 {
1182 *flags |= flag;
1183 }