1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2018 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
/*
 * Session establishment AUTH Schema:
 *
 * 1. The userspace sides negotiate a secured value over the secured
 * Ctrl-Path.
 *
 * 2. The manager and the agent pass a digest of the secured value to
 * the kernel as part of the params for the 'sess_create' IOCTL.
 *
 * 3. During the connection establishment phase the manager's session
 * sends the digest to the remote side.
 *
 * 4. The agent's session compares the received digest with its own digest.
 *
 * 5. If the agent's session does not have a digest, then the received
 * digest is ignored.
 *
 * 6. If the agent's session has a digest and this digest is not equal to
 * the received digest, or the received params do not contain a digest,
 * then the corresponding session establishment request is rejected.
 */
  37 
  38 #include <sys/types.h>
  39 #include <sys/conf.h>
  40 #include <sys/sysmacros.h>
  41 #include <sys/cmn_err.h>
  42 #include <sys/stat.h>
  43 #include <sys/ddi.h>
  44 #include <sys/sunddi.h>
  45 #include <sys/sdt.h>
  46 
  47 #include <krrp_params.h>
  48 
  49 #include "krrp_protocol.h"
  50 #include "krrp_svc.h"
  51 #include "krrp_session.h"
  52 
  53 /* #define KRRP_SESS_DEBUG 1 */
  54 
  55 static int krrp_sess_common_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
  56     krrp_error_t *error);
  57 
  58 static void krrp_sess_start_ping(krrp_sess_t *sess);
  59 static void krrp_sess_stop_ping(krrp_sess_t *sess);
  60 static void krrp_sess_ping_cb(void *void_sess);
  61 static void krrp_sess_ping_request(krrp_sess_t *sess);
  62 
  63 static void krrp_sess_lr_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
  64     void *void_sess);
  65 static void krrp_sess_ll_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
  66     void *void_sess);
  67 static void krrp_sess_stream_error(krrp_sess_t *sess, krrp_error_t *error);
  68 static void krrp_sess_txg_recv_done(krrp_sess_t *sess, uint64_t txg,
  69     boolean_t complete);
  70 
  71 static void krrp_sess_post_error_uevent(krrp_sess_t *sess,
  72     krrp_error_t *error);
  73 static void krrp_sess_post_send_done_uevent(krrp_sess_t *sess);
  74 
  75 static void krrp_sess_lr_data_pdu_from_stream(krrp_sess_t *sess,
  76     krrp_pdu_data_t *pdu);
  77 static void krrp_sess_ll_data_pdu_from_stream(krrp_sess_t *sess,
  78     krrp_pdu_data_t *pdu);
  79 static void krrp_sess_lr_txg_recv_done(krrp_sess_t *sess, uint64_t txg);
  80 static void krrp_sess_ll_txg_recv_done(krrp_sess_t *sess, uint64_t txg);
  81 static void krrp_sess_lr_send_done(krrp_sess_t *sess);
  82 static void krrp_sess_ll_send_done(krrp_sess_t *sess);
  83 
  84 static void krrp_sess_pdu_engine_cb(void *void_sess, size_t released_pdus);
  85 static void krrp_sess_conn_cb(void *void_conn, krrp_conn_cb_ev_t ev,
  86     uintptr_t ev_arg, void *void_sess);
  87 
  88 static void krrp_sess_kstat_init(krrp_sess_t *sess);
  89 static int krrp_sess_kstat_update(kstat_t *ks, int rw);
  90 static void krrp_sess_sender_kstat_update(krrp_sess_t *sess, kstat_t *ks);
  91 static void krrp_sess_receiver_kstat_update(krrp_sess_t *sess, kstat_t *ks);
  92 static void krrp_sess_compound_kstat_update(krrp_sess_t *sess, kstat_t *ks);
  93 
  94 static void krrp_sess_conn_error(krrp_sess_t *sess, krrp_conn_t *conn,
  95     krrp_error_t *error);
  96 static void krrp_sess_ctrl_pdu_from_network(krrp_sess_t *sess,
  97     krrp_pdu_ctrl_t *pdu);
  98 static void krrp_sess_data_pdu_from_network(krrp_sess_t *sess,
  99     krrp_pdu_data_t *pdu);
 100 
 101 static int krrp_sess_fl_ctrl_validate(krrp_sess_t *sess,
 102     krrp_hdr_data_t *hdr);
 103 static void krrp_sess_fl_ctrl_calc_cwnd_window(krrp_fl_ctrl_t *fl_ctrl,
 104     uint64_t new_recv_window);
 105 static void krrp_sess_fl_ctrl_update_tx(krrp_fl_ctrl_t *fl_ctrl,
 106     uint64_t max_pdu_seq_num);
 107 static uint64_t krrp_sess_fl_ctrl_update_rx(krrp_fl_ctrl_t *fl_ctrl,
 108     size_t window_offset);
 109 static void krrp_sess_get_data_pdu_to_tx(void *void_sess,
 110     krrp_pdu_t **result_pdu);
 111 
 112 static int krrp_sess_set_kstat_id(krrp_sess_t *sess,
 113     const char *kstat_id_str, krrp_error_t *error);
 114 
 115 static int krrp_sess_set_auth_digest(krrp_sess_t *sess,
 116     const char *auth_digest, krrp_error_t *error);
 117 
 118 static void krrp_sess_nomem_error(krrp_sess_t *sess);
 119 static void krrp_sess_error(krrp_sess_t *sess, krrp_error_t *error);
 120 
 121 static int krrp_sess_inc_ref_cnt(krrp_sess_t *sess);
 122 static void krrp_sess_dec_ref_cnt(krrp_sess_t *sess);
 123 
 124 static void krrp_sess_send_shutdown(krrp_sess_t *sess);
 125 
 126 
/*
 * Constant templates for the per-session named kstat data.
 *
 * XX turns every (name, dtype, def_value) entry of a stat name map into
 * one brace-enclosed initializer whose first member is the stringified
 * stat name.  krrp_sess_kstat_init() copies the matching template into
 * the session's kstat data area.
 */
#define XX(name, dtype, def_value) {#name, dtype, def_value},
static const krrp_sess_sender_kstat_t sess_sender_stats_templ = {
        KRRP_SESS_SENDER_STAT_NAME_MAP(XX)
};

static const krrp_sess_receiver_kstat_t sess_receiver_stats_templ = {
        KRRP_SESS_RECEIVER_STAT_NAME_MAP(XX)
};

/*
 * NOTE(review): this template is declared with the receiver kstat type
 * although it is filled from the COMPOUND stat name map.  It presumably
 * should use the compound kstat type -- confirm against krrp_session.h.
 */
static const krrp_sess_receiver_kstat_t sess_compound_stats_templ = {
        KRRP_SESS_COMPOUND_STAT_NAME_MAP(XX)
};
#undef XX
 140 
/*
 * Global (non-static) knob, initialized to 2:
 *
 * 0 - disable re-calculation
 * 1 - use algorithm1
 * 2 - use algorithm2
 *
 * NOTE(review): presumably selects how the flow-control congestion
 * window is re-calculated; the consumer is not in this part of the
 * file -- confirm.
 */
int krrp_sess_cwnd_state = 2;

/*
 * Global (non-static) knob:
 *
 * 0 - pass all PDUs to stream (default)
 * any other - do not pass received PDUs to stream
 */
int krrp_sess_recv_without_stream = 0;
 153 
 154 int
 155 krrp_sess_create(krrp_sess_t **result_sess, const char *id,
 156     const char *kstat_id, const char *auth_digest,
 157     boolean_t sender, boolean_t fake_mode,
 158     boolean_t compound, krrp_error_t *error)
 159 {
 160         krrp_sess_t *sess;
 161 
 162         VERIFY(result_sess != NULL && *result_sess == NULL);
 163 
 164         if (compound && (sender || auth_digest != NULL)) {
 165                 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
 166                 return (-1);
 167         }
 168 
 169         sess = kmem_zalloc(sizeof (krrp_sess_t), KM_SLEEP);
 170 
 171         if (krrp_sess_set_id(sess, id, error) != 0)
 172                 goto err;
 173 
 174         if (krrp_sess_set_kstat_id(sess, kstat_id, error) != 0)
 175                 goto err;
 176 
 177         if (krrp_sess_set_auth_digest(sess, auth_digest, error) != 0)
 178                 goto err;
 179 
 180         mutex_init(&sess->mtx, NULL, MUTEX_DEFAULT, NULL);
 181         cv_init(&sess->cv, NULL, CV_DEFAULT, NULL);
 182 
 183         mutex_init(&sess->fl_ctrl.mtx, NULL, MUTEX_DEFAULT, NULL);
 184         cv_init(&sess->fl_ctrl.cv, NULL, CV_DEFAULT, NULL);
 185 
 186         krrp_queue_init(&sess->data_write_queue, sizeof (krrp_pdu_t),
 187             offsetof(krrp_pdu_t, node));
 188         krrp_queue_init(&sess->data_tx_queue, sizeof (krrp_pdu_t),
 189             offsetof(krrp_pdu_t, node));
 190         krrp_queue_init(&sess->ctrl_tx_queue, sizeof (krrp_pdu_t),
 191             offsetof(krrp_pdu_t, node));
 192 
 193         if (compound)
 194                 sess->type = KRRP_SESS_COMPOUND;
 195         else if (sender)
 196                 sess->type = KRRP_SESS_SENDER;
 197         else
 198                 sess->type = KRRP_SESS_RECEIVER;
 199 
 200         sess->fake_mode = fake_mode;
 201 
 202         *result_sess = sess;
 203 
 204         return (0);
 205 
 206 err:
 207         kmem_free(sess, sizeof (krrp_sess_t));
 208         return (-1);
 209 }
 210 
 211 void
 212 krrp_sess_destroy(krrp_sess_t *sess)
 213 {
 214         krrp_pdu_t *pdu;
 215 
 216         krrp_sess_stop_ping(sess);
 217 
 218         mutex_enter(&sess->mtx);
 219 
 220         if (!sess->shutdown && sess->conn != NULL) {
 221                 sess->shutdown = B_TRUE;
 222                 krrp_sess_send_shutdown(sess);
 223 
 224                 (void) cv_reltimedwait(&sess->cv, &sess->mtx,
 225                     SEC_TO_TICK(2), TR_CLOCK_TICK);
 226         }
 227 
 228         sess->destroying = B_TRUE;
 229         while (sess->ref_cnt > 0)
 230                 cv_wait(&sess->cv, &sess->mtx);
 231 
 232         mutex_exit(&sess->mtx);
 233 
 234         if (sess->kstat.ctx != NULL)
 235                 kstat_delete(sess->kstat.ctx);
 236 
 237         if (sess->conn != NULL)
 238                 krrp_conn_destroy(sess->conn);
 239 
 240         if (sess->stream_read != NULL)
 241                 krrp_stream_destroy(sess->stream_read);
 242 
 243         if (sess->stream_write != NULL)
 244                 krrp_stream_destroy(sess->stream_write);
 245 
 246         while ((pdu = krrp_queue_get_no_wait(sess->data_write_queue)) != NULL)
 247                 krrp_pdu_rele(pdu);
 248 
 249         while ((pdu = krrp_queue_get_no_wait(sess->data_tx_queue)) != NULL)
 250                 krrp_pdu_rele(pdu);
 251 
 252         while ((pdu = krrp_queue_get_no_wait(sess->ctrl_tx_queue)) != NULL)
 253                 krrp_pdu_rele(pdu);
 254 
 255         krrp_queue_fini(sess->data_write_queue);
 256         krrp_queue_fini(sess->data_tx_queue);
 257         krrp_queue_fini(sess->ctrl_tx_queue);
 258 
 259         if (sess->data_pdu_engine != NULL)
 260                 krrp_pdu_engine_destroy(sess->data_pdu_engine);
 261 
 262         if (sess->private_data != NULL)
 263                 fnvlist_free(sess->private_data);
 264 
 265         cv_destroy(&sess->fl_ctrl.cv);
 266         mutex_destroy(&sess->fl_ctrl.mtx);
 267 
 268         cv_destroy(&sess->cv);
 269         mutex_destroy(&sess->mtx);
 270 
 271         kmem_free(sess, sizeof (krrp_sess_t));
 272 }
 273 
 274 int
 275 krrp_sess_run(krrp_sess_t *sess, boolean_t only_once, krrp_error_t *error)
 276 {
 277         int rc = -1;
 278         boolean_t fatal_error = B_FALSE;
 279         boolean_t stream_is_not_defined = B_FALSE;
 280 
 281         mutex_enter(&sess->mtx);
 282 
 283         if (sess->error.krrp_errno != 0) {
 284                 (void) memcpy(error, &sess->error, sizeof (krrp_error_t));
 285                 goto out;
 286         }
 287 
 288         if (sess->type == KRRP_SESS_RECEIVER && only_once) {
 289                 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
 290                 goto out;
 291         }
 292 
 293         if (sess->started) {
 294                 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
 295                 goto out;
 296         }
 297 
 298         switch (sess->type) {
 299         case KRRP_SESS_COMPOUND:
 300                 stream_is_not_defined = (sess->stream_read == NULL ||
 301                     sess->stream_write == NULL);
 302                 break;
 303         case KRRP_SESS_SENDER:
 304                 stream_is_not_defined = (sess->stream_read == NULL);
 305                 break;
 306         case KRRP_SESS_RECEIVER:
 307                 stream_is_not_defined = (sess->stream_write == NULL);
 308                 break;
 309         }
 310 
 311         if (stream_is_not_defined) {
 312                 krrp_error_set(error, KRRP_ERRNO_STREAM, ENOENT);
 313                 goto out;
 314         }
 315 
 316         if (sess->type != KRRP_SESS_COMPOUND && sess->conn == NULL) {
 317                 krrp_error_set(error, KRRP_ERRNO_CONN, ENOENT);
 318                 goto out;
 319         }
 320 
 321         if (sess->data_pdu_engine == NULL) {
 322                 krrp_error_set(error, KRRP_ERRNO_PDUENGINE, ENOENT);
 323                 goto out;
 324         }
 325 
 326         if (sess->type != KRRP_SESS_RECEIVER && only_once &&
 327             sess->stream_read->non_continuous) {
 328                 krrp_error_set(error, KRRP_ERRNO_STREAM, EINVAL);
 329                 goto out;
 330         }
 331 
 332         /*
 333          * Need to store error to session, to be able to
 334          * return it again, because an error that might
 335          * occur bellow cannot be fixed without
 336          * recreation of session.
 337          */
 338         fatal_error = B_TRUE;
 339 
 340         if (sess->type == KRRP_SESS_COMPOUND) {
 341                 rc = krrp_stream_run(sess->stream_write,
 342                     sess->data_write_queue,
 343                     sess->data_pdu_engine, error);
 344                 if (rc != 0)
 345                         goto out;
 346 
 347                 rc = krrp_stream_run(sess->stream_read,
 348                     sess->data_write_queue,
 349                     sess->data_pdu_engine, error);
 350                 if (rc != 0) {
 351                         krrp_stream_stop(sess->stream_write);
 352                         goto out;
 353                 }
 354         } else {
 355                 krrp_stream_t *stream;
 356 
 357                 stream = sess->type == KRRP_SESS_SENDER ?
 358                     sess->stream_read : sess->stream_write;
 359 
 360                 rc = krrp_stream_run(stream, sess->data_write_queue,
 361                     sess->data_pdu_engine, error);
 362                 if (rc != 0)
 363                         goto out;
 364 
 365                 if (sess->type == KRRP_SESS_SENDER)
 366                         krrp_conn_run(sess->conn, sess->ctrl_tx_queue,
 367                             sess->data_pdu_engine,
 368                             &krrp_sess_get_data_pdu_to_tx, sess);
 369                 else
 370                         krrp_conn_run(sess->conn, sess->ctrl_tx_queue,
 371                             sess->data_pdu_engine, NULL, NULL);
 372 
 373                 krrp_sess_start_ping(sess);
 374         }
 375 
 376         krrp_sess_kstat_init(sess);
 377 
 378         sess->started = B_TRUE;
 379         sess->running = B_TRUE;
 380 
 381 out:
 382         if (rc != 0 && fatal_error) {
 383                 ASSERT(error->krrp_errno != 0);
 384 
 385                 sess->error.krrp_errno = error->krrp_errno;
 386                 sess->error.unix_errno = error->unix_errno;
 387         }
 388         mutex_exit(&sess->mtx);
 389 
 390         if (rc == 0 && sess->type == KRRP_SESS_RECEIVER)
 391                 krrp_pdu_engine_force_notify(sess->data_pdu_engine, B_TRUE);
 392 
 393         /*
 394          * Only once means that after successfully start
 395          * we immediately do gracefull shutdown
 396          *
 397          * There is no reason to implement a complex logic that will handle
 398          * processing of only one snapshot
 399          */
 400         if (rc == 0 && sess->type != KRRP_SESS_RECEIVER && only_once)
 401                 (void) krrp_stream_send_stop(sess->stream_read);
 402 
 403         return (rc);
 404 }
 405 
 406 void
 407 krrp_sess_get_status(krrp_sess_t *sess, nvlist_t *result)
 408 {
 409         boolean_t true_value = B_TRUE;
 410 
 411         mutex_enter(&sess->mtx);
 412 
 413         switch (sess->type) {
 414         case KRRP_SESS_SENDER:
 415                 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_SENDER,
 416                     result, &true_value));
 417                 break;
 418         case KRRP_SESS_RECEIVER:
 419                 break;
 420         case KRRP_SESS_COMPOUND:
 421                 VERIFY0(krrp_param_put(KRRP_PARAM_SESS_COMPOUND,
 422                     result, &true_value));
 423                 break;
 424         }
 425 
 426         VERIFY0(krrp_param_put(KRRP_PARAM_SESS_ID,
 427             result, sess->id));
 428         VERIFY0(krrp_param_put(KRRP_PARAM_SESS_KSTAT_ID,
 429             result, sess->kstat.id));
 430         VERIFY0(krrp_param_put(KRRP_PARAM_SESS_STARTED,
 431             result, &sess->started));
 432         VERIFY0(krrp_param_put(KRRP_PARAM_SESS_RUNNING,
 433             result, &sess->running));
 434 
 435         if (sess->error.krrp_errno != 0)
 436                 krrp_error_to_nvl(&sess->error, &result);
 437 
 438         mutex_exit(&sess->mtx);
 439 }
 440 
 441 /*
 442  * Retrieve information about existing connection
 443  * and place it to result NVL.
 444  */
 445 int
 446 krrp_sess_get_conn_info(krrp_sess_t *sess, nvlist_t *result,
 447     krrp_error_t *error)
 448 {
 449         if (sess->type == KRRP_SESS_COMPOUND) {
 450                 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
 451                 return (-1);
 452         }
 453 
 454         if (sess->conn == NULL) {
 455                 krrp_error_set(error, KRRP_ERRNO_CONN, ENOENT);
 456                 return (-1);
 457         }
 458 
 459         VERIFY0(krrp_param_put(KRRP_PARAM_DBLK_DATA_SIZE,
 460             result, (void *)&sess->conn->blk_sz));
 461 
 462         return (0);
 463 }
 464 
 465 /*
 466  * 'Started' means 'sess_run' IOCTL was successfully processed
 467  */
 468 boolean_t
 469 krrp_sess_is_started(krrp_sess_t *sess)
 470 {
 471         boolean_t result;
 472 
 473         mutex_enter(&sess->mtx);
 474         result = sess->started;
 475         mutex_exit(&sess->mtx);
 476 
 477         return (result);
 478 }
 479 
 480 /*
 481  * 'Running' means 'sess_run' IOCTL was successfully processed
 482  * and now session does replication.
 483  *
 484  * It is possible to have: started == B_TRUE && running == B_FALSE.
 485  * This means an error occured and now session waits for control action
 486  * from userspace
 487  */
 488 boolean_t
 489 krrp_sess_is_running(krrp_sess_t *sess)
 490 {
 491         boolean_t result;
 492 
 493         mutex_enter(&sess->mtx);
 494         result = sess->running;
 495         mutex_exit(&sess->mtx);
 496 
 497         return (result);
 498 }
 499 
 500 int
 501 krrp_sess_throttle_conn(krrp_sess_t *sess, size_t limit,
 502     krrp_error_t *error)
 503 {
 504         int rc = -1;
 505 
 506         if (sess->type != KRRP_SESS_SENDER) {
 507                 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
 508                 goto out;
 509         }
 510 
 511         if (sess->conn == NULL) {
 512                 krrp_error_set(error, KRRP_ERRNO_CONN, ENOENT);
 513                 goto out;
 514         }
 515 
 516         krrp_conn_throttle_set(sess->conn, limit,
 517             !krrp_sess_is_running(sess));
 518         rc = 0;
 519 
 520 out:
 521         return (rc);
 522 }
 523 
 524 static void
 525 krrp_sess_start_ping(krrp_sess_t *sess)
 526 {
 527         sess->ping_timer = timeout(krrp_sess_ping_cb, sess,
 528             drv_usectohz(5 * 1000000));
 529 }
 530 
 531 static void
 532 krrp_sess_stop_ping(krrp_sess_t *sess)
 533 {
 534         timeout_id_t saved_timer;
 535 
 536         mutex_enter(&sess->mtx);
 537         saved_timer = sess->ping_timer;
 538         sess->ping_timer = NULL;
 539         mutex_exit(&sess->mtx);
 540 
 541         if (saved_timer != NULL)
 542                 (void) untimeout(saved_timer);
 543 }
 544 
 545 static void
 546 krrp_sess_ping_cb(void *void_sess)
 547 {
 548         krrp_pdu_ctrl_t *pdu = NULL;
 549         krrp_sess_t *sess = void_sess;
 550 
 551         if (sess->ping_wait_for_response) {
 552                 krrp_error_t error;
 553 
 554                 krrp_error_init(&error);
 555                 krrp_error_set(&error, KRRP_ERRNO_PINGTIMEOUT, 0);
 556                 krrp_sess_post_error_uevent(sess, &error);
 557                 goto out;
 558         }
 559 
 560         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
 561         if (pdu == NULL) {
 562                 cmn_err(CE_WARN, "No memory to send PING request");
 563                 return;
 564         }
 565 
 566         pdu->hdr->opcode = KRRP_OPCODE_PING;
 567         krrp_queue_put(sess->ctrl_tx_queue, pdu);
 568         sess->ping_wait_for_response = B_TRUE;
 569 
 570 out:
 571         mutex_enter(&sess->mtx);
 572 
 573         if (sess->ping_timer != NULL && !sess->destroying)
 574                 krrp_sess_start_ping(sess);
 575 
 576         mutex_exit(&sess->mtx);
 577 }
 578 
 579 int
 580 krrp_sess_send_stop(krrp_sess_t *sess, krrp_error_t *error)
 581 {
 582         int rc = -1;
 583 
 584         if (!krrp_sess_is_running(sess)) {
 585                 krrp_error_set(error, KRRP_ERRNO_SESS, ENOTACTIVE);
 586         } else {
 587                 if (sess->type == KRRP_SESS_RECEIVER)
 588                         krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
 589                 else if (sess->stream_read->non_continuous)
 590                         krrp_error_set(error, KRRP_ERRNO_STREAM, EINVAL);
 591                 else if (krrp_stream_send_stop(sess->stream_read) != 0)
 592                         krrp_error_set(error, KRRP_ERRNO_STREAM, EALREADY);
 593                 else
 594                         rc = 0;
 595         }
 596 
 597         return (rc);
 598 }
 599 
 600 int
 601 krrp_sess_attach_pdu_engine(krrp_sess_t *sess,
 602     krrp_pdu_engine_t *pdu_engine, krrp_error_t *error)
 603 {
 604         int rc = -1;
 605 
 606         mutex_enter(&sess->mtx);
 607 
 608         if (sess->data_pdu_engine != NULL) {
 609                 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
 610                 goto out;
 611         }
 612 
 613         VERIFY(pdu_engine->type == KRRP_PET_DATA);
 614         VERIFY(sess->data_pdu_engine == NULL);
 615 
 616         sess->data_pdu_engine = pdu_engine;
 617 
 618         /*
 619          * The receiver uses notifications from PDU engine to
 620          * update RX Window and send the info to the sender
 621          */
 622         if (sess->type == KRRP_SESS_RECEIVER)
 623                 krrp_pdu_engine_register_callback(pdu_engine,
 624                     &krrp_sess_pdu_engine_cb, sess);
 625 
 626         rc = 0;
 627 out:
 628         mutex_exit(&sess->mtx);
 629         return (rc);
 630 }
 631 
 632 int
 633 krrp_sess_attach_read_stream(krrp_sess_t *sess,
 634     krrp_stream_t *stream, krrp_error_t *error)
 635 {
 636         int rc = -1;
 637 
 638         VERIFY(stream != NULL);
 639         VERIFY(sess->type != KRRP_SESS_RECEIVER);
 640 
 641         mutex_enter(&sess->mtx);
 642 
 643         if (sess->stream_read != NULL) {
 644                 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
 645                 goto out;
 646         }
 647 
 648         sess->stream_read = stream;
 649 
 650         if (sess->type == KRRP_SESS_COMPOUND)
 651                 krrp_stream_register_callback(stream,
 652                     &krrp_sess_ll_stream_cb, sess);
 653         else
 654                 krrp_stream_register_callback(stream,
 655                     &krrp_sess_lr_stream_cb, sess);
 656 
 657         rc = 0;
 658 
 659 out:
 660         mutex_exit(&sess->mtx);
 661 
 662         return (rc);
 663 }
 664 
 665 int
 666 krrp_sess_attach_write_stream(krrp_sess_t *sess,
 667     krrp_stream_t *stream, krrp_error_t *error)
 668 {
 669         int rc = -1;
 670 
 671         VERIFY(stream != NULL);
 672         VERIFY(sess->type != KRRP_SESS_SENDER);
 673 
 674         mutex_enter(&sess->mtx);
 675 
 676         if (sess->stream_write != NULL) {
 677                 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
 678                 goto out;
 679         }
 680 
 681         sess->stream_write = stream;
 682 
 683         if (sess->type == KRRP_SESS_COMPOUND)
 684                 krrp_stream_register_callback(stream,
 685                     &krrp_sess_ll_stream_cb, sess);
 686         else
 687                 krrp_stream_register_callback(stream,
 688                     &krrp_sess_lr_stream_cb, sess);
 689 
 690         rc = 0;
 691 
 692 out:
 693         mutex_exit(&sess->mtx);
 694 
 695         return (rc);
 696 }
 697 
 698 int
 699 krrp_sess_target_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
 700     nvlist_t *params, krrp_error_t *error)
 701 {
 702         int rc;
 703         boolean_t fake_mode;
 704 
 705         if (krrp_sess_common_attach_conn(sess, conn, error) != 0)
 706                 return (-1);
 707 
 708         fake_mode = krrp_param_exists(KRRP_PARAM_FAKE_MODE, params);
 709 
 710         if (fake_mode && sess->type == KRRP_SESS_RECEIVER && !sess->fake_mode) {
 711                 cmn_err(CE_WARN, "It is impossible to use real receiver "
 712                     "together with fake sender");
 713                 krrp_error_set(error, KRRP_ERRNO_PROTO, EFAULT);
 714                 goto err;
 715         }
 716 
 717         if (sess->auth_digest[0] != '\0') {
 718                 const char *auth_data = NULL;
 719 
 720                 rc = krrp_param_get(KRRP_PARAM_AUTH_DATA,
 721                     params, (void *)&auth_data);
 722                 if (rc != 0) {
 723                         krrp_error_set(error, KRRP_ERRNO_AUTH, ENOENT);
 724                         goto err;
 725                 }
 726 
 727                 if (strcmp(sess->auth_digest, auth_data) != 0) {
 728                         krrp_error_set(error, KRRP_ERRNO_AUTH, EINVAL);
 729                         goto err;
 730                 }
 731         }
 732 
 733         rc = krrp_conn_send_ctrl_data(conn,
 734             KRRP_OPCODE_ATTACH_SESS, NULL, error);
 735         if (rc != 0)
 736                 goto err;
 737 
 738         krrp_conn_register_callback(sess->conn,
 739             &krrp_sess_conn_cb, sess);
 740 
 741         return (0);
 742 
 743 err:
 744         mutex_enter(&sess->mtx);
 745         sess->conn = NULL;
 746         mutex_exit(&sess->mtx);
 747         return (-1);
 748 }
 749 
 750 int
 751 krrp_sess_initiator_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
 752     krrp_error_t *error)
 753 {
 754         int rc;
 755         nvlist_t *params;
 756         krrp_pdu_ctrl_t *pdu = NULL;
 757 
 758         if (krrp_sess_common_attach_conn(sess, conn, error) != 0)
 759                 return (-1);
 760 
 761         params = fnvlist_alloc();
 762 
 763         if (sess->auth_digest[0] != '\0') {
 764                 (void) krrp_param_put(KRRP_PARAM_AUTH_DATA,
 765                     params, sess->auth_digest);
 766         }
 767 
 768         if (sess->type == KRRP_SESS_SENDER && sess->fake_mode)
 769                 (void) krrp_param_put(KRRP_PARAM_FAKE_MODE, params, NULL);
 770 
 771         (void) krrp_param_put(KRRP_PARAM_SESS_ID,
 772             params, sess->id);
 773 
 774         rc = krrp_conn_send_ctrl_data(conn,
 775             KRRP_OPCODE_ATTACH_SESS, params, error);
 776         fnvlist_free(params);
 777         if (rc != 0)
 778                 goto err;
 779 
 780         rc = krrp_conn_rx_ctrl_pdu(conn, &pdu, error);
 781         if (rc != 0)
 782                 goto err;
 783 
 784         if (krrp_pdu_opcode(pdu) != KRRP_OPCODE_ATTACH_SESS) {
 785                 if (krrp_pdu_opcode(pdu) == KRRP_OPCODE_ERROR) {
 786                         nvlist_t *error_nvl = NULL;
 787 
 788                         cmn_err(CE_WARN, "Remote side returned an error");
 789 
 790                         rc = krrp_pdu_get_nvl_from_payload(pdu, &error_nvl);
 791                         if (rc == 0) {
 792                                 rc = krrp_error_from_nvl(error, error_nvl);
 793                                 if (rc == 0) {
 794                                         if (error->krrp_errno == 0) {
 795                                                 /*
 796                                                  * Something wrong, so we will return
 797                                                  * KRRP_ERRNO_BADRESP
 798                                                  */
 799                                                 rc = -1;
 800                                         } else {
 801                                                 krrp_error_set_flag(error,
 802                                                     KRRP_ERRF_REMOTE);
 803                                         }
 804                                 }
 805 
 806                                 fnvlist_free(error_nvl);
 807                         }
 808                 }
 809 
 810                 if (rc != 0)
 811                         krrp_error_set(error, KRRP_ERRNO_BADRESP, 0);
 812                 else
 813                         rc = -1;
 814         }
 815 
 816         krrp_pdu_rele((krrp_pdu_t *)pdu);
 817 
 818         if (rc != 0)
 819                 goto err;
 820 
 821         krrp_conn_register_callback(sess->conn,
 822             &krrp_sess_conn_cb, sess);
 823 
 824         return (0);
 825 
 826 err:
 827         mutex_enter(&sess->mtx);
 828         sess->conn = NULL;
 829         mutex_exit(&sess->mtx);
 830         return (-1);
 831 }
 832 
 833 /*
 834  * Here we check that:
 835  * - the session does not have attached connection
 836  * - the session is not compound (local 2 local replication)
 837  */
 838 static int
 839 krrp_sess_common_attach_conn(krrp_sess_t *sess, krrp_conn_t *conn,
 840     krrp_error_t *error)
 841 {
 842         int rc = -1;
 843 
 844         mutex_enter(&sess->mtx);
 845 
 846         if (sess->conn != NULL)
 847                 krrp_error_set(error, KRRP_ERRNO_SESS, EALREADY);
 848         else if (sess->type == KRRP_SESS_COMPOUND)
 849                 krrp_error_set(error, KRRP_ERRNO_SESS, EINVAL);
 850         else {
 851                 sess->conn = conn;
 852                 rc = 0;
 853         }
 854 
 855         mutex_exit(&sess->mtx);
 856 
 857         return (rc);
 858 }
 859 
 860 /* This function is used by logic that adds session to the sessions AVL */
 861 int
 862 krrp_sess_compare_id(const void *opaque_sess1, const void *opaque_sess2)
 863 {
 864         size_t i;
 865         const krrp_sess_t *sess1, *sess2;
 866 
 867         sess1 = opaque_sess1;
 868         sess2 = opaque_sess2;
 869 
 870         for (i = 0; i < UUID_PRINTABLE_STRING_LENGTH - 1; i++) {
 871                 if (sess1->id[i] > sess2->id[i])
 872                         return (1);
 873 
 874                 if (sess1->id[i] < sess2->id[i])
 875                         return (-1);
 876         }
 877 
 878         return (0);
 879 }
 880 
 881 int
 882 krrp_sess_set_id(krrp_sess_t *sess, const char *sess_id,
 883     krrp_error_t *error)
 884 {
 885         int rc = -1;
 886 
 887         VERIFY(sess_id != NULL);
 888 
 889         if (strlen(sess_id) != (UUID_PRINTABLE_STRING_LENGTH - 1)) {
 890                 krrp_error_set(error, KRRP_ERRNO_SESSID, EINVAL);
 891                 goto out;
 892         }
 893 
 894         (void) strlcpy(sess->id, sess_id, UUID_PRINTABLE_STRING_LENGTH);
 895         rc = 0;
 896 
 897 out:
 898         return (rc);
 899 }
 900 
 901 static int
 902 krrp_sess_set_kstat_id(krrp_sess_t *sess, const char *kstat_id,
 903     krrp_error_t *error)
 904 {
 905         int rc = -1;
 906 
 907         VERIFY(kstat_id != NULL);
 908 
 909         if (strlen(kstat_id) != (KRRP_KSTAT_ID_STRING_LENGTH - 1)) {
 910                 krrp_error_set(error, KRRP_ERRNO_KSTATID, EINVAL);
 911                 goto out;
 912         }
 913 
 914         (void) strlcpy(sess->kstat.id, kstat_id, KRRP_KSTAT_ID_STRING_LENGTH);
 915         rc = 0;
 916 
 917 out:
 918         return (rc);
 919 }
 920 
 921 static int
 922 krrp_sess_set_auth_digest(krrp_sess_t *sess, const char *auth_digest,
 923     krrp_error_t *error)
 924 {
 925         if (auth_digest != NULL) {
 926                 if (strlcpy(sess->auth_digest, auth_digest,
 927                     KRRP_AUTH_DIGEST_MAX_LEN) >= KRRP_AUTH_DIGEST_MAX_LEN) {
 928                         krrp_error_set(error, KRRP_ERRNO_AUTH, EINVAL);
 929                         return (-1);
 930                 }
 931         }
 932 
 933         return (0);
 934 }
 935 
 936 static void
 937 krrp_sess_kstat_init(krrp_sess_t *sess)
 938 {
 939         sess->kstat.ctx = kstat_create("krrp_session", 0,
 940             sess->kstat.id, "misc", KSTAT_TYPE_NAMED,
 941             0, KSTAT_FLAG_VIRTUAL);
 942 
 943         if (sess->kstat.ctx == NULL) {
 944                 cmn_err(CE_WARN, "Failed to create kstat with id %s"
 945                     "for session %s", sess->kstat.id, sess->id);
 946                 return;
 947         }
 948 
 949         sess->kstat.ctx->ks_private = sess;
 950 
 951         switch (sess->type) {
 952         case KRRP_SESS_SENDER:
 953                 sess->kstat.ctx->ks_data =
 954                     &sess->kstat.data.sender;
 955                 sess->kstat.ctx->ks_data_size =
 956                     sizeof (sess->kstat.data.sender);
 957                 sess->kstat.ctx->ks_ndata =
 958                     NUM_OF_FIELDS(sess->kstat.data.sender);
 959                 (void) memcpy(sess->kstat.ctx->ks_data,
 960                     &sess_sender_stats_templ,
 961                     sizeof (krrp_sess_sender_kstat_t));
 962                 break;
 963         case KRRP_SESS_RECEIVER:
 964                 sess->kstat.ctx->ks_data =
 965                     &sess->kstat.data.receiver;
 966                 sess->kstat.ctx->ks_data_size =
 967                     sizeof (sess->kstat.data.receiver);
 968                 sess->kstat.ctx->ks_ndata =
 969                     NUM_OF_FIELDS(sess->kstat.data.receiver);
 970                 (void) memcpy(sess->kstat.ctx->ks_data,
 971                     &sess_receiver_stats_templ,
 972                     sizeof (krrp_sess_receiver_kstat_t));
 973                 break;
 974         case KRRP_SESS_COMPOUND:
 975                 sess->kstat.ctx->ks_data =
 976                     &sess->kstat.data.compound;
 977                 sess->kstat.ctx->ks_data_size =
 978                     sizeof (sess->kstat.data.compound);
 979                 sess->kstat.ctx->ks_ndata =
 980                     NUM_OF_FIELDS(sess->kstat.data.compound);
 981                 (void) memcpy(sess->kstat.ctx->ks_data,
 982                     &sess_compound_stats_templ,
 983                     sizeof (krrp_sess_compound_kstat_t));
 984                 break;
 985         }
 986 
 987         sess->kstat.ctx->ks_update = &krrp_sess_kstat_update;
 988         kstat_install(sess->kstat.ctx);
 989 }
 990 
 991 static int
 992 krrp_sess_kstat_update(kstat_t *ks, int rw)
 993 {
 994         int rc = EACCES;
 995         krrp_sess_t *sess = ks->ks_private;
 996 
 997         if (rw == KSTAT_WRITE)
 998                 goto out;
 999 
1000         if (krrp_sess_inc_ref_cnt(sess) != 0) {
1001                 rc = EIO;
1002                 goto out;
1003         }
1004 
1005         switch (sess->type) {
1006         case KRRP_SESS_SENDER:
1007                 krrp_sess_sender_kstat_update(sess, ks);
1008                 break;
1009         case KRRP_SESS_RECEIVER:
1010                 krrp_sess_receiver_kstat_update(sess, ks);
1011                 break;
1012         case KRRP_SESS_COMPOUND:
1013                 krrp_sess_compound_kstat_update(sess, ks);
1014                 break;
1015         }
1016 
1017         rc = 0;
1018         krrp_sess_dec_ref_cnt(sess);
1019 
1020 out:
1021         return (rc);
1022 }
1023 
1024 /*
1025  * To extend stats need to add new field to KRRP_SESS_STAT_NAME_MAP
1026  * in krrp_session.h and write corresponding update-code
1027  * in the following function
1028  */
1029 static void
1030 krrp_sess_sender_kstat_update(krrp_sess_t *sess, kstat_t *ks)
1031 {
1032         krrp_sess_sender_kstat_t *stats = &sess->kstat.data.sender;
1033 
1034         /* The RPO-time between snap create and total recv by the remote ZFS */
1035         stats->avg_rpo.value.ui64 =
1036             sess->stream_read->avg_total_rpo.value;
1037 
1038         /* The RPO-time between snap create and total recv by the remote KRRP */
1039         stats->avg_network_rpo.value.ui64 = sess->stream_read->avg_rpo.value;
1040 
1041         stats->bytes_tx.value.ui64 = sess->conn->bytes_tx;
1042         stats->bytes_rx.value.ui64 = sess->conn->bytes_rx;
1043 
1044         /* The TXG that is now read by ZFS */
1045         stats->cur_send_stream_txg.value.ui64 = sess->stream_read->cur_send_txg;
1046 
1047         /* The TXG that is now sent over Network */
1048         stats->cur_send_network_txg.value.ui64 = sess->conn->cur_txg;
1049 
1050         /* The last TXG that was complettly received by the remote ZFS */
1051         stats->last_stream_ack_txg.value.ui64 =
1052             sess->stream_read->last_full_ack_txg;
1053 
1054         /* The last TXG that was complettly received by the remote KRRP */
1055         stats->last_network_ack_txg.value.ui64 =
1056             sess->stream_read->last_ack_txg;
1057 
1058         stats->max_pdu_seq_num.value.ui64 =
1059             sess->fl_ctrl.max_pdu_seq_num_orig;
1060         stats->max_pdu_seq_num_adjusted.value.ui64 =
1061             sess->fl_ctrl.max_pdu_seq_num;
1062         stats->cur_pdu_seq_num.value.ui64 = sess->fl_ctrl.cur_pdu_seq_num;
1063         stats->fl_ctrl_window_size.value.ui64 = sess->fl_ctrl.cwnd;
1064 
1065         stats->rbytes.value.ui64 = sess->stream_read->bytes_processed;
1066 
1067         stats->mem_used.value.ui64 =
1068             krrp_pdu_engine_get_used_mem(sess->data_pdu_engine);
1069 
1070         stats->uptime.value.ui64 =
1071             (gethrtime() - ks->ks_crtime) / 1000 / 1000;
1072 }
1073 
1074 static void
1075 krrp_sess_receiver_kstat_update(krrp_sess_t *sess, kstat_t *ks)
1076 {
1077         krrp_sess_receiver_kstat_t *stats =
1078             &sess->kstat.data.receiver;
1079 
1080         stats->bytes_tx.value.ui64 = sess->conn->bytes_tx;
1081         stats->bytes_rx.value.ui64 = sess->conn->bytes_rx;
1082 
1083         /* The TXG that is now wrote by ZFS */
1084         stats->cur_recv_stream_txg.value.ui64 =
1085             sess->stream_write->cur_recv_txg;
1086 
1087         /* The TXG that is now received from Network */
1088         stats->cur_recv_network_txg.value.ui64 = sess->conn->cur_txg;
1089 
1090         stats->max_pdu_seq_num.value.ui64 = sess->fl_ctrl.max_pdu_seq_num;
1091         stats->cur_pdu_seq_num.value.ui64 = sess->fl_ctrl.cur_pdu_seq_num;
1092 
1093         stats->wbytes.value.ui64 = sess->stream_write->bytes_processed;
1094 
1095         stats->mem_used.value.ui64 =
1096             krrp_pdu_engine_get_used_mem(sess->data_pdu_engine);
1097 
1098         stats->uptime.value.ui64 =
1099             (gethrtime() - ks->ks_crtime) / 1000 / 1000;
1100 }
1101 
1102 static void
1103 krrp_sess_compound_kstat_update(krrp_sess_t *sess, kstat_t *ks)
1104 {
1105         krrp_sess_compound_kstat_t *stats =
1106             &sess->kstat.data.compound;
1107 
1108         /* The RPO-time between snap create and total recv by the ZFS */
1109         stats->avg_rpo.value.ui64 =
1110             sess->stream_read->avg_total_rpo.value;
1111 
1112         /* The TXG that is now read by ZFS */
1113         stats->cur_send_stream_txg.value.ui64 =
1114             sess->stream_read->cur_send_txg;
1115 
1116         /* The TXG that is now wrote by ZFS */
1117         stats->cur_recv_stream_txg.value.ui64 =
1118             sess->stream_write->cur_recv_txg;
1119 
1120         stats->rbytes.value.ui64 = sess->stream_read->bytes_processed;
1121         stats->wbytes.value.ui64 = sess->stream_write->bytes_processed;
1122 
1123         stats->mem_used.value.ui64 =
1124             krrp_pdu_engine_get_used_mem(sess->data_pdu_engine);
1125 
1126         stats->uptime.value.ui64 =
1127             (gethrtime() - ks->ks_crtime) / 1000 / 1000;
1128 }
1129 
1130 static void
1131 krrp_sess_lr_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
1132     void *void_sess)
1133 {
1134         krrp_sess_t *sess = void_sess;
1135 
1136         if (krrp_sess_inc_ref_cnt(sess) != 0) {
1137                 if (ev == KRRP_STREAM_DATA_PDU)
1138                         krrp_pdu_rele((krrp_pdu_t *)ev_arg);
1139 
1140                 return;
1141         }
1142 
1143         switch (ev) {
1144         case KRRP_STREAM_DATA_PDU:
1145                 krrp_sess_lr_data_pdu_from_stream(sess,
1146                     (krrp_pdu_data_t *)ev_arg);
1147                 break;
1148         case KRRP_STREAM_TXG_RECV_DONE:
1149                 krrp_sess_lr_txg_recv_done(sess, (uint64_t)ev_arg);
1150                 break;
1151         case KRRP_STREAM_SEND_DONE:
1152                 krrp_sess_lr_send_done(sess);
1153                 break;
1154         case KRRP_STREAM_ERROR:
1155                 krrp_sess_stream_error(sess, (krrp_error_t *)ev_arg);
1156                 break;
1157         default:
1158                 break;
1159         }
1160 
1161         krrp_sess_dec_ref_cnt(sess);
1162 }
1163 
1164 static void
1165 krrp_sess_ll_stream_cb(krrp_stream_cb_ev_t ev, uintptr_t ev_arg,
1166     void *void_sess)
1167 {
1168         krrp_sess_t *sess = void_sess;
1169 
1170         if (krrp_sess_inc_ref_cnt(sess) != 0) {
1171                 if (ev == KRRP_STREAM_DATA_PDU)
1172                         krrp_pdu_rele((krrp_pdu_t *)ev_arg);
1173 
1174                 return;
1175         }
1176 
1177         switch (ev) {
1178         case KRRP_STREAM_DATA_PDU:
1179                 krrp_sess_ll_data_pdu_from_stream(sess,
1180                     (krrp_pdu_data_t *)ev_arg);
1181                 break;
1182         case KRRP_STREAM_TXG_RECV_DONE:
1183                 krrp_sess_ll_txg_recv_done(sess, (uint64_t)ev_arg);
1184                 break;
1185         case KRRP_STREAM_SEND_DONE:
1186                 krrp_sess_ll_send_done(sess);
1187                 break;
1188         case KRRP_STREAM_ERROR:
1189                 krrp_sess_stream_error(sess, (krrp_error_t *)ev_arg);
1190                 break;
1191         default:
1192                 break;
1193         }
1194 
1195         krrp_sess_dec_ref_cnt(sess);
1196 }
1197 
1198 static void
1199 krrp_sess_stream_error(krrp_sess_t *sess, krrp_error_t *error)
1200 {
1201         cmn_err(CE_WARN, "An stream error has occured: %s (%d)",
1202             krrp_error_errno_to_str(error->krrp_errno),
1203             error->unix_errno);
1204 
1205         switch (sess->type) {
1206         case KRRP_SESS_SENDER:
1207                 krrp_stream_stop(sess->stream_read);
1208                 break;
1209         case KRRP_SESS_RECEIVER:
1210                 krrp_stream_stop(sess->stream_write);
1211                 break;
1212         case KRRP_SESS_COMPOUND:
1213                 krrp_stream_stop(sess->stream_read);
1214                 krrp_stream_stop(sess->stream_write);
1215                 break;
1216         }
1217 
1218         krrp_sess_error(sess, error);
1219 }
1220 
1221 static void
1222 krrp_sess_txg_recv_done(krrp_sess_t *sess, uint64_t txg, boolean_t complete)
1223 {
1224         krrp_pdu_ctrl_t *pdu = NULL;
1225 
1226         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1227         if (pdu == NULL) {
1228                 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1229                     "to send KRRP_OPCODE_TXG_ACK/KRRP_OPCODE_TXG_ACK2");
1230                 krrp_sess_nomem_error(sess);
1231                 return;
1232         }
1233 
1234         if (complete)
1235                 pdu->hdr->opcode = KRRP_OPCODE_TXG_ACK2;
1236         else
1237                 pdu->hdr->opcode = KRRP_OPCODE_TXG_ACK;
1238 
1239         *((uint64_t *)(pdu->hdr->data)) = htonll(txg);
1240 
1241         krrp_queue_put(sess->ctrl_tx_queue, pdu);
1242 }
1243 
1244 static void
1245 krrp_sess_post_send_done_uevent(krrp_sess_t *sess)
1246 {
1247         nvlist_t *attrs = fnvlist_alloc();
1248 
1249         mutex_enter(&sess->mtx);
1250         sess->running = B_FALSE;
1251         mutex_exit(&sess->mtx);
1252 
1253         (void) krrp_param_put(KRRP_PARAM_SESS_ID, attrs, sess->id);
1254         krrp_svc_post_uevent(ESC_KRRP_SESS_SEND_DONE, attrs);
1255         fnvlist_free(attrs);
1256 }
1257 
1258 static void
1259 krrp_sess_post_error_uevent(krrp_sess_t *sess, krrp_error_t *error)
1260 {
1261         nvlist_t *attrs = fnvlist_alloc();
1262 
1263         krrp_error_to_nvl(error, &attrs);
1264 
1265         (void) krrp_param_put(KRRP_PARAM_SESS_ID, attrs, sess->id);
1266         krrp_svc_post_uevent(ESC_KRRP_SESS_ERROR, attrs);
1267 
1268         fnvlist_free(attrs);
1269 }
1270 
1271 static void
1272 krrp_sess_pdu_engine_cb(void *void_sess, size_t released_pdus)
1273 {
1274         krrp_pdu_ctrl_t *pdu = NULL;
1275         krrp_sess_t *sess = void_sess;
1276         uint64_t max_pdu_seq_num;
1277 
1278         ASSERT(sess->type == KRRP_SESS_RECEIVER);
1279 
1280         if (krrp_sess_inc_ref_cnt(sess) != 0)
1281                 return;
1282 
1283         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1284         if (pdu == NULL) {
1285                 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1286                     "to send KRRP_OPCODE_FL_CTRL_UPDATE");
1287                 krrp_sess_nomem_error(sess);
1288                 goto out;
1289         }
1290 
1291         pdu->hdr->opcode = KRRP_OPCODE_FL_CTRL_UPDATE;
1292 
1293         max_pdu_seq_num = krrp_sess_fl_ctrl_update_rx(&sess->fl_ctrl,
1294             released_pdus);
1295         *((uint64_t *)(pdu->hdr->data)) = htonll(max_pdu_seq_num);
1296 
1297         krrp_queue_put(sess->ctrl_tx_queue, pdu);
1298 
1299 out:
1300         krrp_sess_dec_ref_cnt(sess);
1301 }
1302 
1303 static void
1304 krrp_sess_conn_cb(void *void_conn, krrp_conn_cb_ev_t ev, uintptr_t ev_arg,
1305     void *void_sess)
1306 {
1307         krrp_sess_t *sess = void_sess;
1308         krrp_conn_t *conn = void_conn;
1309 
1310         if (krrp_sess_inc_ref_cnt(sess) != 0) {
1311                 if (ev == KRRP_CONN_DATA_PDU || ev == KRRP_CONN_CTRL_PDU)
1312                         krrp_pdu_rele((krrp_pdu_t *)ev_arg);
1313 
1314                 return;
1315         }
1316 
1317         switch (ev) {
1318         case KRRP_CONN_DATA_PDU:
1319                 /* Data-PDU flow: Sender >>> Receiver */
1320                 if (sess->type == KRRP_SESS_SENDER) {
1321                         krrp_error_t error;
1322 
1323                         krrp_error_set(&error, KRRP_ERRNO_PROTO, EBADMSG);
1324                         krrp_sess_error(sess, &error);
1325                         break;
1326                 }
1327 
1328                 krrp_sess_data_pdu_from_network(sess,
1329                     (krrp_pdu_data_t *)ev_arg);
1330                 break;
1331         case KRRP_CONN_CTRL_PDU:
1332                 krrp_sess_ctrl_pdu_from_network(sess,
1333                     (krrp_pdu_ctrl_t *)ev_arg);
1334                 break;
1335         case KRRP_CONN_ERROR:
1336                 krrp_sess_conn_error(sess, conn,
1337                     (krrp_error_t *)ev_arg);
1338                 break;
1339         default:
1340                 cmn_err(CE_PANIC, "Unknown conn cb-event");
1341         }
1342 
1343         krrp_sess_dec_ref_cnt(sess);
1344 }
1345 
1346 /* ARGSUSED */
1347 static void
1348 krrp_sess_conn_error(krrp_sess_t *sess, krrp_conn_t *conn,
1349     krrp_error_t *error)
1350 {
1351         boolean_t error_case;
1352         krrp_sess_stop_ping(sess);
1353 
1354         mutex_enter(&sess->mtx);
1355         error_case = !sess->shutdown;
1356         mutex_exit(&sess->mtx);
1357 
1358         if (error_case) {
1359                 cmn_err(CE_WARN, "An connection error has occured: %s (%d)",
1360                     krrp_error_errno_to_str(error->krrp_errno),
1361                     error->unix_errno);
1362 
1363                 krrp_sess_error(sess, error);
1364         }
1365 }
1366 
1367 static void
1368 krrp_sess_ctrl_pdu_from_network(krrp_sess_t *sess, krrp_pdu_ctrl_t *pdu)
1369 {
1370         krrp_opcode_t opcode;
1371         krrp_hdr_ctrl_t *hdr;
1372 
1373         hdr = krrp_pdu_hdr(pdu);
1374         opcode = krrp_pdu_opcode(pdu);
1375 
1376         switch (opcode) {
1377         case KRRP_OPCODE_TXG_ACK:
1378         case KRRP_OPCODE_TXG_ACK2:
1379                 {
1380                         uint64_t txg;
1381 
1382                         txg = ntohll(*((uint64_t *)(hdr->data)));
1383                         if (opcode == KRRP_OPCODE_TXG_ACK2)
1384                                 krrp_stream_txg_confirmed(sess->stream_read,
1385                                     txg, B_TRUE);
1386                         else
1387                                 krrp_stream_txg_confirmed(sess->stream_read,
1388                                     txg, B_FALSE);
1389                 }
1390                 break;
1391         case KRRP_OPCODE_FL_CTRL_UPDATE:
1392                 krrp_sess_fl_ctrl_update_tx(&sess->fl_ctrl,
1393                     ntohll(*((uint64_t *)(hdr->data))));
1394                 break;
1395         case KRRP_OPCODE_SEND_DONE:
1396                 krrp_sess_post_send_done_uevent(sess);
1397                 break;
1398         case KRRP_OPCODE_PING:
1399                 krrp_sess_ping_request(sess);
1400                 break;
1401         case KRRP_OPCODE_PONG:
1402                 sess->ping_wait_for_response = B_FALSE;
1403                 break;
1404         case KRRP_OPCODE_ERROR:
1405                 break;
1406         case KRRP_OPCODE_SHUTDOWN:
1407                 mutex_enter(&sess->mtx);
1408 
1409                 if (sess->shutdown) {
1410                         cv_signal(&sess->cv);
1411                 } else {
1412                         sess->shutdown = B_TRUE;
1413                         krrp_sess_send_shutdown(sess);
1414                 }
1415 
1416                 mutex_exit(&sess->mtx);
1417                 break;
1418         default:
1419                 break;
1420         }
1421 
1422         krrp_pdu_rele((krrp_pdu_t *)pdu);
1423 }
1424 
1425 static void
1426 krrp_sess_ping_request(krrp_sess_t *sess)
1427 {
1428         krrp_pdu_ctrl_t *pdu = NULL;
1429 
1430         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1431         if (pdu == NULL) {
1432                 cmn_err(CE_WARN, "No memory to send PING response");
1433                 return;
1434         }
1435 
1436         pdu->hdr->opcode = KRRP_OPCODE_PONG;
1437 
1438         krrp_queue_put(sess->ctrl_tx_queue, pdu);
1439 }
1440 
1441 static uint64_t
1442 krrp_sess_fl_ctrl_update_rx(krrp_fl_ctrl_t *fl_ctrl,
1443     size_t window_offset)
1444 {
1445         uint64_t max_pdu_seq_num;
1446 
1447         mutex_enter(&fl_ctrl->mtx);
1448         fl_ctrl->max_pdu_seq_num += window_offset;
1449         max_pdu_seq_num = fl_ctrl->max_pdu_seq_num;
1450         mutex_exit(&fl_ctrl->mtx);
1451 
1452         return (max_pdu_seq_num);
1453 }
1454 
1455 static void
1456 krrp_sess_fl_ctrl_update_tx(krrp_fl_ctrl_t *fl_ctrl, uint64_t max_pdu_seq_num)
1457 {
1458         uint64_t cur_window, cur_pdu_seq_num;
1459 
1460         mutex_enter(&fl_ctrl->mtx);
1461 
1462         cur_pdu_seq_num = fl_ctrl->cur_pdu_seq_num;
1463 
1464         cur_window = max_pdu_seq_num - cur_pdu_seq_num;
1465         krrp_sess_fl_ctrl_calc_cwnd_window(fl_ctrl, cur_window);
1466 
1467         DTRACE_PROBE1(krrp_fl_ctrl_cwnd, uint64_t, fl_ctrl->cwnd);
1468 
1469         fl_ctrl->max_pdu_seq_num_orig = max_pdu_seq_num;
1470         fl_ctrl->max_pdu_seq_num = cur_pdu_seq_num + fl_ctrl->cwnd;
1471 
1472         cv_signal(&fl_ctrl->cv);
1473         mutex_exit(&fl_ctrl->mtx);
1474 }
1475 
1476 static void
1477 krrp_sess_fl_ctrl_calc_cwnd_window(krrp_fl_ctrl_t *fl_ctrl,
1478     uint64_t new_recv_window)
1479 {
1480         switch (krrp_sess_cwnd_state) {
1481         default:
1482         case 0:
1483                 fl_ctrl->cwnd = new_recv_window;
1484                 break;
1485         case 1:
1486                 if (fl_ctrl->cwnd == 0)
1487                         fl_ctrl->cwnd = new_recv_window / 2;
1488                 else if (new_recv_window >
1489                     (fl_ctrl->cwnd + (fl_ctrl->cwnd >> 3)))
1490                         fl_ctrl->cwnd = (fl_ctrl->cwnd + new_recv_window) >> 1;
1491                 else if (new_recv_window < fl_ctrl->cwnd)
1492                         fl_ctrl->cwnd = new_recv_window >> 1;
1493                 break;
1494         case 2:
1495                 if (fl_ctrl->cwnd == 0)
1496                         fl_ctrl->cwnd = new_recv_window -
1497                             (new_recv_window >> 2);
1498                 else if (new_recv_window >
1499                     (fl_ctrl->cwnd + (fl_ctrl->cwnd >> 3)))
1500                         fl_ctrl->cwnd = (fl_ctrl->cwnd + new_recv_window) >> 1;
1501                 else if (new_recv_window < fl_ctrl->cwnd)
1502                         fl_ctrl->cwnd = new_recv_window -
1503                             (new_recv_window >> 3);
1504                 break;
1505         }
1506 }
1507 
1508 static int
1509 krrp_sess_fl_ctrl_validate(krrp_sess_t *sess, krrp_hdr_data_t *hdr)
1510 {
1511         int rc = -1;
1512         krrp_fl_ctrl_t *fl_ctrl;
1513 
1514         fl_ctrl = &sess->fl_ctrl;
1515 
1516         mutex_enter(&fl_ctrl->mtx);
1517         if (!fl_ctrl->disabled) {
1518                 if (hdr->pdu_seq_num > fl_ctrl->max_pdu_seq_num) {
1519                         cmn_err(CE_WARN, "Detected violation of "
1520                             "flow control rules: [%" PRIu64 "] > "
1521                             "[%" PRIu64 "]", hdr->pdu_seq_num,
1522                             fl_ctrl->max_pdu_seq_num);
1523                         goto out;
1524                 }
1525         }
1526 
1527         rc = 0;
1528         fl_ctrl->cur_pdu_seq_num = hdr->pdu_seq_num;
1529 
1530 out:
1531         mutex_exit(&fl_ctrl->mtx);
1532         return (rc);
1533 }
1534 
1535 
1536 static void
1537 krrp_sess_data_pdu_from_network(krrp_sess_t *sess, krrp_pdu_data_t *pdu)
1538 {
1539         krrp_hdr_data_t *hdr;
1540 
1541         hdr = krrp_pdu_hdr(pdu);
1542 
1543         pdu->txg = hdr->txg;
1544 
1545         if (hdr->flags & KRRP_HDR_FLAG_INIT_PDU) {
1546                 pdu->initial = B_TRUE;
1547 
1548                 DTRACE_PROBE1(krrp_network_recv_txg_start,
1549                     uint64_t, pdu->txg);
1550         }
1551 
1552         if (hdr->flags & KRRP_HDR_FLAG_FINI_PDU) {
1553                 pdu->final = B_TRUE;
1554 
1555                 DTRACE_PROBE1(krrp_network_recv_txg_stop,
1556                     uint64_t, pdu->txg);
1557 
1558                 krrp_sess_txg_recv_done(sess, pdu->txg, B_FALSE);
1559         }
1560 
1561         /* Now just ignore, later need to do something */
1562         (void) krrp_sess_fl_ctrl_validate(sess, krrp_pdu_hdr(pdu));
1563 
1564         if (krrp_sess_recv_without_stream) {
1565                 if (hdr->flags & KRRP_HDR_FLAG_FINI_PDU)
1566                         krrp_sess_txg_recv_done(sess, pdu->txg, B_TRUE);
1567 
1568                 krrp_pdu_rele((krrp_pdu_t *)pdu);
1569                 return;
1570         }
1571 
1572         krrp_queue_put(sess->data_write_queue, pdu);
1573 }
1574 
/*
 * Try to get the next Data PDU for transmission.
 *
 * The function does the following actions:
 * 1. checks FlowControl: the window is open if cur_pdu_seq_num is
 *    below max_pdu_seq_num or if flow control is disabled
 * 2. retrieves next PDU from TX-Queue
 * 3. assigns PDU SeqNum to the retrieved PDU
 *
 * On success the PDU is returned via 'result_pdu'. 'result_pdu' is not
 * touched if the TX-Queue did not return a PDU or if the window is
 * still closed when the wait ends, so the caller has to initialize it.
 *
 * If the window is closed, the function waits on fl_ctrl->cv with a
 * timeout of 100 ms and checks the window again only if the wait
 * returned a positive value.
 */
static void
krrp_sess_get_data_pdu_to_tx(void *void_sess, krrp_pdu_t **result_pdu)
{
        clock_t time_left = 0;
        krrp_sess_t *sess;
        krrp_fl_ctrl_t *fl_ctrl;
        /* The two flags below are used only by the DTrace probe */
        boolean_t win_open;
        boolean_t queue_is_empty;

        sess = (krrp_sess_t *)void_sess;
        fl_ctrl = &sess->fl_ctrl;

        mutex_enter(&fl_ctrl->mtx);

repeat:
        if ((fl_ctrl->cur_pdu_seq_num < fl_ctrl->max_pdu_seq_num) ||
            fl_ctrl->disabled) {
                krrp_pdu_t *pdu = NULL;

                win_open = B_TRUE;

                /* The flow control mutex is not held across the queue get */
                mutex_exit(&fl_ctrl->mtx);
                pdu = krrp_queue_get(sess->data_tx_queue);
                mutex_enter(&fl_ctrl->mtx);
                if (pdu != NULL) {
                        krrp_hdr_data_t *hdr;

                        /* Sequence numbers are assigned under the mutex */
                        hdr = (krrp_hdr_data_t *)krrp_pdu_hdr(pdu);
                        fl_ctrl->cur_pdu_seq_num++;
                        hdr->pdu_seq_num = fl_ctrl->cur_pdu_seq_num;
                        *result_pdu = pdu;
                        queue_is_empty = B_FALSE;
                } else
                        queue_is_empty = B_TRUE;

                /* Leave the loop regardless of whether a PDU was found */
                time_left = 0;
        } else {
                win_open = B_FALSE;
                if (krrp_queue_length(sess->data_tx_queue) == 0)
                        queue_is_empty = B_TRUE;
                else
                        queue_is_empty = B_FALSE;

                /* Wait for the window to be updated, at most 100 ms */
                time_left = cv_reltimedwait(&fl_ctrl->cv, &fl_ctrl->mtx,
                    MSEC_TO_TICK(100), TR_CLOCK_TICK);
        }

        /*
         * Only a positive result of the wait leads to another attempt;
         * otherwise give up and return without a PDU.
         */
        if (time_left > 0)
                goto repeat;

        mutex_exit(&fl_ctrl->mtx);

        DTRACE_PROBE2(krrp_data_path_state, boolean_t, win_open,
            boolean_t, queue_is_empty);
}
1635 
1636 static void
1637 krrp_sess_nomem_error(krrp_sess_t *sess)
1638 {
1639         krrp_error_t error;
1640 
1641         krrp_error_set(&error, KRRP_ERRNO_NOMEM, 0);
1642         krrp_sess_error(sess, &error);
1643 }
1644 
1645 static void
1646 krrp_sess_error(krrp_sess_t *sess, krrp_error_t *error)
1647 {
1648         mutex_enter(&sess->mtx);
1649 
1650         if (sess->error.krrp_errno == 0) {
1651                 sess->error.krrp_errno = error->krrp_errno;
1652                 sess->error.unix_errno = error->unix_errno;
1653                 sess->running = B_FALSE;
1654 
1655                 krrp_sess_post_error_uevent(sess, &sess->error);
1656         }
1657 
1658         mutex_exit(&sess->mtx);
1659 }
1660 
1661 static int
1662 krrp_sess_inc_ref_cnt(krrp_sess_t *sess)
1663 {
1664         int rc = -1;
1665 
1666         mutex_enter(&sess->mtx);
1667         if (sess->destroying)
1668                 goto out;
1669 
1670         rc = 0;
1671         sess->ref_cnt++;
1672 
1673 out:
1674         mutex_exit(&sess->mtx);
1675 
1676         return (rc);
1677 }
1678 
1679 static void
1680 krrp_sess_dec_ref_cnt(krrp_sess_t *sess)
1681 {
1682         mutex_enter(&sess->mtx);
1683         ASSERT(sess->ref_cnt > 0);
1684         sess->ref_cnt--;
1685         cv_signal(&sess->cv);
1686         mutex_exit(&sess->mtx);
1687 }
1688 
1689 int
1690 krrp_sess_try_hold(krrp_sess_t *sess)
1691 {
1692         int rc = -1;
1693 
1694         mutex_enter(&sess->mtx);
1695         if (sess->on_hold)
1696                 goto out;
1697 
1698         rc = 0;
1699         sess->on_hold = B_TRUE;
1700 
1701 out:
1702         mutex_exit(&sess->mtx);
1703 
1704         return (rc);
1705 }
1706 
1707 void
1708 krrp_sess_rele(krrp_sess_t *sess)
1709 {
1710         mutex_enter(&sess->mtx);
1711         ASSERT(sess->on_hold);
1712         sess->on_hold = B_FALSE;
1713         mutex_exit(&sess->mtx);
1714 }
1715 
1716 static void
1717 krrp_sess_lr_data_pdu_from_stream(krrp_sess_t *sess, krrp_pdu_data_t *pdu)
1718 {
1719         krrp_hdr_data_t *hdr;
1720 
1721         hdr = krrp_pdu_hdr(pdu);
1722 
1723         hdr->opcode = KRRP_OPCODE_DATA_WRITE;
1724         hdr->txg = pdu->txg;
1725         hdr->payload_sz = pdu->cur_data_sz;
1726 
1727         if (pdu->initial)
1728                 hdr->flags |= KRRP_HDR_FLAG_INIT_PDU;
1729 
1730         if (pdu->final)
1731                 hdr->flags |= KRRP_HDR_FLAG_FINI_PDU;
1732 
1733         krrp_queue_put(sess->data_tx_queue, pdu);
1734 }
1735 
1736 static void
1737 krrp_sess_ll_data_pdu_from_stream(krrp_sess_t *sess, krrp_pdu_data_t *pdu)
1738 {
1739         if (pdu->final)
1740                 krrp_stream_txg_confirmed(sess->stream_read, pdu->txg, B_FALSE);
1741 
1742         krrp_queue_put(sess->data_write_queue, pdu);
1743 }
1744 
1745 static void
1746 krrp_sess_lr_txg_recv_done(krrp_sess_t *sess, uint64_t txg)
1747 {
1748         krrp_sess_txg_recv_done(sess, txg, B_TRUE);
1749 }
1750 
1751 static void
1752 krrp_sess_ll_txg_recv_done(krrp_sess_t *sess, uint64_t txg)
1753 {
1754         krrp_stream_txg_confirmed(sess->stream_read, txg, B_TRUE);
1755 }
1756 
1757 static void
1758 krrp_sess_lr_send_done(krrp_sess_t *sess)
1759 {
1760         krrp_pdu_ctrl_t *pdu = NULL;
1761 
1762         /* Notify the receiver that send has been done */
1763         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1764         if (pdu == NULL) {
1765                 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1766                     "to send KRRP_OPCODE_SEND_DONE");
1767                 krrp_sess_nomem_error(sess);
1768                 goto out;
1769         }
1770 
1771         pdu->hdr->opcode = KRRP_OPCODE_SEND_DONE;
1772 
1773         krrp_queue_put(sess->ctrl_tx_queue, pdu);
1774 
1775 out:
1776         /* Notify the userspace that send has been done */
1777         krrp_sess_post_send_done_uevent(sess);
1778 }
1779 
1780 static void
1781 krrp_sess_ll_send_done(krrp_sess_t *sess)
1782 {
1783         /* Notify the userspace that send has been done */
1784         krrp_sess_post_send_done_uevent(sess);
1785 }
1786 
1787 static void
1788 krrp_sess_send_shutdown(krrp_sess_t *sess)
1789 {
1790         krrp_pdu_ctrl_t *pdu = NULL;
1791 
1792         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
1793         if (pdu == NULL) {
1794                 cmn_err(CE_WARN, "Failed to allocate Ctrl PDU "
1795                     "to send KRRP_OPCODE_SHUTDOWN");
1796                 return;
1797         }
1798 
1799         pdu->hdr->opcode = KRRP_OPCODE_SHUTDOWN;
1800 
1801         krrp_queue_put(sess->ctrl_tx_queue, pdu);
1802 }