1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2017 Nexenta Systems, Inc. All rights reserved.
14 */
15
16 #ifndef _KRRP_STREAM_H
17 #define _KRRP_STREAM_H
18
19 #include <sys/sysmacros.h>
20 #include <sys/kmem.h>
21 #include <sys/atomic.h>
22 #include <sys/stream.h>
23 #include <sys/list.h>
24 #include <sys/modctl.h>
25 #include <sys/class.h>
26 #include <sys/cmn_err.h>
27
28 #include <krrp_error.h>
29 #include "krrp_queue.h"
30 #include "krrp_pdu.h"
31
32 #include "krrp_autosnap.h"
33 #include "krrp_stream_task.h"
34
35 #ifdef __cplusplus
36 extern "C" {
37 #endif
38
/*
 * Convenience wrappers around the "mtx" and "cv" members of a stream.
 * The argument must be a pointer to an object that has both members
 * (e.g. a krrp_stream_t *). It is parenthesized in every expansion, so
 * any pointer-valued expression may be passed.
 *
 * krrp_stream_cv_wait() hands the mutex to cv_wait(), so it must only be
 * used while the caller holds the lock taken by krrp_stream_lock().
 */
#define	krrp_stream_lock(strm)		mutex_enter(&(strm)->mtx)
#define	krrp_stream_unlock(strm)	mutex_exit(&(strm)->mtx)
#define	krrp_stream_cv_wait(strm)	cv_wait(&(strm)->cv, &(strm)->mtx)
#define	krrp_stream_cv_signal(strm)	cv_signal(&(strm)->cv)
#define	krrp_stream_cv_broadcast(strm)	cv_broadcast(&(strm)->cv)
44
/*
 * States a stream can be in (stored in krrp_stream_s.state).
 *
 * Numbering starts at 1, so a zero-filled value does not match any of
 * the states below.
 */
typedef enum {
	KRRP_STRMS_CREATED = 1,
	KRRP_STRMS_READY_TO_RUN,
	KRRP_STRMS_ACTIVE,
	KRRP_STRMS_IN_ERROR,
	KRRP_STRMS_STOPPED
} krrp_stream_state_t;
52
/*
 * Direction of a stream (stored in krrp_stream_s.mode).
 * Numbering starts at 1, so a zero-filled value matches neither mode.
 */
typedef enum {
	KRRP_STRMM_READ = 1,
	KRRP_STRMM_WRITE
} krrp_stream_mode_t;
57
/* Forward declaration; struct krrp_stream_s is defined later in this file. */
typedef struct krrp_stream_s krrp_stream_t;
59
/*
 * Event codes passed as the first argument of a krrp_stream_cb_t callback.
 * Numbering starts at 1.
 */
typedef enum {
	KRRP_STREAM_DATA_PDU = 1,
	KRRP_STREAM_TXG_RECV_DONE,
	KRRP_STREAM_SEND_DONE,
	KRRP_STREAM_ERROR
} krrp_stream_cb_ev_t;

/*
 * Signature of the stream event callback.
 *
 *   ev      event code
 *   ev_arg  event-specific payload carried as an integer/pointer value
 *           NOTE(review): the payload type per event is not visible in
 *           this header; confirm against the callers.
 *   cb_arg  opaque pointer; presumably the ev_cb_arg supplied to
 *           krrp_stream_register_callback() - TODO confirm.
 *
 * This is a function type, not a pointer type; users declare
 * "krrp_stream_cb_t *".
 */
typedef void (krrp_stream_cb_t)(krrp_stream_cb_ev_t ev,
    uintptr_t ev_arg, void *cb_arg);
69
/*
 * Per-stream RPO accounting record (see avg_total_rpo / avg_rpo in
 * struct krrp_stream_s).
 *
 * NOTE(review): presumably "buf" holds up to 10 recent samples, "cnt"
 * counts the samples stored and "value" is the figure derived from
 * them; the code that maintains this is not in this header - confirm.
 */
typedef struct krrp_txg_rpo_t {
	uint64_t	value;
	uint64_t	buf[10];
	size_t		cnt;
} krrp_txg_rpo_t;
75
76 struct krrp_stream_s {
77 kthread_t *work_thread;
78
79 krrp_stream_cb_t *callback;
80 void *callback_arg;
81
82 krrp_pdu_engine_t *data_pdu_engine;
83
84 krrp_stream_state_t state;
85 boolean_t do_ctrl_snap;
86 kmutex_t mtx;
87 kcondvar_t cv;
88
89 boolean_t wait_for_snap;
90
91 boolean_t non_continuous;
92 boolean_t fake_mode;
93 boolean_t recursive;
94 char dataset[MAXNAMELEN];
95 char base_snap_name[MAXNAMELEN];
96 char incr_snap_name[MAXNAMELEN];
97 nvlist_t *resume_info;
98 uint64_t notify_txg;
99 uint64_t last_send_txg;
100 uint64_t cur_send_txg;
101 uint64_t cur_recv_txg;
102 uint64_t last_ack_txg;
103 uint64_t last_full_ack_txg;
104
105 uint64_t bytes_processed;
106
107 size_t keep_snaps;
108
109 krrp_txg_rpo_t avg_total_rpo;
110 krrp_txg_rpo_t avg_rpo;
111
112 uint64_t fake_data_sz;
113 krrp_queue_t *write_data_queue;
114 krrp_stream_mode_t mode;
115 krrp_stream_te_t *task_engine;
116 krrp_autosnap_t *autosnap;
117
118 krrp_pdu_data_t *cur_pdu;
119 krrp_stream_task_t *cur_task;
120 krrp_queue_t *debug_pdu_queue;
121 krrp_queue_t *debug_tasks_queue;
122 };
123
124 int krrp_stream_read_create(krrp_stream_t **result_stream,
125 size_t keep_snaps, const char *dataset, const char *base_snap_name,
126 const char *incr_snap_name, const char *resume_token,
127 krrp_stream_read_flag_t flags, const char *skip_snaps_mask,
128 krrp_error_t *error);
129 int krrp_stream_write_create(krrp_stream_t **result_stream,
130 size_t keep_snaps, const char *dataset, const char *incr_snap_name,
131 const char *resume_token, krrp_stream_write_flag_t flags,
132 nvlist_t *ignore_props_list, nvlist_t *replace_props_list,
133 krrp_error_t *error);
134 int krrp_stream_fake_read_create(krrp_stream_t **result_stream,
135 uint64_t fake_data_sz, krrp_error_t *error);
136 int krrp_stream_fake_write_create(krrp_stream_t **result_stream,
137 krrp_error_t *error);
138 void krrp_stream_destroy(krrp_stream_t *stream);
139
140 void krrp_stream_register_callback(krrp_stream_t *stream,
141 krrp_stream_cb_t *ev_cb, void *ev_cb_arg);
142
143 int krrp_stream_run(krrp_stream_t *stream, krrp_queue_t *write_data_queue,
144 krrp_pdu_engine_t *data_pdu_engine, krrp_error_t *error);
145
146 void krrp_stream_txg_confirmed(krrp_stream_t *, uint64_t, boolean_t);
147
148 void krrp_stream_stop(krrp_stream_t *stream);
149 int krrp_stream_send_stop(krrp_stream_t *stream);
150
151 boolean_t krrp_stream_is_write_flag_set(krrp_stream_write_flag_t flags,
152 krrp_stream_write_flag_t flag);
153 void krrp_stream_set_write_flag(krrp_stream_write_flag_t *flags,
154 krrp_stream_write_flag_t flag);
155 boolean_t krrp_stream_is_read_flag_set(krrp_stream_read_flag_t flags,
156 krrp_stream_read_flag_t flag);
157 void krrp_stream_set_read_flag(krrp_stream_read_flag_t *flags,
158 krrp_stream_read_flag_t flag);
159
160 #ifdef __cplusplus
161 }
162 #endif
163
164 #endif /* _KRRP_STREAM_H */