1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2018 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
  16 #ifndef _KRRP_STREAM_TASK_H
  17 #define _KRRP_STREAM_TASK_H
  18 
  19 #include <sys/sysmacros.h>
  20 #include <sys/kmem.h>
  21 #include <sys/atomic.h>
  22 #include <sys/stream.h>
  23 #include <sys/list.h>
  24 #include <sys/modctl.h>
  25 #include <sys/class.h>
  26 #include <sys/cmn_err.h>
  27 
  28 #include <sys/kreplication_common.h>
  29 
  30 #include <krrp_error.h>
  31 
  32 #include "krrp_queue.h"
  33 #include "krrp_pdu.h"
  34 
  35 #ifdef __cplusplus
  36 extern "C" {
  37 #endif
  38 
  39 /*
  40  * A skip mask is a string that looks like
  41  *              prop_name=prop_value
  42  *
  43  * max len of prop_name is ZAP_MAXNAMELEN
  44  * max len of prop_value is ZAP_MAXVALUELEN
  45  * and 1 byte to store '='
  46  */
  47 #define KRRP_SKIP_SNAP_MASK_LEN ZAP_MAXNAMELEN + ZAP_MAXVALUELEN + 1
  48 
  49 #define KRRP_DBLK_TAIL_SIZE     sizeof (kreplication_buffer_t)
  50 
  51 typedef enum {
  52         KRRP_STEM_READ = 1,
  53         KRRP_STEM_WRITE,
  54 } krrp_stream_te_mode_t;
  55 
  56 typedef enum {
  57         KRRP_STRMRF_RECURSIVE           = (1 << 0),
  58         KRRP_STRMRF_SEND_PROPS          = (1 << 1),
  59         KRRP_STRMRF_SEND_ALL_SNAPS      = (1 << 2),
  60         KRRP_STRMRF_EMBEDDED            = (1 << 3),
  61         KRRP_STRMRF_ENABLE_CHKSUM       = (1 << 4),
  62         KRRP_STRMRF_COMPRESSED          = (1 << 5),
  63         KRRP_STRMRF_LARGE_BLOCKS        = (1 << 6)
  64 } krrp_stream_read_flag_t;
  65 
  66 typedef enum {
  67         KRRP_STRMWF_FORCE_RECV          = (1 << 0),
  68         KRRP_STRMWF_DISCARD_HEAD        = (1 << 1),
  69         KRRP_STRMWF_LEAVE_TAIL          = (1 << 2),
  70         KRRP_STRMWF_ENABLE_CHKSUM       = (1 << 3)
  71 } krrp_stream_write_flag_t;
  72 
  73 
  74 typedef struct krrp_stream_te_s {
  75         krrp_queue_t                    *tasks;
  76         krrp_queue_t                    *tasks_done;
  77         krrp_queue_t                    *tasks_done2;
  78         kmem_cache_t                    *tasks_cache;
  79         krrp_stream_te_mode_t   mode;
  80 
  81         void                                    *global_zfs_ctx;
  82 
  83         const char                              *dataset;
  84 
  85         char                                    skip_snaps_prop_name[ZAP_MAXNAMELEN];
  86         char                                    skip_snaps_prop_val[ZAP_MAXVALUELEN];
  87 
  88         boolean_t                               fake_mode;
  89         boolean_t                               discard_head;
  90         boolean_t                               leave_tail;
  91         boolean_t                               force_receive;
  92         boolean_t                               recursive;
  93         boolean_t                               incremental_package;
  94         boolean_t                               properties;
  95         boolean_t                               enable_cksum;
  96         boolean_t                               embedded;
  97         boolean_t                               compressed;
  98         boolean_t                               large_blocks;
  99         nvlist_t                                *ignore_props_list;
 100         nvlist_t                                *replace_props_list;
 101 
 102         krrp_check_enough_mem   *mem_check_cb;
 103         void                                    *mem_check_cb_arg;
 104 } krrp_stream_te_t;
 105 
 106 typedef struct krrp_stream_task_s krrp_stream_task_t;
 107 
 108 typedef void krrp_stream_task_shandler_t(krrp_stream_task_t *);
 109 typedef int krrp_stream_task_handler_t(krrp_stream_task_t *, krrp_pdu_data_t *);
 110 
 111 struct krrp_stream_task_s {
 112         list_node_t                                     node;
 113 
 114         kreplication_zfs_args_t         zargs;
 115 
 116         void                                            *zfs_ctx;
 117         uint64_t                                        txg;
 118         uint64_t                                        fake_data_sz;
 119         boolean_t                                       done;
 120         krrp_stream_task_shandler_t     *start;
 121         krrp_stream_task_shandler_t     *shutdown;
 122         krrp_stream_task_handler_t      *process;
 123         krrp_stream_te_t                                *engine;
 124 
 125         /* These fields are used only at recv-side */
 126         uint64_t                                        txg_start;
 127         uint64_t                                        txg_end;
 128 
 129         /* These fields are used only at send-side */
 130         hrtime_t                                        init_hrtime;
 131 
 132         /* To implement sleep() for fake-tasks */
 133         kmutex_t                                        mtx;
 134         kcondvar_t                                      cv;
 135 };
 136 
 137 int krrp_stream_te_read_create(krrp_stream_te_t **result_te,
 138     const char *dataset, krrp_stream_read_flag_t flags,
 139     krrp_check_enough_mem *mem_check_cb, void *mem_check_cb_arg,
 140         const char *skip_snaps_mask, krrp_error_t *error);
 141 int krrp_stream_te_write_create(krrp_stream_te_t **result_te,
 142     const char *dataset, krrp_stream_write_flag_t flags,
 143     nvlist_t *ignore_props_list, nvlist_t *replace_props_list,
 144     krrp_error_t *error);
 145 int krrp_stream_te_fake_read_create(krrp_stream_te_t **result_te,
 146     krrp_error_t *error);
 147 int krrp_stream_te_fake_write_create(krrp_stream_te_t **result_te,
 148     krrp_error_t *error);
 149 
 150 void krrp_stream_te_destroy(krrp_stream_te_t *task_engine);
 151 
 152 size_t krrp_stream_te_total_num_tasks(krrp_stream_te_t *);
 153 size_t krrp_stream_te_num_pending_tasks(krrp_stream_te_t *);
 154 
 155 void krrp_stream_task_engine_get_task(krrp_stream_te_t *,
 156     krrp_stream_task_t **);
 157 
 158 void krrp_stream_fake_read_task_init(krrp_stream_te_t *, uint64_t);
 159 void krrp_stream_read_task_init(krrp_stream_te_t *, uint64_t,
 160     const char *, const char *, nvlist_t *);
 161 void krrp_stream_write_task_init(krrp_stream_te_t *, uint64_t,
 162     krrp_stream_task_t **, nvlist_t *);
 163 void krrp_stream_task_fini(krrp_stream_task_t *);
 164 
 165 hrtime_t krrp_stream_task_calc_rpo(krrp_stream_task_t *);
 166 
 167 #ifdef __cplusplus
 168 }
 169 #endif
 170 
 171 #endif /* _KRRP_STREAM_TASK_H */