1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2016 Nexenta Systems, Inc. All rights reserved.
14 */
15
16 /*
17 * PDU is high level structure (a container) that contains header, some ctrl
18 * information and list of dblks that contain payload.
19 * Over network are transfered header and dblks.
20 */
21
22 #include "krrp_svc.h"
23 #include "krrp_pdu.h"
24
25 /* #define KRRP_PDU_DEBUG 1 */
26
27 static struct {
28 uint64_t ref_cnt;
29 krrp_pdu_engine_t *ctrl_pdu_engine;
30 } krrp_global_pdu_engine_env = {0, NULL};
31
32 static void krrp_pdu_rele_task(void *void_pdu);
33 static void krrp_pdu_engine_notify_cb(void *void_pdu_engine);
34
35 int
36 krrp_pdu_engine_global_init()
37 {
38 int rc;
39 krrp_error_t error;
40
41 VERIFY(krrp_global_pdu_engine_env.ref_cnt == 0);
42 VERIFY(krrp_global_pdu_engine_env.ctrl_pdu_engine == NULL);
43
44 /*
45 * CTRL PDU Engine that will be used by all sessions:
46 * preallocation: B_FALSE
47 * max_memory: 100 MB (should be enough)
48 * dblk_per_pdu: 1
49 * dblk_head_sz: 0
50 * dblk_data_sz: 2 KB
51 * notify_free_cb: NULL
52 * notify_free_cb_arg: NULL
53 */
54 rc = krrp_pdu_engine_create(&krrp_global_pdu_engine_env.ctrl_pdu_engine,
55 B_TRUE, B_FALSE, 100, 1, 0, 2 * 1024, &error);
56
57 return (rc);
58 }
59
60 void
61 krrp_pdu_engine_global_fini()
62 {
63 VERIFY(krrp_global_pdu_engine_env.ctrl_pdu_engine != NULL);
64 VERIFY(krrp_global_pdu_engine_env.ref_cnt == 1);
65
66 krrp_pdu_engine_destroy(krrp_global_pdu_engine_env.ctrl_pdu_engine);
67 krrp_global_pdu_engine_env.ctrl_pdu_engine = NULL;
68 }
69
70 void
71 krrp_pdu_ctrl_alloc(krrp_pdu_ctrl_t **result_pdu, boolean_t with_header)
72 {
73 krrp_pdu_alloc(krrp_global_pdu_engine_env.ctrl_pdu_engine,
74 (krrp_pdu_t **)result_pdu, with_header);
75 }
76
77 int
78 krrp_pdu_engine_create(krrp_pdu_engine_t **result_engine, boolean_t ctrl,
79 boolean_t prealloc, size_t max_memory, size_t dblks_per_pdu,
80 size_t dblk_head_sz, size_t dblk_data_sz, krrp_error_t *error)
81 {
82 int rc;
83 krrp_pdu_engine_t *engine;
84 size_t total_dblk_sz, max_dblk_cnt;
85
86 VERIFY(result_engine != NULL && *result_engine == NULL);
87 VERIFY(max_memory != 0);
88 VERIFY(dblk_data_sz != 0);
89
90 engine = kmem_zalloc(sizeof (krrp_pdu_engine_t), KM_SLEEP);
91
92 engine->type = ctrl ? KRRP_PET_CTRL : KRRP_PET_DATA;
93
94 total_dblk_sz = dblk_head_sz + dblk_data_sz;
95 if (dblks_per_pdu == 0)
96 engine->dblks_per_pdu =
97 (KRRP_PDU_DEFAULT_SIZE / total_dblk_sz) + 1;
98 else
99 engine->dblks_per_pdu = dblks_per_pdu;
100
101 engine->max_pdu_cnt = (max_memory * 1024 * 1024) /
102 total_dblk_sz / engine->dblks_per_pdu;
103
104 engine->pdu_data_sz = dblk_data_sz * engine->dblks_per_pdu;
105
106 max_dblk_cnt = engine->max_pdu_cnt * engine->dblks_per_pdu;
107 rc = krrp_dblk_engine_create(&engine->dblk_engine, prealloc,
108 max_dblk_cnt, dblk_head_sz, dblk_data_sz,
109 engine->dblks_per_pdu, &krrp_pdu_engine_notify_cb, engine,
110 error);
111 if (rc != 0) {
112 kmem_free(engine, sizeof (krrp_pdu_engine_t));
113 return (-1);
114 }
115
116 atomic_inc_64(&krrp_global_pdu_engine_env.ref_cnt);
117
118 mutex_init(&engine->mtx, NULL, MUTEX_DEFAULT, NULL);
119 cv_init(&engine->cv, NULL, CV_DEFAULT, NULL);
120
121 cmn_err(CE_NOTE, "PDU Engine config: dblk_head_sz:[%lu], "
122 "dblk_data_sz:[%lu], max_mem:[%lu MB], "
123 "dblks_per_pdu:[%lu], prealloc:[%s]",
124 dblk_head_sz, dblk_data_sz, max_memory,
125 engine->dblks_per_pdu, (prealloc ? "YES" : "NO"));
126
127 *result_engine = engine;
128 return (0);
129 }
130
131 void
132 krrp_pdu_engine_destroy(krrp_pdu_engine_t *engine)
133 {
134 krrp_dblk_engine_destroy(engine->dblk_engine);
135
136 mutex_enter(&engine->mtx);
137
138 cv_destroy(&engine->cv);
139
140 mutex_exit(&engine->mtx);
141 mutex_destroy(&engine->mtx);
142
143 atomic_dec_64(&krrp_global_pdu_engine_env.ref_cnt);
144
145 kmem_free(engine, sizeof (krrp_pdu_engine_t));
146 }
147
148 void
149 krrp_pdu_engine_register_callback(krrp_pdu_engine_t *engine,
150 krrp_pdu_free_notify_cb_t *notify_cb, void *notify_cb_arg)
151 {
152 VERIFY(notify_cb != NULL);
153 VERIFY(engine->notify_free.cb == NULL);
154
155 mutex_enter(&engine->mtx);
156
157 engine->notify_free.cb = notify_cb;
158 engine->notify_free.cb_arg = notify_cb_arg;
159 engine->notify_free.init_value = engine->max_pdu_cnt >> 3;
160 if (engine->notify_free.init_value == 0)
161 engine->notify_free.init_value = 1;
162
163 engine->notify_free.cnt = engine->notify_free.init_value;
164
165 mutex_exit(&engine->mtx);
166 }
167
168 void
169 krrp_pdu_engine_force_notify(krrp_pdu_engine_t *engine, boolean_t initial)
170 {
171 size_t value;
172
173 mutex_enter(&engine->mtx);
174
175 VERIFY(engine->notify_free.cb != NULL);
176
177 if (initial) {
178 value = engine->max_pdu_cnt;
179 goto out;
180 }
181
182 value = engine->notify_free.init_value - engine->notify_free.cnt;
183 if (value == 0 && engine->cur_pdu_cnt == 0)
184 value = engine->notify_free.init_value;
185
186 engine->notify_free.cnt = engine->notify_free.init_value;
187
188 out:
189 if (value != 0)
190 engine->notify_free.cb(engine->notify_free.cb_arg, value);
191
192 mutex_exit(&engine->mtx);
193 }
194
195 size_t
196 krrp_pdu_engine_get_used_mem(krrp_pdu_engine_t *pdu_engine)
197 {
198 return (pdu_engine->pdu_data_sz * pdu_engine->cur_pdu_cnt);
199 }
200
201 size_t
202 krrp_pdu_engine_get_free_mem(krrp_pdu_engine_t *pdu_engine)
203 {
204 size_t free_pdus;
205
206 /*
207 * Mutex is not required here, because this
208 * function is called from zfs-send context,
209 * that does allocation of PDUs.
210 *
211 * Without mutex sometimes free mem may be less
212 * than it actually is, but this is noncritical.
213 */
214 free_pdus = pdu_engine->max_pdu_cnt - pdu_engine->cur_pdu_cnt;
215
216 return (pdu_engine->pdu_data_sz * free_pdus);
217 }
218
219 void
220 krrp_pdu_alloc(krrp_pdu_engine_t *pdu_engine, krrp_pdu_t **result_pdu,
221 boolean_t alloc_header)
222 {
223 clock_t time_left = 0;
224 krrp_dblk_engine_t *dblk_engine;
225 krrp_pdu_t *pdu = NULL;
226
227 VERIFY(pdu_engine != NULL);
228 VERIFY(result_pdu != NULL && *result_pdu == NULL);
229
230 #ifdef KRRP_PDU_DEBUG
231 cmn_err(CE_NOTE, "Init new PDU-[%s]",
232 (pdu_engine->type == KRRP_PET_DATA ? "DATA" : "CTRL"));
233 #endif
234
235 dblk_engine = pdu_engine->dblk_engine;
236
237 mutex_enter(&pdu_engine->mtx);
238 while ((pdu_engine->max_pdu_cnt - pdu_engine->cur_pdu_cnt) == 0) {
239 if (time_left == -1) {
240 mutex_exit(&pdu_engine->mtx);
241 return;
242 }
243
244 time_left = cv_reltimedwait(&pdu_engine->cv,
245 &pdu_engine->mtx, MSEC_TO_TICK(500), TR_CLOCK_TICK);
246 }
247
248 pdu_engine->cur_pdu_cnt++;
249 mutex_exit(&pdu_engine->mtx);
250
251 switch (pdu_engine->type) {
252 case KRRP_PET_DATA:
253 pdu = kmem_zalloc(sizeof (krrp_pdu_data_t), KM_SLEEP);
254 pdu->type = KRRP_PT_DATA;
255 break;
256 case KRRP_PET_CTRL:
257 pdu = kmem_zalloc(sizeof (krrp_pdu_ctrl_t), KM_SLEEP);
258 pdu->type = KRRP_PT_CTRL;
259 break;
260 }
261
262 pdu->max_data_sz = pdu_engine->dblks_per_pdu *
263 dblk_engine->dblk_data_sz;
264
265 if (alloc_header)
266 pdu->hdr = kmem_zalloc(sizeof (krrp_hdr_t), KM_SLEEP);
267
268 krrp_dblk_alloc(dblk_engine, &pdu->dblk,
269 pdu_engine->dblks_per_pdu);
270
271 /*
272 * Counter of free PDU is updated by events from dblk-engine,
273 * so if we here, then dblk-engine must not return NULL
274 */
275 VERIFY(pdu->dblk != NULL);
276
277 *result_pdu = pdu;
278 }
279
280 void
281 krrp_pdu_rele(krrp_pdu_t *pdu)
282 {
283 /*
284 * Async rele make sense only if PDU has dblks,
285 * because dblk_rele logic uses locks
286 *
287 * PDU has dblks at receiver side, because PDU initialized
288 * by connection and released by stream
289 *
290 * At sender side PDU does not contain dblk, because PDU intialized
291 * by stream and released by connection, but connection passes dblks
292 * to TCP/IP stack, where dblks are processed and released.
293 */
294 if (pdu->dblk == NULL) {
295 krrp_pdu_rele_task(pdu);
296 return;
297 }
298
299 krrp_svc_dispatch_task(krrp_pdu_rele_task, pdu);
300 }
301
302 static void
303 krrp_pdu_rele_task(void *void_pdu)
304 {
305 krrp_pdu_t *pdu = void_pdu;
306
307 #ifdef KRRP_PDU_DEBUG
308 cmn_err(CE_NOTE, "RELE PDU: [%d]", pdu->type);
309 #endif
310
311 if (pdu->dblk != NULL) {
312 krrp_dblk_rele(pdu->dblk);
313 pdu->dblk = NULL;
314 }
315
316 if (pdu->hdr != NULL) {
317 kmem_free(pdu->hdr, sizeof (krrp_hdr_t));
318 pdu->hdr = NULL;
319 }
320
321 switch (pdu->type) {
322 case KRRP_PT_DATA:
323 kmem_free(pdu, sizeof (krrp_pdu_data_t));
324 break;
325 case KRRP_PT_CTRL:
326 kmem_free(pdu, sizeof (krrp_pdu_ctrl_t));
327 break;
328 }
329 }
330
331 static void
332 krrp_pdu_engine_notify_cb(void *void_pdu_engine)
333 {
334 krrp_pdu_engine_t *engine = void_pdu_engine;
335
336 mutex_enter(&engine->mtx);
337
338 #ifdef KRRP_PDU_DEBUG
339 cmn_err(CE_NOTE, "Dblk rele notification [%d]", engine->type);
340 #endif
341
342 ASSERT(engine->cur_pdu_cnt > 0);
343
344 engine->cur_pdu_cnt--;
345 if (engine->notify_free.cb != NULL) {
346 engine->notify_free.cnt--;
347 if (engine->notify_free.cnt == 0) {
348 engine->notify_free.cnt =
349 engine->notify_free.init_value;
350 engine->notify_free.cb(engine->notify_free.cb_arg,
351 engine->notify_free.cnt);
352 }
353 }
354
355 cv_broadcast(&engine->cv);
356 mutex_exit(&engine->mtx);
357 }
358
359 int
360 krrp_pdu_get_payload(krrp_pdu_ctrl_t *pdu, void **res_data, size_t *res_data_sz)
361 {
362 size_t payload_sz;
363 void *data;
364 int rc;
365
366 ASSERT(pdu != NULL && pdu->hdr != NULL);
367 ASSERT(res_data != NULL && res_data_sz != NULL);
368
369 payload_sz = pdu->hdr->payload_sz;
370 if (payload_sz == 0) {
371 *res_data = NULL;
372 *res_data_sz = 0;
373 return (0);
374 }
375
376 data = kmem_alloc(payload_sz, KM_SLEEP);
377 rc = krrp_dblk_get_data(pdu->dblk, data, payload_sz);
378 if (rc != 0) {
379 kmem_free(data, payload_sz);
380 *res_data = NULL;
381 *res_data_sz = 0;
382 return (rc);
383 }
384
385 *res_data = data;
386 *res_data_sz = payload_sz;
387 return (0);
388 }
389
390 int
391 krrp_pdu_get_nvl_from_payload(krrp_pdu_ctrl_t *pdu, nvlist_t **res_nvl)
392 {
393 void *data = NULL;
394 size_t data_sz = 0;
395 int rc;
396
397 rc = krrp_pdu_get_payload(pdu, &data, &data_sz);
398 if (rc != 0)
399 return (rc);
400
401 rc = nvlist_unpack(data, data_sz, res_nvl, KM_SLEEP);
402
403 kmem_free(data, data_sz);
404
405 return (rc);
406 }