1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2018 Nexenta Systems, Inc. All rights reserved.
14 */
15
16 /*
17 * Stream-tasks module.
18 * One stream-task is one send/recv operation.
19 */
20
21 #include <sys/types.h>
22 #include <sys/kmem.h>
23 #include <sys/ddi.h>
24 #include <sys/sunddi.h>
25 #include <sys/sunldi.h>
26 #include <sys/time.h>
27 #include <sys/strsubr.h>
28 #include <sys/sysmacros.h>
29 #include <sys/socketvar.h>
30 #include <sys/ksocket.h>
31 #include <sys/filio.h>
32 #include <sys/modctl.h>
33 #include <sys/class.h>
34 #include <sys/cmn_err.h>
35 #include <inet/ip.h>
36
37 #include <sys/dmu_impl.h>
38
39 #include <sys/kreplication_common.h>
40
41 #include "krrp_stream_task.h"
42
43 #define KRRP_FAKE_TXG 0x9BC1546914997F6C
44 #define USEC2NSEC(m) ((hrtime_t)(m) * (NANOSEC / MICROSEC))
45
46 /* #define KRRP_STREAM_TASK_DEBUG 1 */
47
48 extern boolean_t krrp_stream_is_write_flag_set(krrp_stream_write_flag_t flags,
49 krrp_stream_write_flag_t flag);
50 extern boolean_t krrp_stream_is_read_flag_set(krrp_stream_read_flag_t flags,
51 krrp_stream_read_flag_t flag);
52
53 static int krrp_stream_te_common_create(krrp_stream_te_t **result_te,
54 const char *dataset, boolean_t read_mode, krrp_error_t *error);
55
56 static int krrp_stream_task_constructor(void *, void *, int);
57 static void krrp_stream_task_destructor(void *, void *);
58
59 static krrp_stream_task_shandler_t krrp_stream_task_read_start;
60 static krrp_stream_task_shandler_t krrp_stream_task_common_stop;
61
62 static krrp_stream_task_shandler_t krrp_stream_task_fake_common_action;
63
64 static krrp_stream_task_handler_t krrp_stream_task_fake_write_handler;
65 static krrp_stream_task_handler_t krrp_stream_task_write_handler;
66 static krrp_stream_task_handler_t krrp_stream_task_fake_read_handler;
67 static krrp_stream_task_handler_t krrp_stream_task_read_handler;
68
69 static void krrp_stream_task_correct_total_size(krrp_dblk_t *, size_t *);
70
71 static void krrp_stream_task_fake_rate_limit(krrp_stream_task_t *,
72 hrtime_t, uint32_t);
73
74 uint32_t krrp_stream_fake_rate_limit_mb = 0;
75
76 int
77 krrp_stream_te_read_create(krrp_stream_te_t **result_te,
78 const char *dataset, krrp_stream_read_flag_t flags,
79 krrp_check_enough_mem *mem_check_cb, void *mem_check_cb_arg,
80 const char *skip_snaps_mask, krrp_error_t *error)
81 {
82 int rc;
83 krrp_stream_te_t *task_engine;
84
85 ASSERT(dataset != NULL);
86
87 rc = krrp_stream_te_common_create(result_te,
88 dataset, B_TRUE, error);
89 if (rc != 0)
90 return (-1);
91
92 task_engine = *result_te;
93
94 if (skip_snaps_mask != NULL) {
95 char buf[ZAP_MAXNAMELEN + ZAP_MAXVALUELEN + 1];
96 char *eq_sym;
97
98 if (strlcpy(buf, skip_snaps_mask,
99 sizeof (buf)) >= sizeof (buf)) {
100 krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, EMSGSIZE);
101 return (-1);
102 }
103
104 eq_sym = strchr(buf, '=');
105 if (eq_sym != NULL) {
106 *eq_sym = '\0';
107 eq_sym++;
108 }
109
110 if (eq_sym == NULL || *eq_sym == '\0') {
111 krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, EINVAL);
112 return (-1);
113 }
114
115 if (strlcpy(task_engine->skip_snaps_prop_name, buf,
116 sizeof (task_engine->skip_snaps_prop_name)) >=
117 sizeof (task_engine->skip_snaps_prop_name)) {
118 krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, ENAMETOOLONG);
119 return (-1);
120 }
121
122 if (strlcpy(task_engine->skip_snaps_prop_val, eq_sym,
123 sizeof (task_engine->skip_snaps_prop_val)) >=
124 sizeof (task_engine->skip_snaps_prop_val)) {
125 krrp_error_set(error, KRRP_ERRNO_SKIP_SNAPS_MASK, E2BIG);
126 return (-1);
127 }
128 }
129
130 task_engine->recursive =
131 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_RECURSIVE);
132 task_engine->properties =
133 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_SEND_PROPS);
134 task_engine->enable_cksum =
135 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_ENABLE_CHKSUM);
136 task_engine->embedded =
137 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_EMBEDDED);
138 task_engine->compressed =
139 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_COMPRESSED);
140 task_engine->large_blocks =
141 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_LARGE_BLOCKS);
142 task_engine->incremental_package =
143 krrp_stream_is_read_flag_set(flags, KRRP_STRMRF_SEND_ALL_SNAPS);
144
145 task_engine->mem_check_cb = mem_check_cb;
146 task_engine->mem_check_cb_arg = mem_check_cb_arg;
147
148 return (0);
149 }
150
151 int
152 krrp_stream_te_write_create(krrp_stream_te_t **result_te,
153 const char *dataset, krrp_stream_write_flag_t flags,
154 nvlist_t *ignore_props_list, nvlist_t *replace_props_list,
155 krrp_error_t *error)
156 {
157 int rc;
158 krrp_stream_te_t *task_engine;
159
160 ASSERT(dataset != NULL);
161
162 rc = krrp_stream_te_common_create(result_te,
163 dataset, B_FALSE, error);
164 if (rc != 0)
165 return (-1);
166
167 task_engine = *result_te;
168
169 task_engine->force_receive =
170 krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_FORCE_RECV);
171 task_engine->enable_cksum =
172 krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_ENABLE_CHKSUM);
173 task_engine->discard_head =
174 krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_DISCARD_HEAD);
175 task_engine->leave_tail =
176 krrp_stream_is_write_flag_set(flags, KRRP_STRMWF_LEAVE_TAIL);
177
178 /*
179 * Need to dup the nvls because they are part of another nvl,
180 * that will be destroyed
181 */
182 if (ignore_props_list != NULL) {
183 task_engine->ignore_props_list =
184 fnvlist_dup(ignore_props_list);
185 }
186
187 if (replace_props_list != NULL) {
188 task_engine->replace_props_list =
189 fnvlist_dup(replace_props_list);
190 }
191
192 return (0);
193 }
194
195 int
196 krrp_stream_te_fake_read_create(krrp_stream_te_t **result_te,
197 krrp_error_t *error)
198 {
199 return (krrp_stream_te_common_create(result_te,
200 NULL, B_TRUE, error));
201 }
202
203 int
204 krrp_stream_te_fake_write_create(krrp_stream_te_t **result_te,
205 krrp_error_t *error)
206 {
207 return (krrp_stream_te_common_create(result_te,
208 NULL, B_FALSE, error));
209 }
210
211 void
212 krrp_stream_te_destroy(krrp_stream_te_t *task_engine)
213 {
214 krrp_stream_task_t *task;
215
216 while ((task = krrp_queue_get_no_wait(task_engine->tasks)) != NULL)
217 krrp_stream_task_fini(task);
218
219 krrp_queue_fini(task_engine->tasks);
220
221 if (task_engine->mode == KRRP_STEM_READ) {
222 while ((task = krrp_queue_get_no_wait(
223 task_engine->tasks_done)) != NULL)
224 krrp_stream_task_fini(task);
225
226 krrp_queue_fini(task_engine->tasks_done);
227
228 while ((task = krrp_queue_get_no_wait(
229 task_engine->tasks_done2)) != NULL)
230 krrp_stream_task_fini(task);
231
232 krrp_queue_fini(task_engine->tasks_done2);
233 }
234
235 kmem_cache_destroy(task_engine->tasks_cache);
236
237 if (task_engine->global_zfs_ctx != NULL) {
238 dmu_krrp_stream_fini(task_engine->global_zfs_ctx);
239 task_engine->global_zfs_ctx = NULL;
240 }
241
242 if (task_engine->ignore_props_list != NULL)
243 fnvlist_free(task_engine->ignore_props_list);
244
245 if (task_engine->replace_props_list != NULL)
246 fnvlist_free(task_engine->replace_props_list);
247
248 kmem_free(task_engine, sizeof (krrp_stream_te_t));
249 }
250
251 static int
252 krrp_stream_te_common_create(krrp_stream_te_t **result_te,
253 const char *dataset, boolean_t read_mode, krrp_error_t *error)
254 {
255 krrp_stream_te_t *task_engine;
256 char kmem_cache_name[KSTAT_STRLEN];
257
258 ASSERT(result_te != NULL && *result_te == NULL);
259
260 task_engine = kmem_zalloc(sizeof (krrp_stream_te_t), KM_SLEEP);
261
262 if (dataset != NULL) {
263 task_engine->global_zfs_ctx = dmu_krrp_stream_init();
264 if (task_engine->global_zfs_ctx == NULL) {
265 kmem_free(task_engine, sizeof (krrp_stream_te_t));
266 krrp_error_set(error, KRRP_ERRNO_ZFSGCTXFAIL, 0);
267 return (-1);
268 }
269
270 task_engine->dataset = dataset;
271 } else
272 task_engine->fake_mode = B_TRUE;
273
274 krrp_queue_init(&task_engine->tasks, sizeof (krrp_stream_task_t),
275 offsetof(krrp_stream_task_t, node));
276
277 (void) snprintf(kmem_cache_name, KSTAT_STRLEN,
278 "krrp_stc_%p", (void *)task_engine);
279
280 task_engine->tasks_cache = kmem_cache_create(kmem_cache_name,
281 sizeof (krrp_stream_task_t), 0, &krrp_stream_task_constructor,
282 &krrp_stream_task_destructor, NULL, (void *)task_engine,
283 NULL, KM_SLEEP);
284
285 if (read_mode) {
286 task_engine->mode = KRRP_STEM_READ;
287 /*
288 * This queue contains tasks, that were completely read
289 * from ZFS and are being sent to the receiver
290 */
291 krrp_queue_init(&task_engine->tasks_done,
292 sizeof (krrp_stream_task_t),
293 offsetof(krrp_stream_task_t, node));
294 /*
295 * This queue contains tasks/threads waiting to receive
296 * acknowledgement from the receiver that the sent blocks
297 * have been written to the disk.
298 */
299 krrp_queue_init(&task_engine->tasks_done2,
300 sizeof (krrp_stream_task_t),
301 offsetof(krrp_stream_task_t, node));
302 } else
303 task_engine->mode = KRRP_STEM_WRITE;
304
305 *result_te = task_engine;
306
307 return (0);
308 }
309
310 /*
311 * Total number of task is the sum of tasks that are:
312 * waiting for write to ZFS on the receiver (task_engine->tasks_done2)
313 *
314 * waiting for read from ZFS or in process of being sent to the receiver
315 * task_engine->tasks + task_engine->tasks_done
316 */
317 size_t
318 krrp_stream_te_total_num_tasks(krrp_stream_te_t *task_engine)
319 {
320 return (krrp_queue_length(task_engine->tasks) +
321 krrp_queue_length(task_engine->tasks_done) +
322 krrp_queue_length(task_engine->tasks_done2));
323 }
324
325 /*
326 * Pending tasks are the tasks, that are waiting for read from ZFS
327 */
328 size_t
329 krrp_stream_te_num_pending_tasks(krrp_stream_te_t *task_engine)
330 {
331 return (krrp_queue_length(task_engine->tasks));
332 }
333
334 void
335 krrp_stream_read_task_init(krrp_stream_te_t *task_engine, uint64_t txg,
336 const char *src_snap, const char *src_inc_snap, nvlist_t *resume_info)
337 {
338 ASSERT(task_engine->mode == KRRP_STEM_READ);
339
340 ASSERT(src_snap != NULL && strlen(src_snap) != 0);
341
342 krrp_stream_task_t *task;
343
344 task = kmem_cache_alloc(task_engine->tasks_cache, KM_SLEEP);
345
346 task->txg = txg;
347
348 (void) strlcpy(task->zargs.from_snap, src_snap,
349 sizeof (task->zargs.from_snap));
350 if (src_inc_snap != NULL)
351 (void) strlcpy(task->zargs.from_incr_base, src_inc_snap,
352 sizeof (task->zargs.from_incr_base));
353 else
354 task->zargs.from_incr_base[0] = '\0';
355
356 task->zargs.resume_info =
357 resume_info != NULL ? fnvlist_dup(resume_info) : NULL;
358
359 task->init_hrtime = gethrtime();
360
361 krrp_queue_put(task_engine->tasks, task);
362 }
363
364 void
365 krrp_stream_fake_read_task_init(krrp_stream_te_t *task_engine,
366 uint64_t fake_data_sz)
367 {
368 ASSERT(task_engine->mode == KRRP_STEM_READ);
369 ASSERT(fake_data_sz != 0);
370
371 krrp_stream_task_t *task;
372
373 task = kmem_cache_alloc(task_engine->tasks_cache, KM_SLEEP);
374
375 task->txg = KRRP_FAKE_TXG;
376 task->fake_data_sz = fake_data_sz;
377 task->init_hrtime = gethrtime();
378
379 krrp_queue_put(task_engine->tasks, task);
380 }
381
382 void
383 krrp_stream_write_task_init(krrp_stream_te_t *task_engine, uint64_t txg,
384 krrp_stream_task_t **result_task, nvlist_t *resume_info)
385 {
386 ASSERT(task_engine->mode == KRRP_STEM_WRITE);
387
388 krrp_stream_task_t *task;
389
390 task = kmem_cache_alloc(task_engine->tasks_cache, KM_SLEEP);
391
392 task->txg = txg;
393
394 task->txg_start = UINT64_MAX;
395 task->txg_end = UINT64_MAX;
396
397 task->zargs.resume_info =
398 resume_info != NULL ? fnvlist_dup(resume_info) : NULL;
399
400 if (!task_engine->fake_mode)
401 task->zfs_ctx = dmu_krrp_init_recv_task(&task->zargs);
402
403 *result_task = task;
404 }
405
406 hrtime_t
407 krrp_stream_task_calc_rpo(krrp_stream_task_t *task)
408 {
409 ASSERT(task->engine->mode == KRRP_STEM_READ);
410
411 return (gethrtime() - task->init_hrtime);
412 }
413
414 void
415 krrp_stream_task_fini(krrp_stream_task_t *task)
416 {
417 krrp_stream_te_t *task_engine = task->engine;
418
419 task->txg = 0;
420 task->zfs_ctx = NULL;
421 task->done = B_FALSE;
422
423 kmem_cache_free(task_engine->tasks_cache, task);
424 }
425
426 void
427 krrp_stream_task_engine_get_task(krrp_stream_te_t *task_engine,
428 krrp_stream_task_t **result_stream_task)
429 {
430 *result_stream_task = krrp_queue_get(task_engine->tasks);
431 }
432
433 /* ARGSUSED */
434 static int
435 krrp_stream_task_constructor(void *opaque_task,
436 void *opaque_task_engine, int km_flags)
437 {
438 krrp_stream_task_t *task = opaque_task;
439 krrp_stream_te_t *task_engine = opaque_task_engine;
440
441 bzero(&task->zargs, sizeof (kreplication_zfs_args_t));
442
443 task->zargs.stream_handler = task_engine->global_zfs_ctx;
444 task->zargs.force_cksum = task_engine->enable_cksum;
445
446 switch (task_engine->mode) {
447 case KRRP_STEM_READ:
448 if (task_engine->fake_mode) {
449 task->process = &krrp_stream_task_fake_read_handler;
450 task->start = &krrp_stream_task_fake_common_action;
451 task->shutdown = &krrp_stream_task_fake_common_action;
452 } else {
453 (void) strlcpy(task->zargs.from_ds,
454 task_engine->dataset,
455 sizeof (task->zargs.from_ds));
456
457 task->process = &krrp_stream_task_read_handler;
458 task->start = &krrp_stream_task_read_start;
459 task->shutdown = &krrp_stream_task_common_stop;
460
461 task->zargs.skip_snaps_prop_name =
462 task_engine->skip_snaps_prop_name;
463 task->zargs.skip_snaps_prop_val =
464 task_engine->skip_snaps_prop_val;
465
466 task->zargs.mem_check_cb =
467 task_engine->mem_check_cb;
468 task->zargs.mem_check_cb_arg =
469 task_engine->mem_check_cb_arg;
470 }
471
472 task->zargs.do_all = task_engine->incremental_package;
473 task->zargs.properties = task_engine->properties;
474 task->zargs.recursive = task_engine->recursive;
475 task->zargs.embedok = task_engine->embedded;
476 task->zargs.compressok = task_engine->compressed;
477 task->zargs.large_block_ok = task_engine->large_blocks;
478
479 break;
480 case KRRP_STEM_WRITE:
481 if (task_engine->fake_mode) {
482 task->process = &krrp_stream_task_fake_write_handler;
483 task->shutdown = &krrp_stream_task_fake_common_action;
484 } else {
485 (void) strlcpy(task->zargs.to_ds, task_engine->dataset,
486 sizeof (task->zargs.to_ds));
487
488 task->process = &krrp_stream_task_write_handler;
489 task->shutdown = &krrp_stream_task_common_stop;
490 }
491
492 task->zargs.force = task_engine->force_receive;
493 task->zargs.ignore_list = task_engine->ignore_props_list;
494 task->zargs.replace_list = task_engine->replace_props_list;
495 task->zargs.strip_head = task_engine->discard_head;
496 task->zargs.leave_tail = task_engine->leave_tail;
497
498 break;
499 default:
500 VERIFY(0);
501 break;
502 }
503
504 if (task_engine->fake_mode) {
505 mutex_init(&task->mtx, NULL, MUTEX_DEFAULT, NULL);
506 cv_init(&task->cv, NULL, CV_DEFAULT, NULL);
507 }
508
509 task->engine = task_engine;
510
511 task->txg = 0;
512 task->zfs_ctx = NULL;
513 task->done = B_FALSE;
514
515 return (0);
516 }
517
518 static void
519 krrp_stream_task_destructor(void *opaque_task,
520 void *opaque_task_engine)
521 {
522 krrp_stream_task_t *task = opaque_task;
523 krrp_stream_te_t *task_engine = opaque_task_engine;
524
525 if (task_engine->fake_mode) {
526 cv_destroy(&task->cv);
527 mutex_destroy(&task->mtx);
528 }
529 }
530
531 static int
532 krrp_stream_task_fake_write_handler(krrp_stream_task_t *task,
533 krrp_pdu_data_t *pdu)
534 {
535 krrp_stream_task_fake_rate_limit(task,
536 gethrtime(), pdu->cur_data_sz);
537
538 if (pdu->final)
539 task->done = B_TRUE;
540
541 return (0);
542 }
543
544 static int
545 krrp_stream_task_write_handler(krrp_stream_task_t *task, krrp_pdu_data_t *pdu)
546 {
547 int rc = 0;
548 kreplication_buffer_t *kr_buf = NULL;
549
550 kr_buf = (kreplication_buffer_t *)pdu->dblk;
551
552 ASSERT(task->zfs_ctx != NULL);
553
554 if (pdu->final)
555 task->done = B_TRUE;
556
557 if (kr_buf->data_size != 0)
558 rc = dmu_krrp_lend_recv_buffer(task->zfs_ctx, kr_buf);
559
560 #ifdef KRRP_STREAM_TASK_DEBUG
561 VERIFY3U(rc, ==, 0);
562 #endif
563
564 return (rc);
565 }
566
567 static int
568 krrp_stream_task_fake_read_handler(krrp_stream_task_t *task,
569 krrp_pdu_data_t *pdu)
570 {
571 krrp_dblk_t *dblk;
572 hrtime_t start = gethrtime();
573
574 dblk = pdu->dblk;
575
576 while (dblk != NULL) {
577 if (task->fake_data_sz > dblk->max_data_sz)
578 dblk->cur_data_sz = dblk->max_data_sz;
579 else
580 dblk->cur_data_sz = task->fake_data_sz;
581
582 task->fake_data_sz -= dblk->cur_data_sz;
583 pdu->cur_data_sz += dblk->cur_data_sz;
584
585 if (task->fake_data_sz == 0) {
586 if (dblk->next == NULL) {
587 pdu->cur_data_sz -= dblk->cur_data_sz;
588 dblk->cur_data_sz = 0;
589 }
590
591 pdu->final = B_TRUE;
592 task->done = B_TRUE;
593 break;
594 }
595
596 dblk = dblk->next;
597 }
598
599 pdu->txg = task->txg;
600
601 krrp_stream_task_fake_rate_limit(task, start, pdu->cur_data_sz);
602
603 return (0);
604 }
605
606 static void
607 krrp_stream_task_fake_rate_limit(krrp_stream_task_t *task,
608 hrtime_t start_time, uint32_t data_sz)
609 {
610 hrtime_t diff, delay;
611
612 if (krrp_stream_fake_rate_limit_mb == 0 || data_sz == 0) {
613 DTRACE_PROBE1(rate_limit_delay1, uint64_t, 0);
614 return;
615 }
616
617 diff = gethrtime() - start_time;
618 delay = ((hrtime_t)data_sz * NANOSEC) /
619 ((hrtime_t)krrp_stream_fake_rate_limit_mb * 1024 * 1024);
620 if (diff > delay) {
621 DTRACE_PROBE2(rate_limit_delay2, uint64_t, diff,
622 uint64_t, delay);
623 return;
624 }
625
626 delay = delay - diff;
627
628 DTRACE_PROBE1(rate_limit_delay3, uint64_t, delay);
629
630 mutex_enter(&task->mtx);
631 (void) cv_timedwait_hires(&task->cv, &task->mtx,
632 delay, USEC2NSEC(100), CALLOUT_FLAG_ROUNDUP);
633 mutex_exit(&task->mtx);
634
635 diff = gethrtime() - start_time;
636 DTRACE_PROBE2(rate_limit_delay4, uint64_t, delay, uint64_t, diff);
637 }
638
639 static int
640 krrp_stream_task_read_handler(krrp_stream_task_t *task, krrp_pdu_data_t *pdu)
641 {
642 int rc = 0;
643 kreplication_buffer_t *kr_buf = NULL;
644
645 kr_buf = (kreplication_buffer_t *)pdu->dblk;
646
647 ASSERT(task->zfs_ctx != NULL);
648
649 pdu->txg = task->txg;
650
651 /*
652 * dmu_krrp_lend_send_buffer always fill all available space.
653 * only in case of ENODATA it may not fill all available space.
654 */
655 pdu->cur_data_sz = pdu->max_data_sz;
656 rc = dmu_krrp_lend_send_buffer(task->zfs_ctx, kr_buf);
657
658 /* so in this case need to recalculate total size */
659 if (rc == ENODATA) {
660 pdu->final = B_TRUE;
661 task->done = B_TRUE;
662 rc = 0;
663 pdu->cur_data_sz = 0;
664 krrp_stream_task_correct_total_size(pdu->dblk,
665 &pdu->cur_data_sz);
666 VERIFY3U(pdu->cur_data_sz, <=, pdu->max_data_sz);
667 }
668
669 #ifdef KRRP_STREAM_TASK_DEBUG
670 VERIFY3U(rc, ==, 0);
671 #endif
672
673 return (rc);
674 }
675
676 static void
677 krrp_stream_task_read_start(krrp_stream_task_t *task)
678 {
679 task->zfs_ctx = dmu_krrp_init_send_task(&task->zargs);
680 }
681
682 /* ARGSUSED */
683 static void
684 krrp_stream_task_fake_common_action(krrp_stream_task_t *task)
685 {
686 /* nothing to do */
687 }
688
689 static void
690 krrp_stream_task_common_stop(krrp_stream_task_t *task)
691 {
692 ASSERT(task->zfs_ctx != NULL);
693
694 #ifdef KRRP_STREAM_TASK_DEBUG
695 VERIFY3U(dmu_krrp_fini_task(task->zfs_ctx), ==, 0);
696 #else
697 (void) dmu_krrp_fini_task(task->zfs_ctx);
698 #endif
699
700 task->zfs_ctx = NULL;
701 }
702
703 static void
704 krrp_stream_task_correct_total_size(krrp_dblk_t *dblk_head,
705 size_t *total_data_sz)
706 {
707 krrp_dblk_t *dblk;
708
709 dblk = dblk_head;
710 while (dblk != NULL) {
711 *total_data_sz += dblk->cur_data_sz;
712 if (dblk->cur_data_sz == 0)
713 break;
714
715 dblk = dblk->next;
716 }
717 }