1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2019 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
  16 #include <sys/socket.h>
  17 #include <netinet/tcp.h>
  18 #include <inet/tcp.h>
  19 #include <sys/strsubr.h>
  20 #include <sys/socketvar.h>
  21 #include <sys/filio.h>
  22 
  23 #include "krrp_connection.h"
  24 
  25 /* #define KRRP_CONN_DEBUG 1 */
  26 
  27 /* Interval in us */
  28 #define KRRP_THROTTLE_INTERVAL_US (10 * MILLISEC)
  29 
  30 #define krrp_conn_callback(conn, ev, ev_arg) \
  31         (conn)->callback(conn, ev, \
  32         (uintptr_t)ev_arg, (conn)->callback_arg)
  33 
  34 /*
  35  * KRRP TCP-level connection timeout default (60000 ms).
  36  * Note that TCP's own default is 5min or 300,000ms.
  37  */
  38 #define KRRP_TCP_ABORT_THRESHOLD_DEFAULT 60000
  39 
  40 /*
  41  * The value is specified in milliseconds and does not
  42  * have any effect on an already created KRRP connection(s)
  43  * should be in range: 100 ... UINT32_MAX
  44  *
  45  * Note: use with caution
  46  */
  47 uint32_t krrp_tcp_abort_threshold = 0;
  48 
/*
 * Wait state shared between krrp_conn_connect_with_timeout() (the waiter)
 * and the ksocket callback krrp_conn_connect_cb() while a non-blocking
 * connect is in progress.
 */
typedef struct {
	kmutex_t	mtx;		/* protects rc and cb_done */
	kcondvar_t	cv;		/* signalled by the callback */
	int		rc;		/* 0 if connected, else an errno */
	boolean_t	cb_done;	/* B_TRUE once the callback has fired */
} krrp_conn_connect_timeout_t;
  55 
  56 static void krrp_conn_throttle_init(krrp_throttle_t *throttle);
  57 static void krrp_conn_throttle_fini(krrp_throttle_t *throttle);
  58 static void krrp_conn_throttle_enable(krrp_throttle_t *throttle);
  59 static void krrp_conn_throttle_disable(krrp_throttle_t *throttle);
  60 static void krrp_conn_throttle_cb(void *void_throttle);
  61 static void krrp_conn_throttle(krrp_throttle_t *throttle, size_t send_sz);
  62 
  63 static int krrp_conn_post_create(krrp_conn_t *conn, krrp_error_t *error);
  64 static int krrp_conn_connect(krrp_conn_t *conn, const char *host,
  65     int port, int timeout, krrp_error_t *error);
  66 static int krrp_conn_connect_with_timeout(ksocket_t ks,
  67     struct sockaddr *servaddr, int timeout, krrp_error_t *error);
  68 void krrp_conn_connect_cb(ksocket_t ks,
  69     ksocket_callback_event_t ev, void *arg, uintptr_t info);
  70 static int krrp_conn_post_configure(krrp_conn_t *conn, krrp_error_t *error);
  71 
  72 static void krrp_conn_tx_handler(void *void_conn);
  73 static void krrp_conn_rx_handler(void *void_conn);
  74 
  75 static void krrp_conn_process_received_pdu(krrp_conn_t *conn,
  76     krrp_pdu_t *pdu);
  77 
  78 static int krrp_conn_tx_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu,
  79     krrp_error_t *error);
  80 static int krrp_conn_tx(ksocket_t ks, void *buff, size_t buff_sz,
  81     krrp_error_t *error);
  82 static int krrp_conn_tx_mblk(ksocket_t ks, mblk_t *mp, krrp_error_t *error);
  83 static int krrp_conn_tx_ctrl_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t *dblk,
  84     krrp_error_t *error);
  85 static int krrp_conn_tx_data_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t **dblk,
  86     krrp_error_t *error);
  87 static mblk_t *krrp_conn_dblk_to_mblk(krrp_dblk_t *dblk,
  88     size_t wroff, size_t tail_len);
  89 
  90 static int krrp_conn_rx_header(krrp_conn_t *, krrp_hdr_t **, krrp_error_t *);
  91 static int krrp_conn_rx_pdu(krrp_conn_t *, krrp_pdu_t *, krrp_error_t *);
  92 static int krrp_conn_rx(ksocket_t ks, void *, size_t, krrp_error_t *);
  93 
  94 int
  95 krrp_conn_create_from_scratch(krrp_conn_t **result_conn,
  96     const char *address, int port, int timeout, krrp_error_t *error)
  97 {
  98         krrp_conn_t *conn = NULL;
  99 
 100         VERIFY(result_conn != NULL && *result_conn == NULL);
 101         VERIFY(address != NULL);
 102         VERIFY(port > 0 && port < 65535);
 103 
 104         conn = kmem_zalloc(sizeof (krrp_conn_t), KM_SLEEP);
 105 
 106         if (krrp_conn_connect(conn, address, port, timeout, error) != 0)
 107                 goto fail;
 108 
 109         if (krrp_conn_post_create(conn, error) != 0)
 110                 goto fail;
 111 
 112         *result_conn = conn;
 113         return (0);
 114 
 115 fail:
 116         if (conn->ks != NULL)
 117                 (void) ksocket_close(conn->ks, CRED());
 118 
 119         kmem_free(conn, sizeof (krrp_conn_t));
 120         return (-1);
 121 }
 122 
 123 /* ARGSUSED */
 124 int
 125 krrp_conn_create_from_ksocket(krrp_conn_t **result_conn,
 126     ksocket_t ks, krrp_error_t *error)
 127 {
 128         krrp_conn_t *conn = NULL;
 129 
 130         VERIFY(result_conn != NULL && *result_conn == NULL);
 131 
 132         conn = kmem_zalloc(sizeof (krrp_conn_t), KM_SLEEP);
 133 
 134         conn->ks = ks;
 135 
 136         if (krrp_conn_post_create(conn, error) != 0) {
 137                 kmem_free(conn, sizeof (krrp_conn_t));
 138                 return (-1);
 139         }
 140 
 141         *result_conn = conn;
 142 
 143         return (0);
 144 }
 145 
/*
 * Tear down a connection and free it.
 *
 * The connection is marked DISCONNECTING and both worker threads are asked
 * to stop.  Throttling is disabled, which also wakes a TX thread blocked in
 * krrp_conn_throttle().  The socket is then shut down and closed, and
 * finally all resources are released, including `conn' itself.
 */
void
krrp_conn_destroy(krrp_conn_t *conn)
{
	mutex_enter(&conn->mtx);

	/*
	 * The worker threads discard their own errors (and do not invoke
	 * the connection callback) once they see KRRP_CS_DISCONNECTING.
	 */
	conn->state = KRRP_CS_DISCONNECTING;
	conn->tx_running = B_FALSE;
	conn->rx_running = B_FALSE;
	mutex_exit(&conn->mtx);

	krrp_conn_throttle_disable(&conn->throttle);

	/*
	 * We do not join TX and RX thread, because:
	 *  - conn->ks is held by TX and RX threads on start
	 *  - TX and RX threads do ksocket_rele() before exit thread
	 *  - ksocket_close() is blocked while the given ksocket is held
	 *
	 * NOTE(review): a worker thread that failed before this function ran
	 * calls the connection callback (which dereferences conn) after its
	 * ksocket_rele(); confirm that this cannot race with the kmem_free()
	 * below.
	 */
	if (conn->ks != NULL) {
		(void) ksocket_shutdown(conn->ks, SHUT_RDWR, CRED());
		(void) ksocket_close(conn->ks, CRED());
	}

	mutex_enter(&conn->mtx);
	conn->state = KRRP_CS_DISCONNECTED;
	mutex_exit(&conn->mtx);

	krrp_conn_throttle_fini(&conn->throttle);

	cv_destroy(&conn->cv);

	mutex_destroy(&conn->mtx);

	kmem_free(conn, sizeof (krrp_conn_t));
}
 181 
 182 void
 183 krrp_conn_register_callback(krrp_conn_t *conn,
 184     krrp_conn_cb_t *ev_cb, void *cb_arg)
 185 {
 186         VERIFY(ev_cb != NULL);
 187 
 188         mutex_enter(&conn->mtx);
 189         VERIFY(conn->state == KRRP_CS_CONNECTED);
 190 
 191         conn->state = KRRP_CS_READY_TO_RUN;
 192         conn->callback = ev_cb;
 193         conn->callback_arg = cb_arg;
 194 
 195         mutex_exit(&conn->mtx);
 196 }
 197 
/*
 * Start data transfer on a connection in KRRP_CS_READY_TO_RUN state.
 *
 * Records the CTRL TX queue, the DATA PDU engine and the optional
 * get_data_pdu_cb (non-NULL only on the sending side; see
 * krrp_conn_tx_handler()).  It then moves the connection to KRRP_CS_ACTIVE,
 * starts the throttle refill timer if a limit is set, and spawns the TX and
 * RX worker threads.
 */
void
krrp_conn_run(krrp_conn_t *conn, krrp_queue_t *ctrl_tx_queue,
    krrp_pdu_engine_t *data_pdu_engine,
    krrp_get_data_pdu_cb_t *get_data_pdu_cb, void *cb_arg)
{
	VERIFY(ctrl_tx_queue != NULL);
	VERIFY(data_pdu_engine != NULL);
	VERIFY(data_pdu_engine->type == KRRP_PET_DATA);

	mutex_enter(&conn->mtx);
	VERIFY(conn->state == KRRP_CS_READY_TO_RUN);

	conn->data_pdu_engine = data_pdu_engine;
	conn->ctrl_tx_queue = ctrl_tx_queue;
	conn->get_data_pdu_cb = get_data_pdu_cb;
	conn->get_data_pdu_cb_arg = cb_arg;

	conn->state = KRRP_CS_ACTIVE;

	conn->tx_running = B_TRUE;
	conn->rx_running = B_TRUE;

	krrp_conn_throttle_enable(&conn->throttle);

	/*
	 * Each worker takes its own hold on the ksocket before it is created
	 * and drops it on exit; this is what lets krrp_conn_destroy() rely on
	 * ksocket_close() instead of joining the threads.  Both threads begin
	 * by taking conn->mtx, so they cannot run until it is dropped below.
	 */
	ksocket_hold(conn->ks);
	/* thread_create never fails */
	(void) thread_create(NULL, 0, &krrp_conn_tx_handler,
	    conn, 0, &p0, TS_RUN, minclsyspri);

	ksocket_hold(conn->ks);
	/* thread_create never fails */
	(void) thread_create(NULL, 0, &krrp_conn_rx_handler,
	    conn, 0, &p0, TS_RUN, minclsyspri);

	mutex_exit(&conn->mtx);
}
 234 
 235 void
 236 krrp_conn_stop(krrp_conn_t *conn)
 237 {
 238         mutex_enter(&conn->mtx);
 239         VERIFY3U(conn->state, ==, KRRP_CS_ACTIVE);
 240         conn->state = KRRP_CS_STOPPED;
 241         conn->tx_running = B_FALSE;
 242         conn->rx_running = B_FALSE;
 243         mutex_exit(&conn->mtx);
 244 }
 245 
 246 int
 247 krrp_conn_send_ctrl_data(krrp_conn_t *conn, krrp_opcode_t opcode,
 248     nvlist_t *nvl, krrp_error_t *error)
 249 {
 250         krrp_pdu_ctrl_t *pdu = NULL;
 251         int rc = -1;
 252 
 253         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
 254         if (pdu != NULL) {
 255                 pdu->hdr->opcode = (uint16_t)opcode;
 256 
 257                 if (nvl != NULL) {
 258                         size_t packed_size = 0;
 259 
 260                         /*
 261                          * fnvlist_size cannot be used, because it uses
 262                          * hardcoded encode-type == NV_ENCODE_NATIVE
 263                          */
 264                         VERIFY3U(nvlist_size(nvl, &packed_size,
 265                             NV_ENCODE_XDR), ==, 0);
 266 
 267                         VERIFY3U(packed_size, <, pdu->dblk->max_data_sz);
 268 
 269                         /*
 270                          * fnvlist_pack cannot be used,
 271                          * because cannot work with preallocated buffers,
 272                          * so just reimplement it here
 273                          */
 274                         VERIFY3U(nvlist_pack(nvl, (char **)&pdu->dblk->data,
 275                             &packed_size, NV_ENCODE_XDR, KM_SLEEP), ==, 0);
 276 
 277                         pdu->dblk->cur_data_sz = packed_size;
 278                         pdu->hdr->payload_sz = (uint32_t)packed_size;
 279                 }
 280 
 281                 rc = krrp_conn_tx_ctrl_pdu(conn, pdu, error);
 282                 krrp_pdu_rele((krrp_pdu_t *)pdu);
 283         } else
 284                 krrp_error_set(error, KRRP_ERRNO_NOMEM, 0);
 285 
 286         return (rc);
 287 }
 288 
 289 int
 290 krrp_conn_rx_ctrl_pdu(krrp_conn_t *conn, krrp_pdu_ctrl_t **result_pdu,
 291     krrp_error_t *error)
 292 {
 293         krrp_pdu_ctrl_t *pdu = NULL;
 294 
 295         VERIFY(conn != NULL);
 296         VERIFY(result_pdu != NULL && *result_pdu == NULL);
 297 
 298         krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
 299         if (pdu == NULL) {
 300                 krrp_error_set(error, KRRP_ERRNO_NOMEM, 0);
 301                 return (-1);
 302         }
 303 
 304         if (krrp_conn_rx_header(conn, (krrp_hdr_t **)&pdu->hdr, error) != 0)
 305                 goto err;
 306 
 307         if (krrp_conn_rx_pdu(conn, (krrp_pdu_t *)pdu, error) != 0)
 308                 goto err;
 309 
 310         *result_pdu = pdu;
 311 
 312         return (0);
 313 
 314 err:
 315         krrp_pdu_rele((krrp_pdu_t *)pdu);
 316         return (-1);
 317 }
 318 
 319 int
 320 krrp_conn_tx_ctrl_pdu(krrp_conn_t *conn, krrp_pdu_ctrl_t *pdu,
 321     krrp_error_t *error)
 322 {
 323         int rc;
 324 
 325         VERIFY(conn != NULL);
 326         VERIFY(pdu != NULL);
 327 
 328         rc = krrp_conn_tx(conn->ks, pdu->hdr,
 329             sizeof (krrp_hdr_t), error);
 330         if (rc != 0)
 331                 return (rc);
 332 
 333         conn->bytes_tx += sizeof (krrp_hdr_t);
 334         rc = krrp_conn_tx_ctrl_pdu_dblk(conn, pdu->dblk, error);
 335         if (rc == 0)
 336                 conn->bytes_tx += sizeof (krrp_hdr_t);
 337 
 338         return (rc);
 339 }
 340 
 341 static int
 342 krrp_conn_post_create(krrp_conn_t *conn, krrp_error_t *error)
 343 {
 344         if (krrp_conn_post_configure(conn, error) != 0)
 345                 return (-1);
 346 
 347         krrp_conn_throttle_init(&conn->throttle);
 348 
 349         mutex_init(&conn->mtx, NULL, MUTEX_DEFAULT, NULL);
 350         cv_init(&conn->cv, NULL, CV_DEFAULT, NULL);
 351 
 352         conn->state = KRRP_CS_CONNECTED;
 353         return (0);
 354 }
 355 
 356 static void
 357 krrp_conn_throttle_init(krrp_throttle_t *throttle)
 358 {
 359         mutex_init(&throttle->mtx, NULL, MUTEX_DEFAULT, NULL);
 360         cv_init(&throttle->cv, NULL, CV_DEFAULT, NULL);
 361 }
 362 
 363 static void
 364 krrp_conn_throttle_fini(krrp_throttle_t *throttle)
 365 {
 366         cv_destroy(&throttle->cv);
 367         mutex_destroy(&throttle->mtx);
 368 }
 369 
 370 static void
 371 krrp_conn_throttle_enable(krrp_throttle_t *throttle)
 372 {
 373         mutex_enter(&throttle->mtx);
 374 
 375         if (throttle->limit != 0) {
 376                 throttle->timer = timeout(&krrp_conn_throttle_cb,
 377                     throttle, drv_usectohz(KRRP_THROTTLE_INTERVAL_US));
 378         }
 379 
 380         mutex_exit(&throttle->mtx);
 381 }
 382 
 383 static void
 384 krrp_conn_throttle_disable(krrp_throttle_t *throttle)
 385 {
 386         timeout_id_t saved_timer;
 387 
 388         mutex_enter(&throttle->mtx);
 389         saved_timer = throttle->timer;
 390         throttle->timer = NULL;
 391         throttle->limit = 0;
 392         cv_signal(&throttle->cv);
 393         mutex_exit(&throttle->mtx);
 394 
 395         if (saved_timer != NULL)
 396                 (void) untimeout(saved_timer);
 397 }
 398 
 399 static void
 400 krrp_conn_throttle_cb(void *void_throttle)
 401 {
 402         krrp_throttle_t *throttle = void_throttle;
 403 
 404         mutex_enter(&throttle->mtx);
 405         if (throttle->limit == 0)
 406                 throttle->remains = SIZE_MAX;
 407         else
 408                 throttle->remains = throttle->limit;
 409 
 410         if (throttle->timer != NULL) {
 411                 throttle->timer = timeout(&krrp_conn_throttle_cb,
 412                     throttle, drv_usectohz(KRRP_THROTTLE_INTERVAL_US));
 413         }
 414 
 415         cv_signal(&throttle->cv);
 416         mutex_exit(&throttle->mtx);
 417 }
 418 
 419 static void
 420 krrp_conn_throttle(krrp_throttle_t *throttle, size_t send_sz)
 421 {
 422         if (throttle->limit == 0)
 423                 return;
 424 
 425         mutex_enter(&throttle->mtx);
 426 
 427         while (throttle->remains == 0 && throttle->limit != 0)
 428                 cv_wait(&throttle->cv, &throttle->mtx);
 429 
 430         if (throttle->remains > send_sz)
 431                 throttle->remains -= send_sz;
 432         else
 433                 throttle->remains = 0;
 434 
 435         mutex_exit(&throttle->mtx);
 436 }
 437 
 438 /*
 439  * This function is called on a krrp session that is either
 440  * already running or not yet. Using the 'only_set' (2nd arg)
 441  * the caller explicitly controls whether to just set
 442  * the limiting rate, or (re)set the rate and start (or continue)
 443  * throttling the traffic right away
 444  */
 445 void
 446 krrp_conn_throttle_set(krrp_conn_t *conn, size_t new_limit,
 447     boolean_t only_set)
 448 {
 449         boolean_t require_enable = B_FALSE;
 450         krrp_throttle_t *throttle = &conn->throttle;
 451 
 452         /*
 453          * We update "remains" each 10ms, so need to
 454          * calculate limit according to this logic
 455          */
 456 
 457         new_limit /= 100;
 458 
 459         if (new_limit == 0)
 460                 krrp_conn_throttle_disable(throttle);
 461         else {
 462                 mutex_enter(&throttle->mtx);
 463 
 464                 /*
 465                  * limit == 0 means that throttle-logic
 466                  * is not active.
 467                  */
 468                 if (throttle->limit == 0)
 469                         require_enable = B_TRUE;
 470 
 471                 throttle->limit = new_limit;
 472                 mutex_exit(&throttle->mtx);
 473 
 474                 if (require_enable && !only_set)
 475                         krrp_conn_throttle_enable(throttle);
 476         }
 477 }
 478 
 479 static int
 480 krrp_conn_connect(krrp_conn_t *conn, const char *host,
 481     int port, int timeout, krrp_error_t *error)
 482 {
 483         int rc;
 484         struct sockaddr_in servaddr;
 485 
 486         VERIFY(host != NULL);
 487         VERIFY(port > 0 && port < 65535);
 488         VERIFY(timeout >= 5 && timeout <= 120);
 489 
 490         (void) memset(&servaddr, 0, sizeof (servaddr));
 491         servaddr.sin_family = AF_INET;
 492         servaddr.sin_port = htons(port);
 493         if (inet_pton(AF_INET, (char *)host, &servaddr.sin_addr) != 1) {
 494                 krrp_error_set(error, KRRP_ERRNO_ADDR, EINVAL);
 495                 return (-1);
 496         }
 497 
 498         rc = ksocket_socket(&conn->ks, AF_INET, SOCK_STREAM, 0,
 499             KSOCKET_SLEEP, CRED());
 500         if (rc != 0) {
 501                 krrp_error_set(error, KRRP_ERRNO_CREATEFAIL, rc);
 502                 return (-1);
 503         }
 504 
 505         rc = krrp_conn_connect_with_timeout(conn->ks,
 506             (struct sockaddr *)&servaddr,
 507             timeout, error);
 508         if (rc != 0) {
 509                 (void) ksocket_close(conn->ks, CRED());
 510                 conn->ks = NULL;
 511         }
 512 
 513         return (rc);
 514 }
 515 
 516 static int
 517 krrp_conn_connect_with_timeout(ksocket_t ks, struct sockaddr *servaddr,
 518     int timeout, krrp_error_t *error)
 519 {
 520         int rc, nonblocking, rval = 0;
 521         ksocket_callbacks_t     ks_cb;
 522         krrp_conn_connect_timeout_t ct;
 523 
 524         nonblocking = 1;
 525         rc = ksocket_ioctl(ks, FIONBIO, (intptr_t)&nonblocking,
 526             &rval, CRED());
 527         if (rc != 0) {
 528                 krrp_error_set(error, KRRP_ERRNO_SETSOCKOPTFAIL, rc);
 529                 goto out;
 530         }
 531 
 532         ks_cb.ksock_cb_flags = KSOCKET_CB_CONNECTED |
 533             KSOCKET_CB_CONNECTFAILED | KSOCKET_CB_DISCONNECTED;
 534         ks_cb.ksock_cb_connected = &krrp_conn_connect_cb;
 535         ks_cb.ksock_cb_connectfailed = &krrp_conn_connect_cb;
 536         ks_cb.ksock_cb_disconnected = &krrp_conn_connect_cb;
 537 
 538         rc = ksocket_setcallbacks(ks, &ks_cb, &ct, CRED());
 539         if (rc != 0) {
 540                 krrp_error_set(error, KRRP_ERRNO_SETSOCKOPTFAIL, rc);
 541                 goto out;
 542         }
 543 
 544         mutex_init(&ct.mtx, NULL, MUTEX_DEFAULT, NULL);
 545         cv_init(&ct.cv, NULL, CV_DEFAULT, NULL);
 546         ct.cb_done = B_FALSE;
 547         ct.rc = 0;
 548 
 549         rc = ksocket_connect(ks, servaddr, sizeof (*servaddr), CRED());
 550         if (rc == 0 || rc == EISCONN) {
 551                 rc = 0;
 552                 goto cleanup;
 553         }
 554 
 555         if (rc != EINPROGRESS && rc != EALREADY) {
 556                 krrp_error_set(error, KRRP_ERRNO_CONNFAIL, rc);
 557                 goto cleanup;
 558         }
 559 
 560         mutex_enter(&ct.mtx);
 561         if (!ct.cb_done)
 562                 (void) cv_reltimedwait_sig(&ct.cv, &ct.mtx,
 563                     SEC_TO_TICK(timeout), TR_CLOCK_TICK);
 564 
 565         rc = ct.cb_done ? ct.rc : ETIMEDOUT;
 566 
 567         if (rc != 0)
 568                 krrp_error_set(error, KRRP_ERRNO_CONNFAIL, rc);
 569 
 570         mutex_exit(&ct.mtx);
 571 
 572 cleanup:
 573         nonblocking = 0;
 574         (void) ksocket_ioctl(ks, FIONBIO, (intptr_t)&nonblocking,
 575             &rval, CRED());
 576 
 577         (void) ksocket_setcallbacks(ks, NULL, NULL, CRED());
 578 
 579         cv_destroy(&ct.cv);
 580         mutex_destroy(&ct.mtx);
 581 
 582 out:
 583         return (rc);
 584 }
 585 
 586 /* ARGSUSED */
 587 void
 588 krrp_conn_connect_cb(ksocket_t ks,
 589     ksocket_callback_event_t ev, void *arg, uintptr_t info)
 590 {
 591         krrp_conn_connect_timeout_t *ct = arg;
 592 
 593         VERIFY(ct != NULL);
 594         VERIFY(ev == KSOCKET_EV_CONNECTED ||
 595             ev == KSOCKET_EV_CONNECTFAILED ||
 596             ev == KSOCKET_EV_DISCONNECTED);
 597 
 598         mutex_enter(&ct->mtx);
 599         ct->cb_done = B_TRUE;
 600         if (ev == KSOCKET_EV_CONNECTED)
 601                 ct->rc = 0;
 602         else
 603                 ct->rc = info == 0 ? ECONNRESET : (int)info;
 604 
 605         cv_signal(&ct->cv);
 606         mutex_exit(&ct->mtx);
 607 }
 608 
 609 static int
 610 krrp_conn_post_configure(krrp_conn_t *conn, krrp_error_t *error)
 611 {
 612         struct so_snd_bufinfo snd_bufinfo;
 613         uint32_t value;
 614         int value_len;
 615         int rc;
 616 
 617         value = 1024 * 1024;
 618         rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_SNDBUF,
 619             (const void *) &value, sizeof (value), CRED());
 620         if (rc != 0)
 621                 goto err_set;
 622 
 623         value = 1024 * 1024;
 624         rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_RCVBUF,
 625             (const void *) &value, sizeof (value), CRED());
 626         if (rc != 0)
 627                 goto err_set;
 628 
 629         value = 1;
 630         rc = ksocket_setsockopt(conn->ks, IPPROTO_TCP, TCP_NODELAY,
 631             (const void *) &value, sizeof (value), CRED());
 632         if (rc != 0)
 633                 goto err_set;
 634 
 635         /* Do not allow to set it less 100 to exclude any side-effect */
 636         value = krrp_tcp_abort_threshold > 100 ?
 637             krrp_tcp_abort_threshold : KRRP_TCP_ABORT_THRESHOLD_DEFAULT;
 638         rc = ksocket_setsockopt(conn->ks, IPPROTO_TCP, TCP_ABORT_THRESHOLD,
 639             (const void *) &value, sizeof (value), CRED());
 640         if (rc != 0)
 641                 goto err_set;
 642 
 643         if (get_udatamodel() == DATAMODEL_NONE ||
 644             get_udatamodel() == DATAMODEL_NATIVE) {
 645                 struct timeval tl;
 646 
 647                 tl.tv_sec = KRRP_RX_TIMEOUT;
 648                 tl.tv_usec = 0;
 649 
 650                 rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_RCVTIMEO,
 651                     &tl, sizeof (struct timeval), CRED());
 652         } else {
 653                 struct timeval32 tl;
 654 
 655                 tl.tv_sec = KRRP_RX_TIMEOUT;
 656                 tl.tv_usec = 0;
 657 
 658                 rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_RCVTIMEO,
 659                     &tl, sizeof (struct timeval32), CRED());
 660         }
 661 
 662         if (rc != 0)
 663                 goto err_set;
 664 
 665         value_len = sizeof (snd_bufinfo);
 666         rc = ksocket_getsockopt(conn->ks, SOL_SOCKET, SO_SND_BUFINFO,
 667             (void *)&snd_bufinfo, &value_len, CRED());
 668         if (rc != 0)
 669                 goto err_get;
 670 
 671         conn->mblk_wroff = (size_t)snd_bufinfo.sbi_wroff;
 672         conn->mblk_tail_len = (size_t)snd_bufinfo.sbi_tail;
 673 
 674         if (snd_bufinfo.sbi_maxblk == INFPSZ) {
 675                 /* LSO is enabled */
 676                 conn->blk_sz = snd_bufinfo.sbi_maxpsz;
 677 
 678                 /*
 679                  * kmem_alloc for allocations that are less 128k
 680                  * uses kmem_cache, otherwise some slow-path,
 681                  * so to exclude performance problems if LSO allows
 682                  * very big buffer the maximum block size is 128k
 683                  */
 684                 if (conn->blk_sz > 128 * 1024)
 685                         conn->blk_sz = 128 * 1024;
 686         } else {
 687                 conn->blk_sz = snd_bufinfo.sbi_maxblk;
 688         }
 689 
 690         return (0);
 691 
 692 err_set:
 693         krrp_error_set(error, KRRP_ERRNO_SETSOCKOPTFAIL, rc);
 694         return (-1);
 695 
 696 err_get:
 697         krrp_error_set(error, KRRP_ERRNO_GETSOCKOPTFAIL, rc);
 698         return (-1);
 699 }
 700 
/*
 * TX worker thread, created by krrp_conn_run() after taking a hold on
 * conn->ks for it.
 *
 * PDU selection depends on get_data_pdu_cb:
 *  - If it is set (the sending side), queued CTRL PDUs are taken first
 *    without waiting; when none is queued, get_data_pdu_cb is asked for a
 *    DATA PDU.
 *  - Otherwise only CTRL PDUs from ctrl_tx_queue are sent.
 * The loop runs while conn->tx_running is set and stops at the first send
 * error.
 *
 * On exit the thread drops its ksocket hold.  An error is discarded if the
 * connection is already DISCONNECTING.  Otherwise the thread moves the
 * connection to DISCONNECTING, asks the RX thread to stop, and reports the
 * error through the connection callback (KRRP_CONN_ERROR).
 */
static void
krrp_conn_tx_handler(void *void_conn)
{
	krrp_conn_t *conn = void_conn;
	krrp_pdu_t *pdu;
	krrp_error_t error;
	int rc = 0;

	krrp_error_init(&error);

	mutex_enter(&conn->mtx);

	while (conn->tx_running) {
		mutex_exit(&conn->mtx);

		/*
		 * At the sender side TX path sends CTRL and DATA PDUs
		 */
		if (conn->get_data_pdu_cb != NULL) {
			/* CTRL PDUs take priority; do not wait for them */
			pdu = krrp_queue_get_no_wait(conn->ctrl_tx_queue);

			if (pdu == NULL) {
				conn->get_data_pdu_cb(conn->get_data_pdu_cb_arg,
				    &pdu);
				if (pdu == NULL) {
					/* Nothing to send; re-check tx_running */
					mutex_enter(&conn->mtx);
					continue;
				}

				/* TXG of the DATA PDU currently being sent */
				conn->cur_txg = ((krrp_pdu_data_t *)pdu)->txg;
			}
		} else {
			pdu = krrp_queue_get(conn->ctrl_tx_queue);
			if (pdu == NULL) {
				mutex_enter(&conn->mtx);
				continue;
			}
		}

		rc = krrp_conn_tx_pdu(conn, pdu, &error);
		krrp_pdu_rele(pdu);
		mutex_enter(&conn->mtx);
		if (rc != 0)
			break;

		conn->cur_txg = 0;
	}

	/*
	 * Errors seen while the connection is being torn down (e.g. after
	 * krrp_conn_destroy() shut the socket down) are not reported.
	 */
	if (conn->state == KRRP_CS_DISCONNECTING)
		(void) memset(&error, 0, sizeof (error));

	if (error.krrp_errno != 0) {
		/*
		 * Setting DISCONNECTING here makes the RX thread discard its
		 * own error, so only one error is reported.
		 */
		conn->state = KRRP_CS_DISCONNECTING;
		conn->rx_running = B_FALSE;
	}

	mutex_exit(&conn->mtx);

	/* Drop this thread's hold so ksocket_close() can complete */
	ksocket_rele(conn->ks);

	if (error.krrp_errno != 0)
		krrp_conn_callback(conn, KRRP_CONN_ERROR, &error);

	thread_exit();
}
 766 
 767 static int
 768 krrp_conn_tx_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu, krrp_error_t *error)
 769 {
 770         int rc;
 771 
 772 #ifdef KRRP_CONN_DEBUG
 773         cmn_err(CE_NOTE, "TX PDU-[%s], payload:[%u][%lu]",
 774             (pdu->type == KRRP_PT_DATA ? "DATA" : "CTRL"),
 775             pdu->hdr->payload_sz, pdu->cur_data_sz);
 776 #endif
 777 
 778         rc = krrp_conn_tx(conn->ks, pdu->hdr,
 779             sizeof (krrp_hdr_t), error);
 780         if (rc != 0)
 781                 return (rc);
 782 
 783         switch (pdu->type) {
 784         case KRRP_PT_DATA:
 785                 conn->bytes_tx += sizeof (krrp_hdr_t);
 786                 rc = krrp_conn_tx_data_pdu_dblk(conn, &pdu->dblk, error);
 787                 break;
 788         case KRRP_PT_CTRL:
 789                 conn->bytes_tx += sizeof (krrp_hdr_t);
 790                 rc = krrp_conn_tx_ctrl_pdu_dblk(conn, pdu->dblk, error);
 791                 break;
 792         }
 793 
 794         return (rc);
 795 }
 796 
 797 static int
 798 krrp_conn_tx(ksocket_t ks, void *buff, size_t buff_sz, krrp_error_t *error)
 799 {
 800         int rc = 0;
 801         size_t sent = 0, remains, offset = 0;
 802 
 803         remains = buff_sz;
 804         while (remains > 0) {
 805                 rc = ksocket_send(ks, (void *)(((uintptr_t)buff) + offset),
 806                     remains, 0, &sent, CRED());
 807                 if (rc != 0) {
 808                         krrp_error_set(error, KRRP_ERRNO_SENDFAIL, rc);
 809                         break;
 810                 }
 811 
 812                 remains -= sent;
 813                 offset += sent;
 814                 sent = 0;
 815         }
 816 
 817         return (rc);
 818 }
 819 
 820 static int
 821 krrp_conn_tx_mblk(ksocket_t ks, mblk_t *mp, krrp_error_t *error)
 822 {
 823         int rc;
 824         struct nmsghdr msghdr;
 825 
 826         msghdr.msg_name = NULL;
 827         msghdr.msg_namelen = 0;
 828         msghdr.msg_control = NULL;
 829         msghdr.msg_controllen = 0;
 830         msghdr.msg_flags = MSG_EOR;
 831 
 832         rc = ksocket_sendmblk(ks, &msghdr, 0, &mp, CRED());
 833         if (rc != 0) {
 834                 krrp_error_set(error, KRRP_ERRNO_SENDMBLKFAIL, rc);
 835                 if (mp != NULL)
 836                         freeb(mp);
 837         }
 838 
 839         return (rc);
 840 }
 841 
 842 static int
 843 krrp_conn_tx_ctrl_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t *dblk,
 844     krrp_error_t *error)
 845 {
 846         while (dblk != NULL) {
 847                 int rc;
 848 
 849                 rc = krrp_conn_tx(conn->ks, dblk->data,
 850                     dblk->cur_data_sz, error);
 851                 if (rc != 0)
 852                         return (rc);
 853 
 854                 conn->bytes_tx += dblk->cur_data_sz;
 855                 dblk = dblk->next;
 856         }
 857 
 858         return (0);
 859 }
 860 
 861 static int
 862 krrp_conn_tx_data_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t **dblk,
 863     krrp_error_t *error)
 864 {
 865         krrp_dblk_t *dblk_cur;
 866 
 867         dblk_cur = *dblk;
 868         while (dblk_cur != NULL && dblk_cur->cur_data_sz != 0) {
 869                 mblk_t *mp;
 870                 int rc;
 871 
 872                 *dblk = dblk_cur->next;
 873                 dblk_cur->next = NULL;
 874 
 875                 mp = krrp_conn_dblk_to_mblk(dblk_cur,
 876                     conn->mblk_wroff, 0);
 877                 if (mp == NULL) {
 878                         krrp_error_set(error, KRRP_ERRNO_NOMEM, 0);
 879                         return (ENOMEM);
 880                 }
 881 
 882                 krrp_conn_throttle(&conn->throttle, dblk_cur->cur_data_sz);
 883 
 884                 conn->bytes_tx += dblk_cur->cur_data_sz;
 885                 rc = krrp_conn_tx_mblk(conn->ks, mp, error);
 886                 if (rc != 0) {
 887                         return (rc);
 888                 }
 889 
 890                 dblk_cur = *dblk;
 891         }
 892 
 893         return (0);
 894 }
 895 
 896 /* ARGSUSED */
 897 static mblk_t *
 898 krrp_conn_dblk_to_mblk(krrp_dblk_t *dblk,
 899     size_t wroff, size_t tail_len)
 900 {
 901         mblk_t *mp;
 902 
 903         mp = desballoc(dblk->head, dblk->total_sz, 0, &dblk->free_rtns);
 904         if (mp != NULL) {
 905                 mp->b_rptr += wroff;
 906                 mp->b_wptr = mp->b_rptr + dblk->cur_data_sz;
 907         }
 908 
 909         return (mp);
 910 }
 911 
/*
 * Body of the receive thread of a connection.
 *
 * While conn->rx_running is set, the thread runs four steps:
 *  1. Receive a header (krrp_conn_rx_header()).
 *  2. Allocate a PDU for it: a control PDU if the opcode has any
 *     KRRP_CTRL_OPCODE_MASK bit set, otherwise a data PDU from
 *     conn->data_pdu_engine.  The thread panics if that engine is not set.
 *  3. Receive the payload (krrp_conn_rx_pdu()).
 *  4. Hand the completed PDU to krrp_conn_process_received_pdu().
 *
 * conn->mtx is held only while conn->rx_running / conn->state are examined
 * or changed.  It is dropped around all socket I/O and callback invocations.
 *
 * On exit, the following happens in order:
 *  - If the connection is already KRRP_CS_DISCONNECTING, any recorded error
 *    is discarded.
 *  - Otherwise, if an error was recorded, the state is set to
 *    KRRP_CS_DISCONNECTING, conn->tx_running is cleared and KRRP_CONN_ERROR
 *    is reported through the connection callback.
 *  - Any header or PDU still held by this thread is released.
 *  - ksocket_rele() is called on conn->ks and the thread exits.
 */
static void
krrp_conn_rx_handler(void *void_conn)
{
	krrp_conn_t *conn = void_conn;

	int rc;
	krrp_pdu_t *pdu = NULL;
	krrp_hdr_t *hdr = NULL;
	krrp_error_t error;

	krrp_error_init(&error);

	mutex_enter(&conn->mtx);

	while (conn->rx_running) {
		mutex_exit(&conn->mtx);

		/*
		 * hdr is still set here only if the previous iteration could
		 * not allocate a PDU.  In that case reuse the header that was
		 * already received instead of reading a new one.
		 */
		if (hdr == NULL) {
			if (krrp_conn_rx_header(conn, &hdr, &error) != 0) {
				/* error is filled in; this ends the loop */
				mutex_enter(&conn->mtx);
				conn->rx_running = B_FALSE;
				continue;
			}
		}

#ifdef KRRP_CONN_DEBUG
		cmn_err(CE_NOTE, "HDR: opcode:[%u]; flags:[%u]; "
		    "payload_sz:[%u]",
		    hdr->opcode, hdr->flags, hdr->payload_sz);
#endif

		if (hdr->opcode & KRRP_CTRL_OPCODE_MASK)
			krrp_pdu_ctrl_alloc((krrp_pdu_ctrl_t **)&pdu,
			    KRRP_PDU_WITHOUT_HDR);
		else if (conn->data_pdu_engine != NULL) {
			krrp_pdu_alloc(conn->data_pdu_engine, &pdu,
			    KRRP_PDU_WITHOUT_HDR);
			/* txg of the data PDU being received; reset after use */
			conn->cur_txg = ((krrp_hdr_data_t *)hdr)->txg;
		} else {
			/*
			 * This thread is not used at initial stage,
			 * so at the running stage DataPDUEngine must be defined
			 */
			cmn_err(CE_PANIC, "Data PDU Engined is not defined");
		}

		/*
		 * Allocation failed: keep hdr and retry on the next iteration
		 * unless rx_running has been cleared meanwhile.
		 * NOTE(review): there is no delay before retrying.  If the
		 * allocators can return NULL repeatedly, this loop spins.
		 * Confirm whether they block.
		 */
		if (pdu == NULL) {
			mutex_enter(&conn->mtx);
			continue;
		}

		/*
		 * The header now travels with the PDU.  Clear hdr so it is not
		 * kmem_free()d separately at exit.  Presumably krrp_pdu_rele()
		 * releases pdu->hdr; confirm.
		 */
		pdu->hdr = hdr;
		hdr = NULL;

		rc = krrp_conn_rx_pdu(conn, pdu, &error);
		if (rc == 0) {
			/* PDU handed to the callback; not released here */
			krrp_conn_process_received_pdu(conn, pdu);
			pdu = NULL;
			conn->cur_txg = 0;
		}

		mutex_enter(&conn->mtx);
		if (rc != 0)
			break;
	}

	/* A disconnect already in progress suppresses error reporting */
	if (conn->state == KRRP_CS_DISCONNECTING)
		(void) memset(&error, 0, sizeof (error));

	if (error.krrp_errno != 0) {
		conn->state = KRRP_CS_DISCONNECTING;
		conn->tx_running = B_FALSE;
	}

	mutex_exit(&conn->mtx);

	/* Header received but never attached to a PDU */
	if (hdr != NULL)
		kmem_free(hdr, sizeof (krrp_hdr_t));

	/* PDU whose payload reception failed */
	if (pdu != NULL)
		krrp_pdu_rele(pdu);

	if (error.krrp_errno != 0)
		krrp_conn_callback(conn, KRRP_CONN_ERROR, &error);

	ksocket_rele(conn->ks);

	thread_exit();
}
1001 
1002 static int
1003 krrp_conn_rx_header(krrp_conn_t *conn, krrp_hdr_t **result_hdr,
1004     krrp_error_t *error)
1005 {
1006         krrp_hdr_t *hdr;
1007         int rc;
1008 
1009         hdr = kmem_zalloc(sizeof (krrp_hdr_t), KM_SLEEP);
1010         rc = krrp_conn_rx(conn->ks, hdr, sizeof (krrp_hdr_t), error);
1011         if (rc == 0) {
1012                 conn->bytes_rx += sizeof (krrp_hdr_t);
1013                 *result_hdr = hdr;
1014                 return (0);
1015         }
1016 
1017         kmem_free(hdr, sizeof (krrp_hdr_t));
1018         return (rc);
1019 }
1020 
1021 static int
1022 krrp_conn_rx_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu, krrp_error_t *error)
1023 {
1024         krrp_dblk_t *dblk;
1025         size_t remaining_sz;
1026         size_t cnt = 0;
1027 
1028         if (pdu->hdr->payload_sz > pdu->max_data_sz) {
1029                 krrp_error_set(error, KRRP_ERRNO_BIGPAYLOAD, 0);
1030                 return (-1);
1031         }
1032 
1033         remaining_sz = pdu->hdr->payload_sz;
1034         dblk = pdu->dblk;
1035         while (remaining_sz != 0) {
1036                 int rc;
1037                 size_t need_to_recv;
1038 
1039                 /* Something wrong in our PDU Engine */
1040                 ASSERT(dblk != NULL);
1041 
1042                 if (remaining_sz > dblk->max_data_sz)
1043                         need_to_recv = dblk->max_data_sz;
1044                 else
1045                         need_to_recv = remaining_sz;
1046 
1047 #ifdef KRRP_CONN_DEBUG
1048                 cmn_err(CE_NOTE, "RX dblk #[%lu] [%lu]",
1049                     cnt, remaining_sz);
1050 #endif
1051 
1052                 rc = krrp_conn_rx(conn->ks, dblk->data, need_to_recv, error);
1053                 if (rc != 0)
1054                         return (rc);
1055 
1056                 conn->bytes_rx += need_to_recv;
1057                 dblk->cur_data_sz = need_to_recv;
1058                 remaining_sz -= need_to_recv;
1059                 dblk = dblk->next;
1060                 cnt++;
1061         }
1062 
1063         pdu->cur_data_sz = pdu->hdr->payload_sz;
1064 
1065         return (0);
1066 }
1067 
1068 static int
1069 krrp_conn_rx(ksocket_t ks, void *buff, size_t buff_sz, krrp_error_t *error)
1070 {
1071         int rc;
1072         size_t received = 0;
1073 
1074         rc = ksocket_recv(ks, buff, buff_sz, MSG_WAITALL,
1075             &received, CRED());
1076 
1077         if ((rc != 0) || (received != buff_sz)) {
1078                 if (rc == 0) {
1079                         if (received == 0)
1080                                 krrp_error_set(error, KRRP_ERRNO_UNEXPCLOSE, 0);
1081                         else
1082                                 krrp_error_set(error, KRRP_ERRNO_UNEXPEND, 0);
1083                 } else {
1084                         if (rc == EAGAIN)
1085                                 rc = ETIMEDOUT;
1086 
1087                         krrp_error_set(error, KRRP_ERRNO_RECVFAIL, rc);
1088                 }
1089 
1090                 rc = -1;
1091         }
1092 
1093         return (rc);
1094 }
1095 
1096 static void
1097 krrp_conn_process_received_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu)
1098 {
1099         krrp_conn_cb_ev_t ev;
1100 
1101         if (krrp_pdu_type(pdu) == KRRP_PT_DATA)
1102                 ev = KRRP_CONN_DATA_PDU;
1103         else
1104                 ev = KRRP_CONN_CTRL_PDU;
1105 
1106         krrp_conn_callback(conn, ev, pdu);
1107 }