1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2017 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
  16 #ifndef _KRRP_STREAM_H
  17 #define _KRRP_STREAM_H
  18 
  19 #include <sys/sysmacros.h>
  20 #include <sys/kmem.h>
  21 #include <sys/atomic.h>
  22 #include <sys/stream.h>
  23 #include <sys/list.h>
  24 #include <sys/modctl.h>
  25 #include <sys/class.h>
  26 #include <sys/cmn_err.h>
  27 
  28 #include <krrp_error.h>
  29 #include "krrp_queue.h"
  30 #include "krrp_pdu.h"
  31 
  32 #include "krrp_autosnap.h"
  33 #include "krrp_stream_task.h"
  34 
  35 #ifdef __cplusplus
  36 extern "C" {
  37 #endif
  38 
  39 #define krrp_stream_lock(a) mutex_enter(&(a)->mtx)
  40 #define krrp_stream_unlock(a) mutex_exit(&(a)->mtx)
  41 #define krrp_stream_cv_wait(a) cv_wait(&(a)->cv, &(a)->mtx)
  42 #define krrp_stream_cv_signal(a) cv_signal(&(a)->cv)
  43 #define krrp_stream_cv_broadcast(a) cv_broadcast(&(a)->cv)
  44 
  45 typedef enum {
  46         KRRP_STRMS_CREATED = 1,
  47         KRRP_STRMS_READY_TO_RUN,
  48         KRRP_STRMS_ACTIVE,
  49         KRRP_STRMS_IN_ERROR,
  50         KRRP_STRMS_STOPPED
  51 } krrp_stream_state_t;
  52 
  53 typedef enum {
  54         KRRP_STRMM_READ = 1,
  55         KRRP_STRMM_WRITE,
  56 } krrp_stream_mode_t;
  57 
  58 typedef struct krrp_stream_s krrp_stream_t;
  59 
  60 typedef enum {
  61         KRRP_STREAM_DATA_PDU = 1,
  62         KRRP_STREAM_TXG_RECV_DONE,
  63         KRRP_STREAM_SEND_DONE,
  64         KRRP_STREAM_ERROR,
  65 } krrp_stream_cb_ev_t;
  66 
  67 typedef void (krrp_stream_cb_t)(krrp_stream_cb_ev_t ev,
  68     uintptr_t ev_arg, void *cb_arg);
  69 
  70 typedef struct krrp_txg_rpo_t {
  71         uint64_t                                value;
  72         uint64_t                                buf[10];
  73         size_t                                  cnt;
  74 } krrp_txg_rpo_t;
  75 
  76 struct krrp_stream_s {
  77         kthread_t                               *work_thread;
  78 
  79         krrp_stream_cb_t                *callback;
  80         void                                    *callback_arg;
  81 
  82         krrp_pdu_engine_t               *data_pdu_engine;
  83 
  84         krrp_stream_state_t             state;
  85         boolean_t                               do_ctrl_snap;
  86         kmutex_t                                mtx;
  87         kcondvar_t                              cv;
  88 
  89         boolean_t                               wait_for_snap;
  90 
  91         boolean_t                               non_continuous;
  92         boolean_t                               fake_mode;
  93         boolean_t                               recursive;
  94         char                                    dataset[MAXNAMELEN];
  95         char                                    base_snap_name[MAXNAMELEN];
  96         char                                    incr_snap_name[MAXNAMELEN];
  97         nvlist_t                                *resume_info;
  98         uint64_t                                notify_txg;
  99         uint64_t                                last_send_txg;
 100         uint64_t                                cur_send_txg;
 101         uint64_t                                cur_recv_txg;
 102         uint64_t                                last_ack_txg;
 103         uint64_t                                last_full_ack_txg;
 104 
 105         uint64_t                                bytes_processed;
 106 
 107         size_t                                  keep_snaps;
 108 
 109         krrp_txg_rpo_t                  avg_total_rpo;
 110         krrp_txg_rpo_t                  avg_rpo;
 111 
 112         uint64_t                                fake_data_sz;
 113         krrp_queue_t                    *write_data_queue;
 114         krrp_stream_mode_t              mode;
 115         krrp_stream_te_t                *task_engine;
 116         krrp_autosnap_t                 *autosnap;
 117 
 118         krrp_pdu_data_t                 *cur_pdu;
 119         krrp_stream_task_t              *cur_task;
 120         krrp_queue_t                    *debug_pdu_queue;
 121         krrp_queue_t                    *debug_tasks_queue;
 122 };
 123 
 124 int krrp_stream_read_create(krrp_stream_t **result_stream,
 125     size_t keep_snaps, const char *dataset, const char *base_snap_name,
 126     const char *incr_snap_name, const char *resume_token,
 127         krrp_stream_read_flag_t flags, const char *skip_snaps_mask,
 128     krrp_error_t *error);
 129 int krrp_stream_write_create(krrp_stream_t **result_stream,
 130     size_t keep_snaps, const char *dataset, const char *incr_snap_name,
 131     const char *resume_token, krrp_stream_write_flag_t flags,
 132     nvlist_t *ignore_props_list, nvlist_t *replace_props_list,
 133     krrp_error_t *error);
 134 int krrp_stream_fake_read_create(krrp_stream_t **result_stream,
 135     uint64_t fake_data_sz, krrp_error_t *error);
 136 int krrp_stream_fake_write_create(krrp_stream_t **result_stream,
 137     krrp_error_t *error);
 138 void krrp_stream_destroy(krrp_stream_t *stream);
 139 
 140 void krrp_stream_register_callback(krrp_stream_t *stream,
 141     krrp_stream_cb_t *ev_cb, void *ev_cb_arg);
 142 
 143 int krrp_stream_run(krrp_stream_t *stream, krrp_queue_t *write_data_queue,
 144     krrp_pdu_engine_t *data_pdu_engine, krrp_error_t *error);
 145 
 146 void krrp_stream_txg_confirmed(krrp_stream_t *, uint64_t, boolean_t);
 147 
 148 void krrp_stream_stop(krrp_stream_t *stream);
 149 int krrp_stream_send_stop(krrp_stream_t *stream);
 150 
 151 boolean_t krrp_stream_is_write_flag_set(krrp_stream_write_flag_t flags,
 152     krrp_stream_write_flag_t flag);
 153 void krrp_stream_set_write_flag(krrp_stream_write_flag_t *flags,
 154     krrp_stream_write_flag_t flag);
 155 boolean_t krrp_stream_is_read_flag_set(krrp_stream_read_flag_t flags,
 156     krrp_stream_read_flag_t flag);
 157 void krrp_stream_set_read_flag(krrp_stream_read_flag_t *flags,
 158     krrp_stream_read_flag_t flag);
 159 
 160 #ifdef __cplusplus
 161 }
 162 #endif
 163 
 164 #endif /* _KRRP_STREAM_H */