1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2018 Nexenta Systems, Inc. All rights reserved.
14 */
15
/*
 * Session establishment AUTH Schema:
 *
 * 1. The userspace peers negotiate a secured value over
 * the secured Ctrl-Path
 *
 * 2. The manager and agent pass a digest of the secured value to
 * the kernel as part of the params for the 'sess_create' IOCTL
 *
 * 3. During the connection establishment phase the manager's session
 * sends the digest to the remote side.
 *
 * 4. The agent's session compares the received digest to its own digest
 *
 * 5. If the agent's session does not have a digest, then the received
 * digest is ignored.
 *
 * 6. If the agent's session has a digest and this digest is not equal to
 * the received digest, or the received params do not contain a digest,
 * then the corresponding session establishment request is rejected.
 */
37
38 #include <sys/types.h>
39 #include <sys/conf.h>
40 #include <sys/sysmacros.h>
41 #include <sys/cmn_err.h>
42 #include <sys/stat.h>
43 #include <sys/ddi.h>
44 #include <sys/sunddi.h>
45 #include <sys/sdt.h>
46
47 #include <krrp_params.h>
48
49 #include "krrp_protocol.h"
50 #include "krrp_svc.h"
51 #include "krrp_session.h"
52
53 /* #define KRRP_SESS_DEBUG 1 */
54
55 static int krrp_sess_common_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
56 krrp_error_t *error);
57
58 static void krrp_sess_start_ping(krrp_sess_t *sess);
59 static void krrp_sess_stop_ping(krrp_sess_t *sess);
60 static void krrp_sess_ping_cb(void *void_sess);
61 static void krrp_sess_ping_request(krrp_sess_t *sess);
62
63 static void krrp_sess_lr_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
64 void *void_sess);
65 static void krrp_sess_ll_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
66 void *void_sess);
67 static void krrp_sess_stream_error(krrp_sess_t *sess, krrp_error_t *error);
68 static void krrp_sess_txg_recv_done(krrp_sess_t *sess, uint64_t txg,
69 boolean_t complete);
70
71 static void krrp_sess_post_error_uevent(krrp_sess_t *sess,
72 krrp_error_t *error);
73 static void krrp_sess_post_send_done_uevent(krrp_sess_t *sess);
74
75 static void krrp_sess_lr_data_pdu_from_stream(krrp_sess_t *sess,
76 krrp_pdu_data_t *pdu);
77 static void krrp_sess_ll_data_pdu_from_stream(krrp_sess_t *sess,
78 krrp_pdu_data_t *pdu);
79 static void krrp_sess_lr_txg_recv_done(krrp_sess_t *sess, uint64_t txg);
80 static void krrp_sess_ll_txg_recv_done(krrp_sess_t *sess, uint64_t txg);
81 static void krrp_sess_lr_send_done(krrp_sess_t *sess);
82 static void krrp_sess_ll_send_done(krrp_sess_t *sess);
83
84 static void krrp_sess_pdu_engine_cb(void *void_sess, size_t released_pdus);
85 static void krrp_sess_conn_cb(void *void_conn, krrp_conn_cb_ev_t ev,
86 uintptr_t ev_arg, void *void_sess);
87
88 static void krrp_sess_kstat_init(krrp_sess_t *sess);
89 static int krrp_sess_kstat_update(kstat_t *ks, int rw);
90 static void krrp_sess_sender_kstat_update(krrp_sess_t *sess, kstat_t *ks);
91 static void krrp_sess_receiver_kstat_update(krrp_sess_t *sess, kstat_t *ks);
92 static void krrp_sess_compound_kstat_update(krrp_sess_t *sess, kstat_t *ks);
93
94 static void krrp_sess_conn_error(krrp_sess_t *sess, krrp_conn_t *conn,
95 krrp_error_t *error);
96 static void krrp_sess_ctrl_pdu_from_network(krrp_sess_t *sess,
97 krrp_pdu_ctrl_t *pdu);
98 static void krrp_sess_data_pdu_from_network(krrp_sess_t *sess,
99 krrp_pdu_data_t *pdu);
100
101 static int krrp_sess_fl_ctrl_validate(krrp_sess_t *sess,
102 krrp_hdr_data_t *hdr);
103 static void krrp_sess_fl_ctrl_calc_cwnd_window(krrp_fl_ctrl_t *fl_ctrl,
104 uint64_t new_recv_window);
105 static void krrp_sess_fl_ctrl_update_tx(krrp_fl_ctrl_t *fl_ctrl,
106 uint64_t max_pdu_seq_num);
107 static uint64_t krrp_sess_fl_ctrl_update_rx(krrp_fl_ctrl_t *fl_ctrl,
108 size_t window_offset);
109 static void krrp_sess_get_data_pdu_to_tx(void *void_sess,
110 krrp_pdu_t **result_pdu);
111
112 static int krrp_sess_set_kstat_id(krrp_sess_t *sess,
113 const char *kstat_id_str, krrp_error_t *error);
114
115 static int krrp_sess_set_auth_digest(krrp_sess_t *sess,
116 const char *auth_digest, krrp_error_t *error);
117
118 static void krrp_sess_nomem_error(krrp_sess_t *sess);
119 static void krrp_sess_error(krrp_sess_t *sess, krrp_error_t *error);
120
121 static int krrp_sess_inc_ref_cnt(krrp_sess_t *sess);
122 static void krrp_sess_dec_ref_cnt(krrp_sess_t *sess);
123
124 static void krrp_sess_send_shutdown(krrp_sess_t *sess);
125
126
127 #define XX(name, dtype, def_value) {#name, dtype, def_value},
128 static const krrp_sess_sender_kstat_t sess_sender_stats_templ = {
129 KRRP_SESS_SENDER_STAT_NAME_MAP(XX)
130 };
131
132 static const krrp_sess_receiver_kstat_t sess_receiver_stats_templ = {
133 KRRP_SESS_RECEIVER_STAT_NAME_MAP(XX)
134 };
135
136 static const krrp_sess_receiver_kstat_t sess_compound_stats_templ = {
137 KRRP_SESS_COMPOUND_STAT_NAME_MAP(XX)
138 };
139 #undef XX
140
/*
 * Selects how the congestion window is re-calculated:
 *	0 - re-calculation is disabled
 *	1 - algorithm1 is used
 *	2 - algorithm2 is used
 */
int krrp_sess_cwnd_state = 2;

/*
 * Controls whether received PDUs are handed to the stream:
 *	0		- every PDU is passed to the stream (default)
 *	any other value	- received PDUs are not passed to the stream
 */
int krrp_sess_recv_without_stream = 0;
153
154 int
155 krrp_sess_create(krrp_sess_t **result_sess, const char *id,
156 const char *kstat_id, const char *auth_digest,
157 boolean_t sender, boolean_t fake_mode,
158 boolean_t compound, krrp_error_t *error)
159 {
160 krrp_sess_t *sess;
161
162 VERIFY(result_sess != NULL && *result_sess == NULL);
163
164 if (compound && (sender || auth_digest != NULL)) {
165 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
166 return (-1);
167 }
168
169 sess = kmem_zalloc(sizeof (krrp_sess_t), KM_SLEEP);
170
171 if (krrp_sess_set_id(sess, id, error) != 0)
172 goto err;
173
174 if (krrp_sess_set_kstat_id(sess, kstat_id, error) != 0)
175 goto err;
176
177 if (krrp_sess_set_auth_digest(sess, auth_digest, error) != 0)
178 goto err;
179
180 mutex_init(&sess->mtx, NULL, MUTEX_DEFAULT, NULL);
181 cv_init(&sess->cv, NULL, CV_DEFAULT, NULL);
182
183 mutex_init(&sess->fl_ctrl.mtx, NULL, MUTEX_DEFAULT, NULL);
184 cv_init(&sess->fl_ctrl.cv, NULL, CV_DEFAULT, NULL);
185
186 krrp_queue_init(&sess->data_write_queue, sizeof (krrp_pdu_t),
187 offsetof(krrp_pdu_t, node));
188 krrp_queue_init(&sess->data_tx_queue, sizeof (krrp_pdu_t),
189 offsetof(krrp_pdu_t, node));
190 krrp_queue_init(&sess->ctrl_tx_queue, sizeof (krrp_pdu_t),
191 offsetof(krrp_pdu_t, node));
192
193 if (compound)
194 sess->type = KRRP_SESS_COMPOUND;
195 else if (sender)
196 sess->type = KRRP_SESS_SENDER;
197 else
198 sess->type = KRRP_SESS_RECEIVER;
199
200 sess->fake_mode = fake_mode;
201
202 *result_sess = sess;
203
204 return (0);
205
206 err:
207 kmem_free(sess, sizeof (krrp_sess_t));
208 return (-1);
209 }
210
211 void
212 krrp_sess_destroy(krrp_sess_t *sess)
213 {
214 krrp_pdu_t *pdu;
215
216 krrp_sess_stop_ping(sess);
217
218 mutex_enter(&sess->mtx);
219
220 if (!sess->shutdown && sess->conn != NULL) {
221 sess->shutdown = B_TRUE;
222 krrp_sess_send_shutdown(sess);
223
224 (void) cv_reltimedwait(&sess->cv, &sess->mtx,
225 SEC_TO_TICK(2), TR_CLOCK_TICK);
226 }
227
228 sess->destroying = B_TRUE;
229 while (sess->ref_cnt > 0)
230 cv_wait(&sess->cv, &sess->mtx);
231
232 mutex_exit(&sess->mtx);
233
234 if (sess->kstat.ctx != NULL)
235 kstat_delete(sess->kstat.ctx);
236
237 if (sess->conn != NULL)
238 krrp_conn_destroy(sess->conn);
239
240 if (sess->stream_read != NULL)
241 krrp_stream_destroy(sess->stream_read);
242
243 if (sess->stream_write != NULL)
244 krrp_stream_destroy(sess->stream_write);
245
246 while ((pdu = krrp_queue_get_no_wait(sess->data_write_queue)) != NULL)
247 krrp_pdu_rele(pdu);
248
249 while ((pdu = krrp_queue_get_no_wait(sess->data_tx_queue)) != NULL)
250 krrp_pdu_rele(pdu);
251
252 while ((pdu = krrp_queue_get_no_wait(sess->ctrl_tx_queue)) != NULL)
253 krrp_pdu_rele(pdu);
254
255 krrp_queue_fini(sess->data_write_queue);
256 krrp_queue_fini(sess->data_tx_queue);
257 krrp_queue_fini(sess->ctrl_tx_queue);
258
259 if (sess->data_pdu_engine != NULL)
260 krrp_pdu_engine_destroy(sess->data_pdu_engine);
261
262 if (sess->private_data != NULL)
263 fnvlist_free(sess->private_data);
264
265 cv_destroy(&sess->fl_ctrl.cv);
266 mutex_destroy(&sess->fl_ctrl.mtx);
267
268 cv_destroy(&sess->cv);
269 mutex_destroy(&sess->mtx);
270
271 kmem_free(sess, sizeof (krrp_sess_t));
272 }
273
274 int
275 krrp_sess_run(krrp_sess_t *sess, boolean_t only_once, krrp_error_t *error)
276 {
277 int rc = -1;
278 boolean_t fatal_error = B_FALSE;
279 boolean_t stream_is_not_defined = B_FALSE;
280
281 mutex_enter(&sess->mtx);
282
283 if (sess->error.krrp_errno != 0) {
284 (void) memcpy(error, &sess->error, sizeof (krrp_error_t));
285 goto out;
286 }
287
288 if (sess->type == KRRP_SESS_RECEIVER && only_once) {
289 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
290 goto out;
291 }
292
293 if (sess->started) {
294 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
295 goto out;
296 }
297
298 switch (sess->type) {
299 case KRRP_SESS_COMPOUND:
300 stream_is_not_defined = (sess->stream_read == NULL ||
301 sess->stream_write == NULL);
302 break;
303 case KRRP_SESS_SENDER:
304 stream_is_not_defined = (sess->stream_read == NULL);
305 break;
306 case KRRP_SESS_RECEIVER:
307 stream_is_not_defined = (sess->stream_write == NULL);
308 break;
309 }
310
311 if (stream_is_not_defined) {
312 krrp_error_set(error, KRRP_ERRNO_STREAM, ENOENT);
313 goto out;
314 }
315
316 if (sess->type != KRRP_SESS_COMPOUND && sess->conn == NULL) {
317 krrp_error_set(error, KRRP_ERRNO_CONN, ENOENT);
318 goto out;
319 }
320
321 if (sess->data_pdu_engine == NULL) {
322 krrp_error_set(error, KRRP_ERRNO_PDUENGINE, ENOENT);
323 goto out;
324 }
325
326 if (sess->type != KRRP_SESS_RECEIVER && only_once &&
327 sess->stream_read->non_continuous) {
328 krrp_error_set(error, KRRP_ERRNO_STREAM, EINVAL);
329 goto out;
330 }
331
332 /*
333 * Need to store error to session, to be able to
334 * return it again, because an error that might
335 * occur bellow cannot be fixed without
336 * recreation of session.
337 */
338 fatal_error = B_TRUE;
339
340 if (sess->type == KRRP_SESS_COMPOUND) {
341 rc = krrp_stream_run(sess->stream_write,
342 sess->data_write_queue,
343 sess->data_pdu_engine, error);
344 if (rc != 0)
345 goto out;
346
347 rc = krrp_stream_run(sess->stream_read,
348 sess->data_write_queue,
349 sess->data_pdu_engine, error);
350 if (rc != 0) {
351 krrp_stream_stop(sess->stream_write);
352 goto out;
353 }
354 } else {
355 krrp_stream_t *stream;
356
357 stream = sess->type == KRRP_SESS_SENDER ?
358 sess->stream_read : sess->stream_write;
359
360 rc = krrp_stream_run(stream, sess->data_write_queue,
361 sess->data_pdu_engine, error);
362 if (rc != 0)
363 goto out;
364
365 if (sess->type == KRRP_SESS_SENDER)
366 krrp_conn_run(sess->conn, sess->ctrl_tx_queue,
367 sess->data_pdu_engine,
368 &krrp_sess_get_data_pdu_to_tx, sess);
369 else
370 krrp_conn_run(sess->conn, sess->ctrl_tx_queue,
371 sess->data_pdu_engine, NULL, NULL);
372
373 krrp_sess_start_ping(sess);
374 }
375
376 krrp_sess_kstat_init(sess);
377
378 sess->started = B_TRUE;
379 sess->running = B_TRUE;
380
381 out:
382 if (rc != 0 && fatal_error) {
383 ASSERT(error->krrp_errno != 0);
384
385 sess->error.krrp_errno = error->krrp_errno;
386 sess->error.unix_errno = error->unix_errno;
387 }
388 mutex_exit(&sess->mtx);
389
390 if (rc == 0 && sess->type == KRRP_SESS_RECEIVER)
391 krrp_pdu_engine_force_notify(sess->data_pdu_engine, B_TRUE);
392
393 /*
394 * Only once means that after successfully start
395 * we immediately do gracefull shutdown
396 *
397 * There is no reason to implement a complex logic that will handle
398 * processing of only one snapshot
399 */
400 if (rc == 0 && sess->type != KRRP_SESS_RECEIVER && only_once)
401 (void) krrp_stream_send_stop(sess->stream_read);
402
403 return (rc);
404 }
405
406 void
407 krrp_sess_get_status(krrp_sess_t *sess, nvlist_t *result)
408 {
409 boolean_t true_value = B_TRUE;
410
411 mutex_enter(&sess->mtx);
412
413 switch (sess->type) {
414 case KRRP_SESS_SENDER:
415 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_SENDER,
416 result, &true_value));
417 break;
418 case KRRP_SESS_RECEIVER:
419 break;
420 case KRRP_SESS_COMPOUND:
421 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_COMPOUND,
422 result, &true_value));
423 break;
424 }
425
426 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_ID,
427 result, sess->id));
428 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_KSTAT_ID,
429 result, sess->kstat.id));
430 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_STARTED,
431 result, &sess->started));
432 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_RUNNING,
433 result, &sess->running));
434
435 if (sess->error.krrp_errno != 0)
436 krrp_error_to_nvl(&sess->error, &result);
437
438 mutex_exit(&sess->mtx);
439 }
440
441 /*
442 * Retrieve information about existing connection
443 * and place it to result NVL.
444 */
445 int
446 krrp_sess_get_conn_info(krrp_sess_t *sess, nvlist_t *result,
447 krrp_error_t *error)
448 {
449 if (sess->type == KRRP_SESS_COMPOUND) {
450 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
451 return (-1);
452 }
453
454 if (sess->conn == NULL) {
455 krrp_error_set(error, KRRP_ERRNO_CONN, ENOENT);
456 return (-1);
457 }
458
459 VERIFY0(krrp_param_put(KRRP_PARAM_DBLK_DATA_SIZE,
460 result, (void *)&sess->conn->blk_sz));
461
462 return (0);
463 }
464
465 /*
466 * 'Started' means 'sess_run' IOCTL was successfully processed
467 */
468 boolean_t
469 krrp_sess_is_started(krrp_sess_t *sess)
470 {
471 boolean_t result;
472
473 mutex_enter(&sess->mtx);
474 result = sess->started;
475 mutex_exit(&sess->mtx);
476
477 return (result);
478 }
479
480 /*
481 * 'Running' means 'sess_run' IOCTL was successfully processed
482 * and now session does replication.
483 *
484 * It is possible to have: started == B_TRUE && running == B_FALSE.
485 * This means an error occured and now session waits for control action
486 * from userspace
487 */
488 boolean_t
489 krrp_sess_is_running(krrp_sess_t *sess)
490 {
491 boolean_t result;
492
493 mutex_enter(&sess->mtx);
494 result = sess->running;
495 mutex_exit(&sess->mtx);
496
497 return (result);
498 }
499
500 int
501 krrp_sess_throttle_conn(krrp_sess_t *sess, size_t limit,
502 krrp_error_t *error)
503 {
504 int rc = -1;
505
506 if (sess->type != KRRP_SESS_SENDER) {
507 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
508 goto out;
509 }
510
511 if (sess->conn == NULL) {
512 krrp_error_set(error, KRRP_ERRNO_CONN, ENOENT);
513 goto out;
514 }
515
516 krrp_conn_throttle_set(sess->conn, limit,
517 !krrp_sess_is_running(sess));
518 rc = 0;
519
520 out:
521 return (rc);
522 }
523
524 static void
525 krrp_sess_start_ping(krrp_sess_t *sess)
526 {
527 sess->ping_timer = timeout(krrp_sess_ping_cb, sess,
528 drv_usectohz(5 * 1000000));
529 }
530
531 static void
532 krrp_sess_stop_ping(krrp_sess_t *sess)
533 {
534 timeout_id_t saved_timer;
535
536 mutex_enter(&sess->mtx);
537 saved_timer = sess->ping_timer;
538 sess->ping_timer = NULL;
539 mutex_exit(&sess->mtx);
540
541 if (saved_timer != NULL)
542 (void) untimeout(saved_timer);
543 }
544
545 static void
546 krrp_sess_ping_cb(void *void_sess)
547 {
548 krrp_pdu_ctrl_t *pdu = NULL;
549 krrp_sess_t *sess = void_sess;
550
551 if (sess->ping_wait_for_response) {
552 krrp_error_t error;
553
554 krrp_error_init(&error);
555 krrp_error_set(&error, KRRP_ERRNO_PINGTIMEOUT, 0);
556 krrp_sess_post_error_uevent(sess, &error);
557 goto out;
558 }
559
560 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
561 if (pdu == NULL) {
562 cmn_err(CE_WARN, "No memory to send PING request");
563 return;
564 }
565
566 pdu->hdr->opcode = KRRP_OPCODE_PING;
567 krrp_queue_put(sess->ctrl_tx_queue, pdu);
568 sess->ping_wait_for_response = B_TRUE;
569
570 out:
571 mutex_enter(&sess->mtx);
572
573 if (sess->ping_timer != NULL && !sess->destroying)
574 krrp_sess_start_ping(sess);
575
576 mutex_exit(&sess->mtx);
577 }
578
579 int
580 krrp_sess_send_stop(krrp_sess_t *sess, krrp_error_t *error)
581 {
582 int rc = -1;
583
584 if (!krrp_sess_is_running(sess)) {
585 krrp_error_set(error, KRRP_ERRNO_SESS, ENOTACTIVE);
586 } else {
587 if (sess->type == KRRP_SESS_RECEIVER)
588 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
589 else if (sess->stream_read->non_continuous)
590 krrp_error_set(error, KRRP_ERRNO_STREAM, EINVAL);
591 else if (krrp_stream_send_stop(sess->stream_read) != 0)
592 krrp_error_set(error, KRRP_ERRNO_STREAM, EALREADY);
593 else
594 rc = 0;
595 }
596
597 return (rc);
598 }
599
600 int
601 krrp_sess_attach_pdu_engine(krrp_sess_t *sess,
602 krrp_pdu_engine_t *pdu_engine, krrp_error_t *error)
603 {
604 int rc = -1;
605
606 mutex_enter(&sess->mtx);
607
608 if (sess->data_pdu_engine != NULL) {
609 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
610 goto out;
611 }
612
613 VERIFY(pdu_engine->type == KRRP_PET_DATA);
614 VERIFY(sess->data_pdu_engine == NULL);
615
616 sess->data_pdu_engine = pdu_engine;
617
618 /*
619 * The receiver uses notifications from PDU engine to
620 * update RX Window and send the info to the sender
621 */
622 if (sess->type == KRRP_SESS_RECEIVER)
623 krrp_pdu_engine_register_callback(pdu_engine,
624 &krrp_sess_pdu_engine_cb, sess);
625
626 rc = 0;
627 out:
628 mutex_exit(&sess->mtx);
629 return (rc);
630 }
631
632 int
633 krrp_sess_attach_read_stream(krrp_sess_t *sess,
634 krrp_stream_t *stream, krrp_error_t *error)
635 {
636 int rc = -1;
637
638 VERIFY(stream != NULL);
639 VERIFY(sess->type != KRRP_SESS_RECEIVER);
640
641 mutex_enter(&sess->mtx);
642
643 if (sess->stream_read != NULL) {
644 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
645 goto out;
646 }
647
648 sess->stream_read = stream;
649
650 if (sess->type == KRRP_SESS_COMPOUND)
651 krrp_stream_register_callback(stream,
652 &krrp_sess_ll_stream_cb, sess);
653 else
654 krrp_stream_register_callback(stream,
655 &krrp_sess_lr_stream_cb, sess);
656
657 rc = 0;
658
659 out:
660 mutex_exit(&sess->mtx);
661
662 return (rc);
663 }
664
665 int
666 krrp_sess_attach_write_stream(krrp_sess_t *sess,
667 krrp_stream_t *stream, krrp_error_t *error)
668 {
669 int rc = -1;
670
671 VERIFY(stream != NULL);
672 VERIFY(sess->type != KRRP_SESS_SENDER);
673
674 mutex_enter(&sess->mtx);
675
676 if (sess->stream_write != NULL) {
677 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
678 goto out;
679 }
680
681 sess->stream_write = stream;
682
683 if (sess->type == KRRP_SESS_COMPOUND)
684 krrp_stream_register_callback(stream,
685 &krrp_sess_ll_stream_cb, sess);
686 else
687 krrp_stream_register_callback(stream,
688 &krrp_sess_lr_stream_cb, sess);
689
690 rc = 0;
691
692 out:
693 mutex_exit(&sess->mtx);
694
695 return (rc);
696 }
697
698 int
699 krrp_sess_target_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
700 nvlist_t *params, krrp_error_t *error)
701 {
702 int rc;
703 boolean_t fake_mode;
704
705 if (krrp_sess_common_attach_conn(sess, conn, error) != 0)
706 return (-1);
707
708 fake_mode = krrp_param_exists(KRRP_PARAM_FAKE_MODE, params);
709
710 if (fake_mode && sess->type == KRRP_SESS_RECEIVER && !sess->fake_mode) {
711 cmn_err(CE_WARN, "It is impossible to use real receiver "
712 "together with fake sender");
713 krrp_error_set(error, KRRP_ERRNO_PROTO, EFAULT);
714 goto err;
715 }
716
717 if (sess->auth_digest[0] != '\0') {
718 const char *auth_data = NULL;
719
720 rc = krrp_param_get(KRRP_PARAM_AUTH_DATA,
721 params, (void *)&auth_data);
722 if (rc != 0) {
723 krrp_error_set(error, KRRP_ERRNO_AUTH, ENOENT);
724 goto err;
725 }
726
727 if (strcmp(sess->auth_digest, auth_data) != 0) {
728 krrp_error_set(error, KRRP_ERRNO_AUTH, EINVAL);
729 goto err;
730 }
731 }
732
733 rc = krrp_conn_send_ctrl_data(conn,
734 KRRP_OPCODE_ATTACH_SESS, NULL, error);
735 if (rc != 0)
736 goto err;
737
738 krrp_conn_register_callback(sess->conn,
739 &krrp_sess_conn_cb, sess);
740
741 return (0);
742
743 err:
744 mutex_enter(&sess->mtx);
745 sess->conn = NULL;
746 mutex_exit(&sess->mtx);
747 return (-1);
748 }
749
750 int
751 krrp_sess_initiator_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
752 krrp_error_t *error)
753 {
754 int rc;
755 nvlist_t *params;
756 krrp_pdu_ctrl_t *pdu = NULL;
757
758 if (krrp_sess_common_attach_conn(sess, conn, error) != 0)
759 return (-1);
760
761 params = fnvlist_alloc();
762
763 if (sess->auth_digest[0] != '\0') {
764 (void) krrp_param_put(KRRP_PARAM_AUTH_DATA,
765 params, sess->auth_digest);
766 }
767
768 if (sess->type == KRRP_SESS_SENDER && sess->fake_mode)
769 (void) krrp_param_put(KRRP_PARAM_FAKE_MODE, params, NULL);
770
771 (void) krrp_param_put(KRRP_PARAM_SESS_ID,
772 params, sess->id);
773
774 rc = krrp_conn_send_ctrl_data(conn,
775 KRRP_OPCODE_ATTACH_SESS, params, error);
776 fnvlist_free(params);
777 if (rc != 0)
778 goto err;
779
780 rc = krrp_conn_rx_ctrl_pdu(conn, &pdu, error);
781 if (rc != 0)
782 goto err;
783
784 if (krrp_pdu_opcode(pdu) != KRRP_OPCODE_ATTACH_SESS) {
785 if (krrp_pdu_opcode(pdu) == KRRP_OPCODE_ERROR) {
786 nvlist_t *error_nvl = NULL;
787
788 cmn_err(CE_WARN, "Remote side returned an error");
789
790 rc = krrp_pdu_get_nvl_from_payload(pdu, &error_nvl);
791 if (rc == 0) {
792 rc = krrp_error_from_nvl(error, error_nvl);
793 if (rc == 0) {
794 if (error->krrp_errno == 0) {
795 /*
796 * Something wrong, so we will return
797 * KRRP_ERRNO_BADRESP
798 */
799 rc = -1;
800 } else {
801 krrp_error_set_flag(error,
802 KRRP_ERRF_REMOTE);
803 }
804 }
805
806 fnvlist_free(error_nvl);
807 }
808 }
809
810 if (rc != 0)
811 krrp_error_set(error, KRRP_ERRNO_BADRESP, 0);
812 else
813 rc = -1;
814 }
815
816 krrp_pdu_rele((krrp_pdu_t *)pdu);
817
818 if (rc != 0)
819 goto err;
820
821 krrp_conn_register_callback(sess->conn,
822 &krrp_sess_conn_cb, sess);
823
824 return (0);
825
826 err:
827 mutex_enter(&sess->mtx);
828 sess->conn = NULL;
829 mutex_exit(&sess->mtx);
830 return (-1);
831 }
832
833 /*
834 * Here we check that:
835 * - the session does not have attached connection
836 * - the session is not compound (local 2 local replication)
837 */
838 static int
839 krrp_sess_common_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
840 krrp_error_t *error)
841 {
842 int rc = -1;
843
844 mutex_enter(&sess->mtx);
845
846 if (sess->conn != NULL)
847 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
848 else if (sess->type == KRRP_SESS_COMPOUND)
849 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
850 else {
851 sess->conn = conn;
852 rc = 0;
853 }
854
855 mutex_exit(&sess->mtx);
856
857 return (rc);
858 }
859
860 /* This function is used by logic that adds session to the sessions AVL */
861 int
862 krrp_sess_compare_id(const void *opaque_sess1, const void *opaque_sess2)
863 {
864 size_t i;
865 const krrp_sess_t *sess1, *sess2;
866
867 sess1 = opaque_sess1;
868 sess2 = opaque_sess2;
869
870 for (i = 0; i < UUID_PRINTABLE_STRING_LENGTH - 1; i++) {
871 if (sess1->id[i] > sess2->id[i])
872 return (1);
873
874 if (sess1->id[i] < sess2->id[i])
875 return (-1);
876 }
877
878 return (0);
879 }
880
881 int
882 krrp_sess_set_id(krrp_sess_t *sess, const char *sess_id,
883 krrp_error_t *error)
884 {
885 int rc = -1;
886
887 VERIFY(sess_id != NULL);
888
889 if (strlen(sess_id) != (UUID_PRINTABLE_STRING_LENGTH - 1)) {
890 krrp_error_set(error, KRRP_ERRNO_SESSID, EINVAL);
891 goto out;
892 }
893
894 (void) strlcpy(sess->id, sess_id, UUID_PRINTABLE_STRING_LENGTH);
895 rc = 0;
896
897 out:
898 return (rc);
899 }
900
901 static int
902 krrp_sess_set_kstat_id(krrp_sess_t *sess, const char *kstat_id,
903 krrp_error_t *error)
904 {
905 int rc = -1;
906
907 VERIFY(kstat_id != NULL);
908
909 if (strlen(kstat_id) != (KRRP_KSTAT_ID_STRING_LENGTH - 1)) {
910 krrp_error_set(error, KRRP_ERRNO_KSTATID, EINVAL);
911 goto out;
912 }
913
914 (void) strlcpy(sess->kstat.id, kstat_id, KRRP_KSTAT_ID_STRING_LENGTH);
915 rc = 0;
916
917 out:
918 return (rc);
919 }
920
921 static int
922 krrp_sess_set_auth_digest(krrp_sess_t *sess, const char *auth_digest,
923 krrp_error_t *error)
924 {
925 if (auth_digest != NULL) {
926 if (strlcpy(sess->auth_digest, auth_digest,
927 KRRP_AUTH_DIGEST_MAX_LEN) >= KRRP_AUTH_DIGEST_MAX_LEN) {
928 krrp_error_set(error, KRRP_ERRNO_AUTH, EINVAL);
929 return (-1);
930 }
931 }
932
933 return (0);
934 }
935
936 static void
937 krrp_sess_kstat_init(krrp_sess_t *sess)
938 {
939 sess->kstat.ctx = kstat_create("krrp_session", 0,
940 sess->kstat.id, "misc", KSTAT_TYPE_NAMED,
941 0, KSTAT_FLAG_VIRTUAL);
942
943 if (sess->kstat.ctx == NULL) {
944 cmn_err(CE_WARN, "Failed to create kstat with id %s"
945 "for session %s", sess->kstat.id, sess->id);
946 return;
947 }
948
949 sess->kstat.ctx->ks_private = sess;
950
951 switch (sess->type) {
952 case KRRP_SESS_SENDER:
953 sess->kstat.ctx->ks_data =
954 &sess->kstat.data.sender;
955 sess->kstat.ctx->ks_data_size =
956 sizeof (sess->kstat.data.sender);
957 sess->kstat.ctx->ks_ndata =
958 NUM_OF_FIELDS(sess->kstat.data.sender);
959 (void) memcpy(sess->kstat.ctx->ks_data,
960 &sess_sender_stats_templ,
961 sizeof (krrp_sess_sender_kstat_t));
962 break;
963 case KRRP_SESS_RECEIVER:
964 sess->kstat.ctx->ks_data =
965 &sess->kstat.data.receiver;
966 sess->kstat.ctx->ks_data_size =
967 sizeof (sess->kstat.data.receiver);
968 sess->kstat.ctx->ks_ndata =
969 NUM_OF_FIELDS(sess->kstat.data.receiver);
970 (void) memcpy(sess->kstat.ctx->ks_data,
971 &sess_receiver_stats_templ,
972 sizeof (krrp_sess_receiver_kstat_t));
973 break;
974 case KRRP_SESS_COMPOUND:
975 sess->kstat.ctx->ks_data =
976 &sess->kstat.data.compound;
977 sess->kstat.ctx->ks_data_size =
978 sizeof (sess->kstat.data.compound);
979 sess->kstat.ctx->ks_ndata =
980 NUM_OF_FIELDS(sess->kstat.data.compound);
981 (void) memcpy(sess->kstat.ctx->ks_data,
982 &sess_compound_stats_templ,
983 sizeof (krrp_sess_compound_kstat_t));
984 break;
985 }
986
987 sess->kstat.ctx->ks_update = &krrp_sess_kstat_update;
988 kstat_install(sess->kstat.ctx);
989 }
990
991 static int
992 krrp_sess_kstat_update(kstat_t *ks, int rw)
993 {
994 int rc = EACCES;
995 krrp_sess_t *sess = ks->ks_private;
996
997 if (rw == KSTAT_WRITE)
998 goto out;
999
1000 if (krrp_sess_inc_ref_cnt(sess) != 0) {
1001 rc = EIO;
1002 goto out;
1003 }
1004
1005 switch (sess->type) {
1006 case KRRP_SESS_SENDER:
1007 krrp_sess_sender_kstat_update(sess, ks);
1008 break;
1009 case KRRP_SESS_RECEIVER:
1010 krrp_sess_receiver_kstat_update(sess, ks);
1011 break;
1012 case KRRP_SESS_COMPOUND:
1013 krrp_sess_compound_kstat_update(sess, ks);
1014 break;
1015 }
1016
1017 rc = 0;
1018 krrp_sess_dec_ref_cnt(sess);
1019
1020 out:
1021 return (rc);
1022 }
1023
1024 /*
1025 * To extend stats need to add new field to KRRP_SESS_STAT_NAME_MAP
1026 * in krrp_session.h and write corresponding update-code
1027 * in the following function
1028 */
1029 static void
1030 krrp_sess_sender_kstat_update(krrp_sess_t *sess, kstat_t *ks)
1031 {
1032 krrp_sess_sender_kstat_t *stats = &sess->kstat.data.sender;
1033
1034 /* The RPO-time between snap create and total recv by the remote ZFS */
1035 stats->avg_rpo.value.ui64 =
1036 sess->stream_read->avg_total_rpo.value;
1037
1038 /* The RPO-time between snap create and total recv by the remote KRRP */
1039 stats->avg_network_rpo.value.ui64 = sess->stream_read->avg_rpo.value;
1040
1041 stats->bytes_tx.value.ui64 = sess->conn->bytes_tx;
1042 stats->bytes_rx.value.ui64 = sess->conn->bytes_rx;
1043
1044 /* The TXG that is now read by ZFS */
1045 stats->cur_send_stream_txg.value.ui64 = sess->stream_read->cur_send_txg;
1046
1047 /* The TXG that is now sent over Network */
1048 stats->cur_send_network_txg.value.ui64 = sess->conn->cur_txg;
1049
1050 /* The last TXG that was complettly received by the remote ZFS */
1051 stats->last_stream_ack_txg.value.ui64 =
1052 sess->stream_read->last_full_ack_txg;
1053
1054 /* The last TXG that was complettly received by the remote KRRP */
1055 stats->last_network_ack_txg.value.ui64 =
1056 sess->stream_read->last_ack_txg;
1057
1058 stats->max_pdu_seq_num.value.ui64 =
1059 sess->fl_ctrl.max_pdu_seq_num_orig;
1060 stats->max_pdu_seq_num_adjusted.value.ui64 =
1061 sess->fl_ctrl.max_pdu_seq_num;
1062 stats->cur_pdu_seq_num.value.ui64 = sess->fl_ctrl.cur_pdu_seq_num;
1063 stats->fl_ctrl_window_size.value.ui64 = sess->fl_ctrl.cwnd;
1064
1065 stats->rbytes.value.ui64 = sess->stream_read->bytes_processed;
1066
1067 stats->mem_used.value.ui64 =
1068 krrp_pdu_engine_get_used_mem(sess->data_pdu_engine);
1069
1070 stats->uptime.value.ui64 =
1071 (gethrtime() - ks->ks_crtime) / 1000 / 1000;
1072 }
1073
1074 static void
1075 krrp_sess_receiver_kstat_update(krrp_sess_t *sess, kstat_t *ks)
1076 {
1077 krrp_sess_receiver_kstat_t *stats =
1078 &sess->kstat.data.receiver;
1079
1080 stats->bytes_tx.value.ui64 = sess->conn->bytes_tx;
1081 stats->bytes_rx.value.ui64 = sess->conn->bytes_rx;
1082
1083 /* The TXG that is now wrote by ZFS */
1084 stats->cur_recv_stream_txg.value.ui64 =
1085 sess->stream_write->cur_recv_txg;
1086
1087 /* The TXG that is now received from Network */
1088 stats->cur_recv_network_txg.value.ui64 = sess->conn->cur_txg;
1089
1090 stats->max_pdu_seq_num.value.ui64 = sess->fl_ctrl.max_pdu_seq_num;
1091 stats->cur_pdu_seq_num.value.ui64 = sess->fl_ctrl.cur_pdu_seq_num;
1092
1093 stats->wbytes.value.ui64 = sess->stream_write->bytes_processed;
1094
1095 stats->mem_used.value.ui64 =
1096 krrp_pdu_engine_get_used_mem(sess->data_pdu_engine);
1097
1098 stats->uptime.value.ui64 =
1099 (gethrtime() - ks->ks_crtime) / 1000 / 1000;
1100 }
1101
1102 static void
1103 krrp_sess_compound_kstat_update(krrp_sess_t *sess, kstat_t *ks)
1104 {
1105 krrp_sess_compound_kstat_t *stats =
1106 &sess->kstat.data.compound;
1107
1108 /* The RPO-time between snap create and total recv by the ZFS */
1109 stats->avg_rpo.value.ui64 =
1110 sess->stream_read->avg_total_rpo.value;
1111
1112 /* The TXG that is now read by ZFS */
1113 stats->cur_send_stream_txg.value.ui64 =
1114 sess->stream_read->cur_send_txg;
1115
1116 /* The TXG that is now wrote by ZFS */
1117 stats->cur_recv_stream_txg.value.ui64 =
1118 sess->stream_write->cur_recv_txg;
1119
1120 stats->rbytes.value.ui64 = sess->stream_read->bytes_processed;
1121 stats->wbytes.value.ui64 = sess->stream_write->bytes_processed;
1122
1123 stats->mem_used.value.ui64 =
1124 krrp_pdu_engine_get_used_mem(sess->data_pdu_engine);
1125
1126 stats->uptime.value.ui64 =
1127 (gethrtime() - ks->ks_crtime) / 1000 / 1000;
1128 }
1129
1130 static void
1131 krrp_sess_lr_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
1132 void *void_sess)
1133 {
1134 krrp_sess_t *sess = void_sess;
1135
1136 if (krrp_sess_inc_ref_cnt(sess) != 0) {
1137 if (ev == KRRP_STREAM_DATA_PDU)
1138 krrp_pdu_rele((krrp_pdu_t *)ev_arg);
1139
1140 return;
1141 }
1142
1143 switch (ev) {
1144 case KRRP_STREAM_DATA_PDU:
1145 krrp_sess_lr_data_pdu_from_stream(sess,
1146 (krrp_pdu_data_t *)ev_arg);
1147 break;
1148 case KRRP_STREAM_TXG_RECV_DONE:
1149 krrp_sess_lr_txg_recv_done(sess, (uint64_t)ev_arg);
1150 break;
1151 case KRRP_STREAM_SEND_DONE:
1152 krrp_sess_lr_send_done(sess);
1153 break;
1154 case KRRP_STREAM_ERROR:
1155 krrp_sess_stream_error(sess, (krrp_error_t *)ev_arg);
1156 break;
1157 default:
1158 break;
1159 }
1160
1161 krrp_sess_dec_ref_cnt(sess);
1162 }
1163
1164 static void
1165 krrp_sess_ll_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
1166 void *void_sess)
1167 {
1168 krrp_sess_t *sess = void_sess;
1169
1170 if (krrp_sess_inc_ref_cnt(sess) != 0) {
1171 if (ev == KRRP_STREAM_DATA_PDU)
1172 krrp_pdu_rele((krrp_pdu_t *)ev_arg);
1173
1174 return;
1175 }
1176
1177 switch (ev) {
1178 case KRRP_STREAM_DATA_PDU:
1179 krrp_sess_ll_data_pdu_from_stream(sess,
1180 (krrp_pdu_data_t *)ev_arg);
1181 break;
1182 case KRRP_STREAM_TXG_RECV_DONE:
1183 krrp_sess_ll_txg_recv_done(sess, (uint64_t)ev_arg);
1184 break;
1185 case KRRP_STREAM_SEND_DONE:
1186 krrp_sess_ll_send_done(sess);
1187 break;
1188 case KRRP_STREAM_ERROR:
1189 krrp_sess_stream_error(sess, (krrp_error_t *)ev_arg);
1190 break;
1191 default:
1192 break;
1193 }
1194
1195 krrp_sess_dec_ref_cnt(sess);
1196 }
1197
1198 static void
1199 krrp_sess_stream_error(krrp_sess_t *sess, krrp_error_t *error)
1200 {
1201 cmn_err(CE_WARN, "An stream error has occured: %s (%d)",
1202 krrp_error_errno_to_str(error->krrp_errno),
1203 error->unix_errno);
1204
1205 switch (sess->type) {
1206 case KRRP_SESS_SENDER:
1207 krrp_stream_stop(sess->stream_read);
1208 break;
1209 case KRRP_SESS_RECEIVER:
1210 krrp_stream_stop(sess->stream_write);
1211 break;
1212 case KRRP_SESS_COMPOUND:
1213 krrp_stream_stop(sess->stream_read);
1214 krrp_stream_stop(sess->stream_write);
1215 break;
1216 }
1217
1218 krrp_sess_error(sess, error);
1219 }
1220
1221 static void
1222 krrp_sess_txg_recv_done(krrp_sess_t *sess, uint64_t txg, boolean_t complete)
1223 {
1224 krrp_pdu_ctrl_t *pdu = NULL;
1225
1226 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1227 if (pdu == NULL) {
1228 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1229 "to send KRRP_OPCODE_TXG_ACK/KRRP_OPCODE_TXG_ACK2");
1230 krrp_sess_nomem_error(sess);
1231 return;
1232 }
1233
1234 if (complete)
1235 pdu->hdr->opcode = KRRP_OPCODE_TXG_ACK2;
1236 else
1237 pdu->hdr->opcode = KRRP_OPCODE_TXG_ACK;
1238
1239 *((uint64_t *)(pdu->hdr->data)) = htonll(txg);
1240
1241 krrp_queue_put(sess->ctrl_tx_queue, pdu);
1242 }
1243
1244 static void
1245 krrp_sess_post_send_done_uevent(krrp_sess_t *sess)
1246 {
1247 nvlist_t *attrs = fnvlist_alloc();
1248
1249 mutex_enter(&sess->mtx);
1250 sess->running = B_FALSE;
1251 mutex_exit(&sess->mtx);
1252
1253 (void) krrp_param_put(KRRP_PARAM_SESS_ID, attrs, sess->id);
1254 krrp_svc_post_uevent(ESC_KRRP_SESS_SEND_DONE, attrs);
1255 fnvlist_free(attrs);
1256 }
1257
1258 static void
1259 krrp_sess_post_error_uevent(krrp_sess_t *sess, krrp_error_t *error)
1260 {
1261 nvlist_t *attrs = fnvlist_alloc();
1262
1263 krrp_error_to_nvl(error, &attrs);
1264
1265 (void) krrp_param_put(KRRP_PARAM_SESS_ID, attrs, sess->id);
1266 krrp_svc_post_uevent(ESC_KRRP_SESS_ERROR, attrs);
1267
1268 fnvlist_free(attrs);
1269 }
1270
1271 static void
1272 krrp_sess_pdu_engine_cb(void *void_sess, size_t released_pdus)
1273 {
1274 krrp_pdu_ctrl_t *pdu = NULL;
1275 krrp_sess_t *sess = void_sess;
1276 uint64_t max_pdu_seq_num;
1277
1278 ASSERT(sess->type == KRRP_SESS_RECEIVER);
1279
1280 if (krrp_sess_inc_ref_cnt(sess) != 0)
1281 return;
1282
1283 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1284 if (pdu == NULL) {
1285 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1286 "to send KRRP_OPCODE_FL_CTRL_UPDATE");
1287 krrp_sess_nomem_error(sess);
1288 goto out;
1289 }
1290
1291 pdu->hdr->opcode = KRRP_OPCODE_FL_CTRL_UPDATE;
1292
1293 max_pdu_seq_num = krrp_sess_fl_ctrl_update_rx(&sess->fl_ctrl,
1294 released_pdus);
1295 *((uint64_t *)(pdu->hdr->data)) = htonll(max_pdu_seq_num);
1296
1297 krrp_queue_put(sess->ctrl_tx_queue, pdu);
1298
1299 out:
1300 krrp_sess_dec_ref_cnt(sess);
1301 }
1302
1303 static void
1304 krrp_sess_conn_cb(void *void_conn, krrp_conn_cb_ev_t ev, uintptr_t ev_arg,
1305 void *void_sess)
1306 {
1307 krrp_sess_t *sess = void_sess;
1308 krrp_conn_t *conn = void_conn;
1309
1310 if (krrp_sess_inc_ref_cnt(sess) != 0) {
1311 if (ev == KRRP_CONN_DATA_PDU || ev == KRRP_CONN_CTRL_PDU)
1312 krrp_pdu_rele((krrp_pdu_t *)ev_arg);
1313
1314 return;
1315 }
1316
1317 switch (ev) {
1318 case KRRP_CONN_DATA_PDU:
1319 /* Data-PDU flow: Sender >>> Receiver */
1320 if (sess->type == KRRP_SESS_SENDER) {
1321 krrp_error_t error;
1322
1323 krrp_error_set(&error, KRRP_ERRNO_PROTO, EBADMSG);
1324 krrp_sess_error(sess, &error);
1325 break;
1326 }
1327
1328 krrp_sess_data_pdu_from_network(sess,
1329 (krrp_pdu_data_t *)ev_arg);
1330 break;
1331 case KRRP_CONN_CTRL_PDU:
1332 krrp_sess_ctrl_pdu_from_network(sess,
1333 (krrp_pdu_ctrl_t *)ev_arg);
1334 break;
1335 case KRRP_CONN_ERROR:
1336 krrp_sess_conn_error(sess, conn,
1337 (krrp_error_t *)ev_arg);
1338 break;
1339 default:
1340 cmn_err(CE_PANIC, "Unknown conn cb-event");
1341 }
1342
1343 krrp_sess_dec_ref_cnt(sess);
1344 }
1345
1346 /* ARGSUSED */
1347 static void
1348 krrp_sess_conn_error(krrp_sess_t *sess, krrp_conn_t *conn,
1349 krrp_error_t *error)
1350 {
1351 boolean_t error_case;
1352 krrp_sess_stop_ping(sess);
1353
1354 mutex_enter(&sess->mtx);
1355 error_case = !sess->shutdown;
1356 mutex_exit(&sess->mtx);
1357
1358 if (error_case) {
1359 cmn_err(CE_WARN, "An connection error has occured: %s (%d)",
1360 krrp_error_errno_to_str(error->krrp_errno),
1361 error->unix_errno);
1362
1363 krrp_sess_error(sess, error);
1364 }
1365 }
1366
1367 static void
1368 krrp_sess_ctrl_pdu_from_network(krrp_sess_t *sess, krrp_pdu_ctrl_t *pdu)
1369 {
1370 krrp_opcode_t opcode;
1371 krrp_hdr_ctrl_t *hdr;
1372
1373 hdr = krrp_pdu_hdr(pdu);
1374 opcode = krrp_pdu_opcode(pdu);
1375
1376 switch (opcode) {
1377 case KRRP_OPCODE_TXG_ACK:
1378 case KRRP_OPCODE_TXG_ACK2:
1379 {
1380 uint64_t txg;
1381
1382 txg = ntohll(*((uint64_t *)(hdr->data)));
1383 if (opcode == KRRP_OPCODE_TXG_ACK2)
1384 krrp_stream_txg_confirmed(sess->stream_read,
1385 txg, B_TRUE);
1386 else
1387 krrp_stream_txg_confirmed(sess->stream_read,
1388 txg, B_FALSE);
1389 }
1390 break;
1391 case KRRP_OPCODE_FL_CTRL_UPDATE:
1392 krrp_sess_fl_ctrl_update_tx(&sess->fl_ctrl,
1393 ntohll(*((uint64_t *)(hdr->data))));
1394 break;
1395 case KRRP_OPCODE_SEND_DONE:
1396 krrp_sess_post_send_done_uevent(sess);
1397 break;
1398 case KRRP_OPCODE_PING:
1399 krrp_sess_ping_request(sess);
1400 break;
1401 case KRRP_OPCODE_PONG:
1402 sess->ping_wait_for_response = B_FALSE;
1403 break;
1404 case KRRP_OPCODE_ERROR:
1405 break;
1406 case KRRP_OPCODE_SHUTDOWN:
1407 mutex_enter(&sess->mtx);
1408
1409 if (sess->shutdown) {
1410 cv_signal(&sess->cv);
1411 } else {
1412 sess->shutdown = B_TRUE;
1413 krrp_sess_send_shutdown(sess);
1414 }
1415
1416 mutex_exit(&sess->mtx);
1417 break;
1418 default:
1419 break;
1420 }
1421
1422 krrp_pdu_rele((krrp_pdu_t *)pdu);
1423 }
1424
1425 static void
1426 krrp_sess_ping_request(krrp_sess_t *sess)
1427 {
1428 krrp_pdu_ctrl_t *pdu = NULL;
1429
1430 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1431 if (pdu == NULL) {
1432 cmn_err(CE_WARN, "No memory to send PING response");
1433 return;
1434 }
1435
1436 pdu->hdr->opcode = KRRP_OPCODE_PONG;
1437
1438 krrp_queue_put(sess->ctrl_tx_queue, pdu);
1439 }
1440
1441 static uint64_t
1442 krrp_sess_fl_ctrl_update_rx(krrp_fl_ctrl_t *fl_ctrl,
1443 size_t window_offset)
1444 {
1445 uint64_t max_pdu_seq_num;
1446
1447 mutex_enter(&fl_ctrl->mtx);
1448 fl_ctrl->max_pdu_seq_num += window_offset;
1449 max_pdu_seq_num = fl_ctrl->max_pdu_seq_num;
1450 mutex_exit(&fl_ctrl->mtx);
1451
1452 return (max_pdu_seq_num);
1453 }
1454
1455 static void
1456 krrp_sess_fl_ctrl_update_tx(krrp_fl_ctrl_t *fl_ctrl, uint64_t max_pdu_seq_num)
1457 {
1458 uint64_t cur_window, cur_pdu_seq_num;
1459
1460 mutex_enter(&fl_ctrl->mtx);
1461
1462 cur_pdu_seq_num = fl_ctrl->cur_pdu_seq_num;
1463
1464 cur_window = max_pdu_seq_num - cur_pdu_seq_num;
1465 krrp_sess_fl_ctrl_calc_cwnd_window(fl_ctrl, cur_window);
1466
1467 DTRACE_PROBE1(krrp_fl_ctrl_cwnd, uint64_t, fl_ctrl->cwnd);
1468
1469 fl_ctrl->max_pdu_seq_num_orig = max_pdu_seq_num;
1470 fl_ctrl->max_pdu_seq_num = cur_pdu_seq_num + fl_ctrl->cwnd;
1471
1472 cv_signal(&fl_ctrl->cv);
1473 mutex_exit(&fl_ctrl->mtx);
1474 }
1475
1476 static void
1477 krrp_sess_fl_ctrl_calc_cwnd_window(krrp_fl_ctrl_t *fl_ctrl,
1478 uint64_t new_recv_window)
1479 {
1480 switch (krrp_sess_cwnd_state) {
1481 default:
1482 case 0:
1483 fl_ctrl->cwnd = new_recv_window;
1484 break;
1485 case 1:
1486 if (fl_ctrl->cwnd == 0)
1487 fl_ctrl->cwnd = new_recv_window / 2;
1488 else if (new_recv_window >
1489 (fl_ctrl->cwnd + (fl_ctrl->cwnd >> 3)))
1490 fl_ctrl->cwnd = (fl_ctrl->cwnd + new_recv_window) >> 1;
1491 else if (new_recv_window < fl_ctrl->cwnd)
1492 fl_ctrl->cwnd = new_recv_window >> 1;
1493 break;
1494 case 2:
1495 if (fl_ctrl->cwnd == 0)
1496 fl_ctrl->cwnd = new_recv_window -
1497 (new_recv_window >> 2);
1498 else if (new_recv_window >
1499 (fl_ctrl->cwnd + (fl_ctrl->cwnd >> 3)))
1500 fl_ctrl->cwnd = (fl_ctrl->cwnd + new_recv_window) >> 1;
1501 else if (new_recv_window < fl_ctrl->cwnd)
1502 fl_ctrl->cwnd = new_recv_window -
1503 (new_recv_window >> 3);
1504 break;
1505 }
1506 }
1507
1508 static int
1509 krrp_sess_fl_ctrl_validate(krrp_sess_t *sess, krrp_hdr_data_t *hdr)
1510 {
1511 int rc = -1;
1512 krrp_fl_ctrl_t *fl_ctrl;
1513
1514 fl_ctrl = &sess->fl_ctrl;
1515
1516 mutex_enter(&fl_ctrl->mtx);
1517 if (!fl_ctrl->disabled) {
1518 if (hdr->pdu_seq_num > fl_ctrl->max_pdu_seq_num) {
1519 cmn_err(CE_WARN, "Detected violation of "
1520 "flow control rules: [%" PRIu64 "] > "
1521 "[%" PRIu64 "]", hdr->pdu_seq_num,
1522 fl_ctrl->max_pdu_seq_num);
1523 goto out;
1524 }
1525 }
1526
1527 rc = 0;
1528 fl_ctrl->cur_pdu_seq_num = hdr->pdu_seq_num;
1529
1530 out:
1531 mutex_exit(&fl_ctrl->mtx);
1532 return (rc);
1533 }
1534
1535
1536 static void
1537 krrp_sess_data_pdu_from_network(krrp_sess_t *sess, krrp_pdu_data_t *pdu)
1538 {
1539 krrp_hdr_data_t *hdr;
1540
1541 hdr = krrp_pdu_hdr(pdu);
1542
1543 pdu->txg = hdr->txg;
1544
1545 if (hdr->flags & KRRP_HDR_FLAG_INIT_PDU) {
1546 pdu->initial = B_TRUE;
1547
1548 DTRACE_PROBE1(krrp_network_recv_txg_start,
1549 uint64_t, pdu->txg);
1550 }
1551
1552 if (hdr->flags & KRRP_HDR_FLAG_FINI_PDU) {
1553 pdu->final = B_TRUE;
1554
1555 DTRACE_PROBE1(krrp_network_recv_txg_stop,
1556 uint64_t, pdu->txg);
1557
1558 krrp_sess_txg_recv_done(sess, pdu->txg, B_FALSE);
1559 }
1560
1561 /* Now just ignore, later need to do something */
1562 (void) krrp_sess_fl_ctrl_validate(sess, krrp_pdu_hdr(pdu));
1563
1564 if (krrp_sess_recv_without_stream) {
1565 if (hdr->flags & KRRP_HDR_FLAG_FINI_PDU)
1566 krrp_sess_txg_recv_done(sess, pdu->txg, B_TRUE);
1567
1568 krrp_pdu_rele((krrp_pdu_t *)pdu);
1569 return;
1570 }
1571
1572 krrp_queue_put(sess->data_write_queue, pdu);
1573 }
1574
1575 /*
1576 * The function does the following actions:
1577 * 1. checks FlowControl
1578 * 2. retrieves next PDU from TX-Queue
1579 * 3. assigns PDU SeqNum to the retrieved PDU
1580 */
1581 static void
1582 krrp_sess_get_data_pdu_to_tx(void *void_sess, krrp_pdu_t **result_pdu)
1583 {
1584 clock_t time_left = 0;
1585 krrp_sess_t *sess;
1586 krrp_fl_ctrl_t *fl_ctrl;
1587 boolean_t win_open;
1588 boolean_t queue_is_empty;
1589
1590 sess = (krrp_sess_t *)void_sess;
1591 fl_ctrl = &sess->fl_ctrl;
1592
1593 mutex_enter(&fl_ctrl->mtx);
1594
1595 repeat:
1596 if ((fl_ctrl->cur_pdu_seq_num < fl_ctrl->max_pdu_seq_num) ||
1597 fl_ctrl->disabled) {
1598 krrp_pdu_t *pdu = NULL;
1599
1600 win_open = B_TRUE;
1601 mutex_exit(&fl_ctrl->mtx);
1602 pdu = krrp_queue_get(sess->data_tx_queue);
1603 mutex_enter(&fl_ctrl->mtx);
1604 if (pdu != NULL) {
1605 krrp_hdr_data_t *hdr;
1606
1607 hdr = (krrp_hdr_data_t *)krrp_pdu_hdr(pdu);
1608 fl_ctrl->cur_pdu_seq_num++;
1609 hdr->pdu_seq_num = fl_ctrl->cur_pdu_seq_num;
1610 *result_pdu = pdu;
1611 queue_is_empty = B_FALSE;
1612 } else
1613 queue_is_empty = B_TRUE;
1614
1615 time_left = 0;
1616 } else {
1617 win_open = B_FALSE;
1618 if (krrp_queue_length(sess->data_tx_queue) == 0)
1619 queue_is_empty = B_TRUE;
1620 else
1621 queue_is_empty = B_FALSE;
1622
1623 time_left = cv_reltimedwait(&fl_ctrl->cv, &fl_ctrl->mtx,
1624 MSEC_TO_TICK(100), TR_CLOCK_TICK);
1625 }
1626
1627 if (time_left > 0)
1628 goto repeat;
1629
1630 mutex_exit(&fl_ctrl->mtx);
1631
1632 DTRACE_PROBE2(krrp_data_path_state, boolean_t, win_open,
1633 boolean_t, queue_is_empty);
1634 }
1635
1636 static void
1637 krrp_sess_nomem_error(krrp_sess_t *sess)
1638 {
1639 krrp_error_t error;
1640
1641 krrp_error_set(&error, KRRP_ERRNO_NOMEM, 0);
1642 krrp_sess_error(sess, &error);
1643 }
1644
1645 static void
1646 krrp_sess_error(krrp_sess_t *sess, krrp_error_t *error)
1647 {
1648 mutex_enter(&sess->mtx);
1649
1650 if (sess->error.krrp_errno == 0) {
1651 sess->error.krrp_errno = error->krrp_errno;
1652 sess->error.unix_errno = error->unix_errno;
1653 sess->running = B_FALSE;
1654
1655 krrp_sess_post_error_uevent(sess, &sess->error);
1656 }
1657
1658 mutex_exit(&sess->mtx);
1659 }
1660
1661 static int
1662 krrp_sess_inc_ref_cnt(krrp_sess_t *sess)
1663 {
1664 int rc = -1;
1665
1666 mutex_enter(&sess->mtx);
1667 if (sess->destroying)
1668 goto out;
1669
1670 rc = 0;
1671 sess->ref_cnt++;
1672
1673 out:
1674 mutex_exit(&sess->mtx);
1675
1676 return (rc);
1677 }
1678
1679 static void
1680 krrp_sess_dec_ref_cnt(krrp_sess_t *sess)
1681 {
1682 mutex_enter(&sess->mtx);
1683 ASSERT(sess->ref_cnt > 0);
1684 sess->ref_cnt--;
1685 cv_signal(&sess->cv);
1686 mutex_exit(&sess->mtx);
1687 }
1688
1689 int
1690 krrp_sess_try_hold(krrp_sess_t *sess)
1691 {
1692 int rc = -1;
1693
1694 mutex_enter(&sess->mtx);
1695 if (sess->on_hold)
1696 goto out;
1697
1698 rc = 0;
1699 sess->on_hold = B_TRUE;
1700
1701 out:
1702 mutex_exit(&sess->mtx);
1703
1704 return (rc);
1705 }
1706
1707 void
1708 krrp_sess_rele(krrp_sess_t *sess)
1709 {
1710 mutex_enter(&sess->mtx);
1711 ASSERT(sess->on_hold);
1712 sess->on_hold = B_FALSE;
1713 mutex_exit(&sess->mtx);
1714 }
1715
1716 static void
1717 krrp_sess_lr_data_pdu_from_stream(krrp_sess_t *sess, krrp_pdu_data_t *pdu)
1718 {
1719 krrp_hdr_data_t *hdr;
1720
1721 hdr = krrp_pdu_hdr(pdu);
1722
1723 hdr->opcode = KRRP_OPCODE_DATA_WRITE;
1724 hdr->txg = pdu->txg;
1725 hdr->payload_sz = pdu->cur_data_sz;
1726
1727 if (pdu->initial)
1728 hdr->flags |= KRRP_HDR_FLAG_INIT_PDU;
1729
1730 if (pdu->final)
1731 hdr->flags |= KRRP_HDR_FLAG_FINI_PDU;
1732
1733 krrp_queue_put(sess->data_tx_queue, pdu);
1734 }
1735
1736 static void
1737 krrp_sess_ll_data_pdu_from_stream(krrp_sess_t *sess, krrp_pdu_data_t *pdu)
1738 {
1739 if (pdu->final)
1740 krrp_stream_txg_confirmed(sess->stream_read, pdu->txg, B_FALSE);
1741
1742 krrp_queue_put(sess->data_write_queue, pdu);
1743 }
1744
1745 static void
1746 krrp_sess_lr_txg_recv_done(krrp_sess_t *sess, uint64_t txg)
1747 {
1748 krrp_sess_txg_recv_done(sess, txg, B_TRUE);
1749 }
1750
1751 static void
1752 krrp_sess_ll_txg_recv_done(krrp_sess_t *sess, uint64_t txg)
1753 {
1754 krrp_stream_txg_confirmed(sess->stream_read, txg, B_TRUE);
1755 }
1756
1757 static void
1758 krrp_sess_lr_send_done(krrp_sess_t *sess)
1759 {
1760 krrp_pdu_ctrl_t *pdu = NULL;
1761
1762 /* Notify the receiver that send has been done */
1763 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1764 if (pdu == NULL) {
1765 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1766 "to send KRRP_OPCODE_SEND_DONE");
1767 krrp_sess_nomem_error(sess);
1768 goto out;
1769 }
1770
1771 pdu->hdr->opcode = KRRP_OPCODE_SEND_DONE;
1772
1773 krrp_queue_put(sess->ctrl_tx_queue, pdu);
1774
1775 out:
1776 /* Notify the userspace that send has been done */
1777 krrp_sess_post_send_done_uevent(sess);
1778 }
1779
1780 static void
1781 krrp_sess_ll_send_done(krrp_sess_t *sess)
1782 {
1783 /* Notify the userspace that send has been done */
1784 krrp_sess_post_send_done_uevent(sess);
1785 }
1786
1787 static void
1788 krrp_sess_send_shutdown(krrp_sess_t *sess)
1789 {
1790 krrp_pdu_ctrl_t *pdu = NULL;
1791
1792 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1793 if (pdu == NULL) {
1794 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1795 "to send KRRP_OPCODE_SHUTDOWN");
1796 return;
1797 }
1798
1799 pdu->hdr->opcode = KRRP_OPCODE_SHUTDOWN;
1800
1801 krrp_queue_put(sess->ctrl_tx_queue, pdu);
1802 }