1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2019 Nexenta Systems, Inc. All rights reserved.
14 */
15
16 #include <sys/socket.h>
17 #include <netinet/tcp.h>
18 #include <inet/tcp.h>
19 #include <sys/strsubr.h>
20 #include <sys/socketvar.h>
21 #include <sys/filio.h>
22
23 #include "krrp_connection.h"
24
/* #define KRRP_CONN_DEBUG 1 */

/*
 * Period of the throttle timer, in microseconds (10 ms).
 * krrp_conn_throttle_set() divides the requested limit by 100
 * (the number of such periods per second) to get a per-period budget.
 */
#define KRRP_THROTTLE_INTERVAL_US (10 * MILLISEC)

/*
 * Deliver event 'ev' with argument 'ev_arg' to the callback registered
 * on 'conn', passing the registered callback_arg as the last argument.
 * Every macro argument is parenthesized so that an expression argument
 * (e.g. 'p + 1') is fully evaluated before the cast to uintptr_t.
 */
#define krrp_conn_callback(conn, ev, ev_arg) \
	(conn)->callback((conn), (ev), \
	    (uintptr_t)(ev_arg), (conn)->callback_arg)
33
/*
 * Default KRRP TCP abort threshold (TCP_ABORT_THRESHOLD), in milliseconds:
 * 60000 ms = 60 s. For comparison, TCP's own default is 5 minutes
 * (300,000 ms).
 */
#define KRRP_TCP_ABORT_THRESHOLD_DEFAULT 60000

/*
 * Tunable: TCP abort threshold, in milliseconds, applied when a KRRP
 * connection is configured. Changing it has no effect on KRRP connections
 * that already exist.
 *
 * The intended range is 100 ... UINT32_MAX. The default value 0, and other
 * very small values (around 100 ms and below; see krrp_conn_post_configure()
 * for the exact cut-off), are replaced by KRRP_TCP_ABORT_THRESHOLD_DEFAULT.
 *
 * Note: use with caution
 */
uint32_t krrp_tcp_abort_threshold = 0;
48
/*
 * State shared between krrp_conn_connect_with_timeout() (the waiter) and
 * krrp_conn_connect_cb() (the ksocket callback) while a non-blocking
 * connect is in progress.
 */
typedef struct {
	kmutex_t mtx;		/* protects rc and cb_done */
	kcondvar_t cv;		/* signalled by krrp_conn_connect_cb() */
	int rc;			/* 0 if connected, otherwise an errno value */
	boolean_t cb_done;	/* B_TRUE once the connect callback has run */
} krrp_conn_connect_timeout_t;
55
/* Bandwidth throttling of the DATA TX path */
static void krrp_conn_throttle_init(krrp_throttle_t *throttle);
static void krrp_conn_throttle_fini(krrp_throttle_t *throttle);
static void krrp_conn_throttle_enable(krrp_throttle_t *throttle);
static void krrp_conn_throttle_disable(krrp_throttle_t *throttle);
static void krrp_conn_throttle_cb(void *void_throttle);
static void krrp_conn_throttle(krrp_throttle_t *throttle, size_t send_sz);

/* Connection setup and socket configuration */
static int krrp_conn_post_create(krrp_conn_t *conn, krrp_error_t *error);
static int krrp_conn_connect(krrp_conn_t *conn, const char *host,
    int port, int timeout, krrp_error_t *error);
static int krrp_conn_connect_with_timeout(ksocket_t ks,
    struct sockaddr *servaddr, int timeout, krrp_error_t *error);
/*
 * NOTE(review): not declared static although, within this file, it is only
 * used as a ksocket callback; confirm there are no external users before
 * narrowing its linkage.
 */
void krrp_conn_connect_cb(ksocket_t ks,
    ksocket_callback_event_t ev, void *arg, uintptr_t info);
static int krrp_conn_post_configure(krrp_conn_t *conn, krrp_error_t *error);

/* Worker threads started by krrp_conn_run() */
static void krrp_conn_tx_handler(void *void_conn);
static void krrp_conn_rx_handler(void *void_conn);

static void krrp_conn_process_received_pdu(krrp_conn_t *conn,
    krrp_pdu_t *pdu);

/* Transmit path */
static int krrp_conn_tx_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu,
    krrp_error_t *error);
static int krrp_conn_tx(ksocket_t ks, void *buff, size_t buff_sz,
    krrp_error_t *error);
static int krrp_conn_tx_mblk(ksocket_t ks, mblk_t *mp, krrp_error_t *error);
static int krrp_conn_tx_ctrl_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t *dblk,
    krrp_error_t *error);
static int krrp_conn_tx_data_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t **dblk,
    krrp_error_t *error);
static mblk_t *krrp_conn_dblk_to_mblk(krrp_dblk_t *dblk,
    size_t wroff, size_t tail_len);

/* Receive path */
static int krrp_conn_rx_header(krrp_conn_t *, krrp_hdr_t **, krrp_error_t *);
static int krrp_conn_rx_pdu(krrp_conn_t *, krrp_pdu_t *, krrp_error_t *);
static int krrp_conn_rx(ksocket_t ks, void *, size_t, krrp_error_t *);
93
94 int
95 krrp_conn_create_from_scratch(krrp_conn_t **result_conn,
96 const char *address, int port, int timeout, krrp_error_t *error)
97 {
98 krrp_conn_t *conn = NULL;
99
100 VERIFY(result_conn != NULL && *result_conn == NULL);
101 VERIFY(address != NULL);
102 VERIFY(port > 0 && port < 65535);
103
104 conn = kmem_zalloc(sizeof (krrp_conn_t), KM_SLEEP);
105
106 if (krrp_conn_connect(conn, address, port, timeout, error) != 0)
107 goto fail;
108
109 if (krrp_conn_post_create(conn, error) != 0)
110 goto fail;
111
112 *result_conn = conn;
113 return (0);
114
115 fail:
116 if (conn->ks != NULL)
117 (void) ksocket_close(conn->ks, CRED());
118
119 kmem_free(conn, sizeof (krrp_conn_t));
120 return (-1);
121 }
122
123 /* ARGSUSED */
124 int
125 krrp_conn_create_from_ksocket(krrp_conn_t **result_conn,
126 ksocket_t ks, krrp_error_t *error)
127 {
128 krrp_conn_t *conn = NULL;
129
130 VERIFY(result_conn != NULL && *result_conn == NULL);
131
132 conn = kmem_zalloc(sizeof (krrp_conn_t), KM_SLEEP);
133
134 conn->ks = ks;
135
136 if (krrp_conn_post_create(conn, error) != 0) {
137 kmem_free(conn, sizeof (krrp_conn_t));
138 return (-1);
139 }
140
141 *result_conn = conn;
142
143 return (0);
144 }
145
146 void
147 krrp_conn_destroy(krrp_conn_t *conn)
148 {
149 mutex_enter(&conn->mtx);
150
151 conn->state = KRRP_CS_DISCONNECTING;
152 conn->tx_running = B_FALSE;
153 conn->rx_running = B_FALSE;
154 mutex_exit(&conn->mtx);
155
156 krrp_conn_throttle_disable(&conn->throttle);
157
158 /*
159 * We do not join TX and RX thread, because:
160 * - conn->ks is held by TX and RX threads on start
161 * - TX and RX threads do ksocket_rele() before exit thread
162 * - ksocket_close() is blocked while the given ksocket is held
163 */
164 if (conn->ks != NULL) {
165 (void) ksocket_shutdown(conn->ks, SHUT_RDWR, CRED());
166 (void) ksocket_close(conn->ks, CRED());
167 }
168
169 mutex_enter(&conn->mtx);
170 conn->state = KRRP_CS_DISCONNECTED;
171 mutex_exit(&conn->mtx);
172
173 krrp_conn_throttle_fini(&conn->throttle);
174
175 cv_destroy(&conn->cv);
176
177 mutex_destroy(&conn->mtx);
178
179 kmem_free(conn, sizeof (krrp_conn_t));
180 }
181
182 void
183 krrp_conn_register_callback(krrp_conn_t *conn,
184 krrp_conn_cb_t *ev_cb, void *cb_arg)
185 {
186 VERIFY(ev_cb != NULL);
187
188 mutex_enter(&conn->mtx);
189 VERIFY(conn->state == KRRP_CS_CONNECTED);
190
191 conn->state = KRRP_CS_READY_TO_RUN;
192 conn->callback = ev_cb;
193 conn->callback_arg = cb_arg;
194
195 mutex_exit(&conn->mtx);
196 }
197
198 void
199 krrp_conn_run(krrp_conn_t *conn, krrp_queue_t *ctrl_tx_queue,
200 krrp_pdu_engine_t *data_pdu_engine,
201 krrp_get_data_pdu_cb_t *get_data_pdu_cb, void *cb_arg)
202 {
203 VERIFY(ctrl_tx_queue != NULL);
204 VERIFY(data_pdu_engine != NULL);
205 VERIFY(data_pdu_engine->type == KRRP_PET_DATA);
206
207 mutex_enter(&conn->mtx);
208 VERIFY(conn->state == KRRP_CS_READY_TO_RUN);
209
210 conn->data_pdu_engine = data_pdu_engine;
211 conn->ctrl_tx_queue = ctrl_tx_queue;
212 conn->get_data_pdu_cb = get_data_pdu_cb;
213 conn->get_data_pdu_cb_arg = cb_arg;
214
215 conn->state = KRRP_CS_ACTIVE;
216
217 conn->tx_running = B_TRUE;
218 conn->rx_running = B_TRUE;
219
220 krrp_conn_throttle_enable(&conn->throttle);
221
222 ksocket_hold(conn->ks);
223 /* thread_create never fails */
224 (void) thread_create(NULL, 0, &krrp_conn_tx_handler,
225 conn, 0, &p0, TS_RUN, minclsyspri);
226
227 ksocket_hold(conn->ks);
228 /* thread_create never fails */
229 (void) thread_create(NULL, 0, &krrp_conn_rx_handler,
230 conn, 0, &p0, TS_RUN, minclsyspri);
231
232 mutex_exit(&conn->mtx);
233 }
234
235 void
236 krrp_conn_stop(krrp_conn_t *conn)
237 {
238 mutex_enter(&conn->mtx);
239 VERIFY3U(conn->state, ==, KRRP_CS_ACTIVE);
240 conn->state = KRRP_CS_STOPPED;
241 conn->tx_running = B_FALSE;
242 conn->rx_running = B_FALSE;
243 mutex_exit(&conn->mtx);
244 }
245
246 int
247 krrp_conn_send_ctrl_data(krrp_conn_t *conn, krrp_opcode_t opcode,
248 nvlist_t *nvl, krrp_error_t *error)
249 {
250 krrp_pdu_ctrl_t *pdu = NULL;
251 int rc = -1;
252
253 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
254 if (pdu != NULL) {
255 pdu->hdr->opcode = (uint16_t)opcode;
256
257 if (nvl != NULL) {
258 size_t packed_size = 0;
259
260 /*
261 * fnvlist_size cannot be used, because it uses
262 * hardcoded encode-type == NV_ENCODE_NATIVE
263 */
264 VERIFY3U(nvlist_size(nvl, &packed_size,
265 NV_ENCODE_XDR), ==, 0);
266
267 VERIFY3U(packed_size, <, pdu->dblk->max_data_sz);
268
269 /*
270 * fnvlist_pack cannot be used,
271 * because cannot work with preallocated buffers,
272 * so just reimplement it here
273 */
274 VERIFY3U(nvlist_pack(nvl, (char **)&pdu->dblk->data,
275 &packed_size, NV_ENCODE_XDR, KM_SLEEP), ==, 0);
276
277 pdu->dblk->cur_data_sz = packed_size;
278 pdu->hdr->payload_sz = (uint32_t)packed_size;
279 }
280
281 rc = krrp_conn_tx_ctrl_pdu(conn, pdu, error);
282 krrp_pdu_rele((krrp_pdu_t *)pdu);
283 } else
284 krrp_error_set(error, KRRP_ERRNO_NOMEM, 0);
285
286 return (rc);
287 }
288
289 int
290 krrp_conn_rx_ctrl_pdu(krrp_conn_t *conn, krrp_pdu_ctrl_t **result_pdu,
291 krrp_error_t *error)
292 {
293 krrp_pdu_ctrl_t *pdu = NULL;
294
295 VERIFY(conn != NULL);
296 VERIFY(result_pdu != NULL && *result_pdu == NULL);
297
298 krrp_pdu_ctrl_alloc(&pdu, KRRP_PDU_WITH_HDR);
299 if (pdu == NULL) {
300 krrp_error_set(error, KRRP_ERRNO_NOMEM, 0);
301 return (-1);
302 }
303
304 if (krrp_conn_rx_header(conn, (krrp_hdr_t **)&pdu->hdr, error) != 0)
305 goto err;
306
307 if (krrp_conn_rx_pdu(conn, (krrp_pdu_t *)pdu, error) != 0)
308 goto err;
309
310 *result_pdu = pdu;
311
312 return (0);
313
314 err:
315 krrp_pdu_rele((krrp_pdu_t *)pdu);
316 return (-1);
317 }
318
319 int
320 krrp_conn_tx_ctrl_pdu(krrp_conn_t *conn, krrp_pdu_ctrl_t *pdu,
321 krrp_error_t *error)
322 {
323 int rc;
324
325 VERIFY(conn != NULL);
326 VERIFY(pdu != NULL);
327
328 rc = krrp_conn_tx(conn->ks, pdu->hdr,
329 sizeof (krrp_hdr_t), error);
330 if (rc != 0)
331 return (rc);
332
333 conn->bytes_tx += sizeof (krrp_hdr_t);
334 rc = krrp_conn_tx_ctrl_pdu_dblk(conn, pdu->dblk, error);
335 if (rc == 0)
336 conn->bytes_tx += sizeof (krrp_hdr_t);
337
338 return (rc);
339 }
340
341 static int
342 krrp_conn_post_create(krrp_conn_t *conn, krrp_error_t *error)
343 {
344 if (krrp_conn_post_configure(conn, error) != 0)
345 return (-1);
346
347 krrp_conn_throttle_init(&conn->throttle);
348
349 mutex_init(&conn->mtx, NULL, MUTEX_DEFAULT, NULL);
350 cv_init(&conn->cv, NULL, CV_DEFAULT, NULL);
351
352 conn->state = KRRP_CS_CONNECTED;
353 return (0);
354 }
355
356 static void
357 krrp_conn_throttle_init(krrp_throttle_t *throttle)
358 {
359 mutex_init(&throttle->mtx, NULL, MUTEX_DEFAULT, NULL);
360 cv_init(&throttle->cv, NULL, CV_DEFAULT, NULL);
361 }
362
363 static void
364 krrp_conn_throttle_fini(krrp_throttle_t *throttle)
365 {
366 cv_destroy(&throttle->cv);
367 mutex_destroy(&throttle->mtx);
368 }
369
370 static void
371 krrp_conn_throttle_enable(krrp_throttle_t *throttle)
372 {
373 mutex_enter(&throttle->mtx);
374
375 if (throttle->limit != 0) {
376 throttle->timer = timeout(&krrp_conn_throttle_cb,
377 throttle, drv_usectohz(KRRP_THROTTLE_INTERVAL_US));
378 }
379
380 mutex_exit(&throttle->mtx);
381 }
382
383 static void
384 krrp_conn_throttle_disable(krrp_throttle_t *throttle)
385 {
386 timeout_id_t saved_timer;
387
388 mutex_enter(&throttle->mtx);
389 saved_timer = throttle->timer;
390 throttle->timer = NULL;
391 throttle->limit = 0;
392 cv_signal(&throttle->cv);
393 mutex_exit(&throttle->mtx);
394
395 if (saved_timer != NULL)
396 (void) untimeout(saved_timer);
397 }
398
399 static void
400 krrp_conn_throttle_cb(void *void_throttle)
401 {
402 krrp_throttle_t *throttle = void_throttle;
403
404 mutex_enter(&throttle->mtx);
405 if (throttle->limit == 0)
406 throttle->remains = SIZE_MAX;
407 else
408 throttle->remains = throttle->limit;
409
410 if (throttle->timer != NULL) {
411 throttle->timer = timeout(&krrp_conn_throttle_cb,
412 throttle, drv_usectohz(KRRP_THROTTLE_INTERVAL_US));
413 }
414
415 cv_signal(&throttle->cv);
416 mutex_exit(&throttle->mtx);
417 }
418
419 static void
420 krrp_conn_throttle(krrp_throttle_t *throttle, size_t send_sz)
421 {
422 if (throttle->limit == 0)
423 return;
424
425 mutex_enter(&throttle->mtx);
426
427 while (throttle->remains == 0 && throttle->limit != 0)
428 cv_wait(&throttle->cv, &throttle->mtx);
429
430 if (throttle->remains > send_sz)
431 throttle->remains -= send_sz;
432 else
433 throttle->remains = 0;
434
435 mutex_exit(&throttle->mtx);
436 }
437
438 /*
439 * This function is called on a krrp session that is either
440 * already running or not yet. Using the 'only_set' (2nd arg)
441 * the caller explicitly controls whether to just set
442 * the limiting rate, or (re)set the rate and start (or continue)
443 * throttling the traffic right away
444 */
445 void
446 krrp_conn_throttle_set(krrp_conn_t *conn, size_t new_limit,
447 boolean_t only_set)
448 {
449 boolean_t require_enable = B_FALSE;
450 krrp_throttle_t *throttle = &conn->throttle;
451
452 /*
453 * We update "remains" each 10ms, so need to
454 * calculate limit according to this logic
455 */
456
457 new_limit /= 100;
458
459 if (new_limit == 0)
460 krrp_conn_throttle_disable(throttle);
461 else {
462 mutex_enter(&throttle->mtx);
463
464 /*
465 * limit == 0 means that throttle-logic
466 * is not active.
467 */
468 if (throttle->limit == 0)
469 require_enable = B_TRUE;
470
471 throttle->limit = new_limit;
472 mutex_exit(&throttle->mtx);
473
474 if (require_enable && !only_set)
475 krrp_conn_throttle_enable(throttle);
476 }
477 }
478
479 static int
480 krrp_conn_connect(krrp_conn_t *conn, const char *host,
481 int port, int timeout, krrp_error_t *error)
482 {
483 int rc;
484 struct sockaddr_in servaddr;
485
486 VERIFY(host != NULL);
487 VERIFY(port > 0 && port < 65535);
488 VERIFY(timeout >= 5 && timeout <= 120);
489
490 (void) memset(&servaddr, 0, sizeof (servaddr));
491 servaddr.sin_family = AF_INET;
492 servaddr.sin_port = htons(port);
493 if (inet_pton(AF_INET, (char *)host, &servaddr.sin_addr) != 1) {
494 krrp_error_set(error, KRRP_ERRNO_ADDR, EINVAL);
495 return (-1);
496 }
497
498 rc = ksocket_socket(&conn->ks, AF_INET, SOCK_STREAM, 0,
499 KSOCKET_SLEEP, CRED());
500 if (rc != 0) {
501 krrp_error_set(error, KRRP_ERRNO_CREATEFAIL, rc);
502 return (-1);
503 }
504
505 rc = krrp_conn_connect_with_timeout(conn->ks,
506 (struct sockaddr *)&servaddr,
507 timeout, error);
508 if (rc != 0) {
509 (void) ksocket_close(conn->ks, CRED());
510 conn->ks = NULL;
511 }
512
513 return (rc);
514 }
515
516 static int
517 krrp_conn_connect_with_timeout(ksocket_t ks, struct sockaddr *servaddr,
518 int timeout, krrp_error_t *error)
519 {
520 int rc, nonblocking, rval = 0;
521 ksocket_callbacks_t ks_cb;
522 krrp_conn_connect_timeout_t ct;
523
524 nonblocking = 1;
525 rc = ksocket_ioctl(ks, FIONBIO, (intptr_t)&nonblocking,
526 &rval, CRED());
527 if (rc != 0) {
528 krrp_error_set(error, KRRP_ERRNO_SETSOCKOPTFAIL, rc);
529 goto out;
530 }
531
532 ks_cb.ksock_cb_flags = KSOCKET_CB_CONNECTED |
533 KSOCKET_CB_CONNECTFAILED | KSOCKET_CB_DISCONNECTED;
534 ks_cb.ksock_cb_connected = &krrp_conn_connect_cb;
535 ks_cb.ksock_cb_connectfailed = &krrp_conn_connect_cb;
536 ks_cb.ksock_cb_disconnected = &krrp_conn_connect_cb;
537
538 rc = ksocket_setcallbacks(ks, &ks_cb, &ct, CRED());
539 if (rc != 0) {
540 krrp_error_set(error, KRRP_ERRNO_SETSOCKOPTFAIL, rc);
541 goto out;
542 }
543
544 mutex_init(&ct.mtx, NULL, MUTEX_DEFAULT, NULL);
545 cv_init(&ct.cv, NULL, CV_DEFAULT, NULL);
546 ct.cb_done = B_FALSE;
547 ct.rc = 0;
548
549 rc = ksocket_connect(ks, servaddr, sizeof (*servaddr), CRED());
550 if (rc == 0 || rc == EISCONN) {
551 rc = 0;
552 goto cleanup;
553 }
554
555 if (rc != EINPROGRESS && rc != EALREADY) {
556 krrp_error_set(error, KRRP_ERRNO_CONNFAIL, rc);
557 goto cleanup;
558 }
559
560 mutex_enter(&ct.mtx);
561 if (!ct.cb_done)
562 (void) cv_reltimedwait_sig(&ct.cv, &ct.mtx,
563 SEC_TO_TICK(timeout), TR_CLOCK_TICK);
564
565 rc = ct.cb_done ? ct.rc : ETIMEDOUT;
566
567 if (rc != 0)
568 krrp_error_set(error, KRRP_ERRNO_CONNFAIL, rc);
569
570 mutex_exit(&ct.mtx);
571
572 cleanup:
573 nonblocking = 0;
574 (void) ksocket_ioctl(ks, FIONBIO, (intptr_t)&nonblocking,
575 &rval, CRED());
576
577 (void) ksocket_setcallbacks(ks, NULL, NULL, CRED());
578
579 cv_destroy(&ct.cv);
580 mutex_destroy(&ct.mtx);
581
582 out:
583 return (rc);
584 }
585
586 /* ARGSUSED */
587 void
588 krrp_conn_connect_cb(ksocket_t ks,
589 ksocket_callback_event_t ev, void *arg, uintptr_t info)
590 {
591 krrp_conn_connect_timeout_t *ct = arg;
592
593 VERIFY(ct != NULL);
594 VERIFY(ev == KSOCKET_EV_CONNECTED ||
595 ev == KSOCKET_EV_CONNECTFAILED ||
596 ev == KSOCKET_EV_DISCONNECTED);
597
598 mutex_enter(&ct->mtx);
599 ct->cb_done = B_TRUE;
600 if (ev == KSOCKET_EV_CONNECTED)
601 ct->rc = 0;
602 else
603 ct->rc = info == 0 ? ECONNRESET : (int)info;
604
605 cv_signal(&ct->cv);
606 mutex_exit(&ct->mtx);
607 }
608
609 static int
610 krrp_conn_post_configure(krrp_conn_t *conn, krrp_error_t *error)
611 {
612 struct so_snd_bufinfo snd_bufinfo;
613 uint32_t value;
614 int value_len;
615 int rc;
616
617 value = 1024 * 1024;
618 rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_SNDBUF,
619 (const void *) &value, sizeof (value), CRED());
620 if (rc != 0)
621 goto err_set;
622
623 value = 1024 * 1024;
624 rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_RCVBUF,
625 (const void *) &value, sizeof (value), CRED());
626 if (rc != 0)
627 goto err_set;
628
629 value = 1;
630 rc = ksocket_setsockopt(conn->ks, IPPROTO_TCP, TCP_NODELAY,
631 (const void *) &value, sizeof (value), CRED());
632 if (rc != 0)
633 goto err_set;
634
635 /* Do not allow to set it less 100 to exclude any side-effect */
636 value = krrp_tcp_abort_threshold > 100 ?
637 krrp_tcp_abort_threshold : KRRP_TCP_ABORT_THRESHOLD_DEFAULT;
638 rc = ksocket_setsockopt(conn->ks, IPPROTO_TCP, TCP_ABORT_THRESHOLD,
639 (const void *) &value, sizeof (value), CRED());
640 if (rc != 0)
641 goto err_set;
642
643 if (get_udatamodel() == DATAMODEL_NONE ||
644 get_udatamodel() == DATAMODEL_NATIVE) {
645 struct timeval tl;
646
647 tl.tv_sec = KRRP_RX_TIMEOUT;
648 tl.tv_usec = 0;
649
650 rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_RCVTIMEO,
651 &tl, sizeof (struct timeval), CRED());
652 } else {
653 struct timeval32 tl;
654
655 tl.tv_sec = KRRP_RX_TIMEOUT;
656 tl.tv_usec = 0;
657
658 rc = ksocket_setsockopt(conn->ks, SOL_SOCKET, SO_RCVTIMEO,
659 &tl, sizeof (struct timeval32), CRED());
660 }
661
662 if (rc != 0)
663 goto err_set;
664
665 value_len = sizeof (snd_bufinfo);
666 rc = ksocket_getsockopt(conn->ks, SOL_SOCKET, SO_SND_BUFINFO,
667 (void *)&snd_bufinfo, &value_len, CRED());
668 if (rc != 0)
669 goto err_get;
670
671 conn->mblk_wroff = (size_t)snd_bufinfo.sbi_wroff;
672 conn->mblk_tail_len = (size_t)snd_bufinfo.sbi_tail;
673
674 if (snd_bufinfo.sbi_maxblk == INFPSZ) {
675 /* LSO is enabled */
676 conn->blk_sz = snd_bufinfo.sbi_maxpsz;
677
678 /*
679 * kmem_alloc for allocations that are less 128k
680 * uses kmem_cache, otherwise some slow-path,
681 * so to exclude performance problems if LSO allows
682 * very big buffer the maximum block size is 128k
683 */
684 if (conn->blk_sz > 128 * 1024)
685 conn->blk_sz = 128 * 1024;
686 } else {
687 conn->blk_sz = snd_bufinfo.sbi_maxblk;
688 }
689
690 return (0);
691
692 err_set:
693 krrp_error_set(error, KRRP_ERRNO_SETSOCKOPTFAIL, rc);
694 return (-1);
695
696 err_get:
697 krrp_error_set(error, KRRP_ERRNO_GETSOCKOPTFAIL, rc);
698 return (-1);
699 }
700
701 static void
702 krrp_conn_tx_handler(void *void_conn)
703 {
704 krrp_conn_t *conn = void_conn;
705 krrp_pdu_t *pdu;
706 krrp_error_t error;
707 int rc = 0;
708
709 krrp_error_init(&error);
710
711 mutex_enter(&conn->mtx);
712
713 while (conn->tx_running) {
714 mutex_exit(&conn->mtx);
715
716 /*
717 * At the sender side TX path sends CTRL and DATA PDUs
718 */
719 if (conn->get_data_pdu_cb != NULL) {
720 pdu = krrp_queue_get_no_wait(conn->ctrl_tx_queue);
721
722 if (pdu == NULL) {
723 conn->get_data_pdu_cb(conn->get_data_pdu_cb_arg,
724 &pdu);
725 if (pdu == NULL) {
726 mutex_enter(&conn->mtx);
727 continue;
728 }
729
730 conn->cur_txg = ((krrp_pdu_data_t *)pdu)->txg;
731 }
732 } else {
733 pdu = krrp_queue_get(conn->ctrl_tx_queue);
734 if (pdu == NULL) {
735 mutex_enter(&conn->mtx);
736 continue;
737 }
738 }
739
740 rc = krrp_conn_tx_pdu(conn, pdu, &error);
741 krrp_pdu_rele(pdu);
742 mutex_enter(&conn->mtx);
743 if (rc != 0)
744 break;
745
746 conn->cur_txg = 0;
747 }
748
749 if (conn->state == KRRP_CS_DISCONNECTING)
750 (void) memset(&error, 0, sizeof (error));
751
752 if (error.krrp_errno != 0) {
753 conn->state = KRRP_CS_DISCONNECTING;
754 conn->rx_running = B_FALSE;
755 }
756
757 mutex_exit(&conn->mtx);
758
759 ksocket_rele(conn->ks);
760
761 if (error.krrp_errno != 0)
762 krrp_conn_callback(conn, KRRP_CONN_ERROR, &error);
763
764 thread_exit();
765 }
766
767 static int
768 krrp_conn_tx_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu, krrp_error_t *error)
769 {
770 int rc;
771
772 #ifdef KRRP_CONN_DEBUG
773 cmn_err(CE_NOTE, "TX PDU-[%s], payload:[%u][%lu]",
774 (pdu->type == KRRP_PT_DATA ? "DATA" : "CTRL"),
775 pdu->hdr->payload_sz, pdu->cur_data_sz);
776 #endif
777
778 rc = krrp_conn_tx(conn->ks, pdu->hdr,
779 sizeof (krrp_hdr_t), error);
780 if (rc != 0)
781 return (rc);
782
783 switch (pdu->type) {
784 case KRRP_PT_DATA:
785 conn->bytes_tx += sizeof (krrp_hdr_t);
786 rc = krrp_conn_tx_data_pdu_dblk(conn, &pdu->dblk, error);
787 break;
788 case KRRP_PT_CTRL:
789 conn->bytes_tx += sizeof (krrp_hdr_t);
790 rc = krrp_conn_tx_ctrl_pdu_dblk(conn, pdu->dblk, error);
791 break;
792 }
793
794 return (rc);
795 }
796
797 static int
798 krrp_conn_tx(ksocket_t ks, void *buff, size_t buff_sz, krrp_error_t *error)
799 {
800 int rc = 0;
801 size_t sent = 0, remains, offset = 0;
802
803 remains = buff_sz;
804 while (remains > 0) {
805 rc = ksocket_send(ks, (void *)(((uintptr_t)buff) + offset),
806 remains, 0, &sent, CRED());
807 if (rc != 0) {
808 krrp_error_set(error, KRRP_ERRNO_SENDFAIL, rc);
809 break;
810 }
811
812 remains -= sent;
813 offset += sent;
814 sent = 0;
815 }
816
817 return (rc);
818 }
819
820 static int
821 krrp_conn_tx_mblk(ksocket_t ks, mblk_t *mp, krrp_error_t *error)
822 {
823 int rc;
824 struct nmsghdr msghdr;
825
826 msghdr.msg_name = NULL;
827 msghdr.msg_namelen = 0;
828 msghdr.msg_control = NULL;
829 msghdr.msg_controllen = 0;
830 msghdr.msg_flags = MSG_EOR;
831
832 rc = ksocket_sendmblk(ks, &msghdr, 0, &mp, CRED());
833 if (rc != 0) {
834 krrp_error_set(error, KRRP_ERRNO_SENDMBLKFAIL, rc);
835 if (mp != NULL)
836 freeb(mp);
837 }
838
839 return (rc);
840 }
841
842 static int
843 krrp_conn_tx_ctrl_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t *dblk,
844 krrp_error_t *error)
845 {
846 while (dblk != NULL) {
847 int rc;
848
849 rc = krrp_conn_tx(conn->ks, dblk->data,
850 dblk->cur_data_sz, error);
851 if (rc != 0)
852 return (rc);
853
854 conn->bytes_tx += dblk->cur_data_sz;
855 dblk = dblk->next;
856 }
857
858 return (0);
859 }
860
861 static int
862 krrp_conn_tx_data_pdu_dblk(krrp_conn_t *conn, krrp_dblk_t **dblk,
863 krrp_error_t *error)
864 {
865 krrp_dblk_t *dblk_cur;
866
867 dblk_cur = *dblk;
868 while (dblk_cur != NULL && dblk_cur->cur_data_sz != 0) {
869 mblk_t *mp;
870 int rc;
871
872 *dblk = dblk_cur->next;
873 dblk_cur->next = NULL;
874
875 mp = krrp_conn_dblk_to_mblk(dblk_cur,
876 conn->mblk_wroff, 0);
877 if (mp == NULL) {
878 krrp_error_set(error, KRRP_ERRNO_NOMEM, 0);
879 return (ENOMEM);
880 }
881
882 krrp_conn_throttle(&conn->throttle, dblk_cur->cur_data_sz);
883
884 conn->bytes_tx += dblk_cur->cur_data_sz;
885 rc = krrp_conn_tx_mblk(conn->ks, mp, error);
886 if (rc != 0) {
887 return (rc);
888 }
889
890 dblk_cur = *dblk;
891 }
892
893 return (0);
894 }
895
896 /* ARGSUSED */
897 static mblk_t *
898 krrp_conn_dblk_to_mblk(krrp_dblk_t *dblk,
899 size_t wroff, size_t tail_len)
900 {
901 mblk_t *mp;
902
903 mp = desballoc(dblk->head, dblk->total_sz, 0, &dblk->free_rtns);
904 if (mp != NULL) {
905 mp->b_rptr += wroff;
906 mp->b_wptr = mp->b_rptr + dblk->cur_data_sz;
907 }
908
909 return (mp);
910 }
911
/*
 * RX worker thread, started by krrp_conn_run().
 *
 * It loops while conn->rx_running is set. Each iteration receives a PDU
 * header, then allocates a CTRL PDU (when the opcode has
 * KRRP_CTRL_OPCODE_MASK bits) or a DATA PDU from conn->data_pdu_engine. It
 * receives the payload into that PDU and hands the PDU to the connection
 * callback via krrp_conn_process_received_pdu().
 *
 * On a receive error the connection is marked DISCONNECTING, the TX thread
 * is told to stop and KRRP_CONN_ERROR is reported. This is skipped if the
 * state was already DISCONNECTING, i.e. krrp_conn_destroy() or the TX
 * thread's error path is already handling teardown. Just before exiting,
 * the thread drops the socket hold taken by krrp_conn_run().
 */
static void
krrp_conn_rx_handler(void *void_conn)
{
	krrp_conn_t *conn = void_conn;

	int rc;
	krrp_pdu_t *pdu = NULL;
	krrp_hdr_t *hdr = NULL;	/* received header not yet owned by a PDU */
	krrp_error_t error;

	krrp_error_init(&error);

	mutex_enter(&conn->mtx);

	/* conn->mtx is held only while checking/updating the run flags */
	while (conn->rx_running) {
		mutex_exit(&conn->mtx);

		/*
		 * 'hdr' survives an iteration in which PDU allocation failed,
		 * so the header is not received a second time.
		 */
		if (hdr == NULL) {
			if (krrp_conn_rx_header(conn, &hdr, &error) != 0) {
				/* 'error' is set; leave the loop */
				mutex_enter(&conn->mtx);
				conn->rx_running = B_FALSE;
				continue;
			}
		}

#ifdef KRRP_CONN_DEBUG
		cmn_err(CE_NOTE, "HDR: opcode:[%u]; flags:[%u]; "
		    "payload_sz:[%u]",
		    hdr->opcode, hdr->flags, hdr->payload_sz);
#endif

		if (hdr->opcode & KRRP_CTRL_OPCODE_MASK)
			krrp_pdu_ctrl_alloc((krrp_pdu_ctrl_t **)&pdu,
			    KRRP_PDU_WITHOUT_HDR);
		else if (conn->data_pdu_engine != NULL) {
			krrp_pdu_alloc(conn->data_pdu_engine, &pdu,
			    KRRP_PDU_WITHOUT_HDR);
			conn->cur_txg = ((krrp_hdr_data_t *)hdr)->txg;
		} else {
			/*
			 * This thread is not used at initial stage,
			 * so at the running stage DataPDUEngine must be defined
			 */
			cmn_err(CE_PANIC, "Data PDU Engined is not defined");
		}

		if (pdu == NULL) {
			/* Allocation failed: retry with the same header */
			mutex_enter(&conn->mtx);
			continue;
		}

		/* The PDU now owns the header */
		pdu->hdr = hdr;
		hdr = NULL;

		rc = krrp_conn_rx_pdu(conn, pdu, &error);
		if (rc == 0) {
			/*
			 * The PDU is not released by this thread after being
			 * passed on; presumably the callback takes ownership.
			 */
			krrp_conn_process_received_pdu(conn, pdu);
			pdu = NULL;
			conn->cur_txg = 0;
		}

		mutex_enter(&conn->mtx);
		if (rc != 0)
			break;
	}

	/* Teardown already in progress elsewhere: not an error to report */
	if (conn->state == KRRP_CS_DISCONNECTING)
		(void) memset(&error, 0, sizeof (error));

	if (error.krrp_errno != 0) {
		conn->state = KRRP_CS_DISCONNECTING;
		conn->tx_running = B_FALSE;
	}

	mutex_exit(&conn->mtx);

	/* Free whatever is still owned by this thread */
	if (hdr != NULL)
		kmem_free(hdr, sizeof (krrp_hdr_t));

	if (pdu != NULL)
		krrp_pdu_rele(pdu);

	if (error.krrp_errno != 0)
		krrp_conn_callback(conn, KRRP_CONN_ERROR, &error);

	/* Last access to 'conn': krrp_conn_destroy() may free it after this */
	ksocket_rele(conn->ks);

	thread_exit();
}
1001
1002 static int
1003 krrp_conn_rx_header(krrp_conn_t *conn, krrp_hdr_t **result_hdr,
1004 krrp_error_t *error)
1005 {
1006 krrp_hdr_t *hdr;
1007 int rc;
1008
1009 hdr = kmem_zalloc(sizeof (krrp_hdr_t), KM_SLEEP);
1010 rc = krrp_conn_rx(conn->ks, hdr, sizeof (krrp_hdr_t), error);
1011 if (rc == 0) {
1012 conn->bytes_rx += sizeof (krrp_hdr_t);
1013 *result_hdr = hdr;
1014 return (0);
1015 }
1016
1017 kmem_free(hdr, sizeof (krrp_hdr_t));
1018 return (rc);
1019 }
1020
1021 static int
1022 krrp_conn_rx_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu, krrp_error_t *error)
1023 {
1024 krrp_dblk_t *dblk;
1025 size_t remaining_sz;
1026 size_t cnt = 0;
1027
1028 if (pdu->hdr->payload_sz > pdu->max_data_sz) {
1029 krrp_error_set(error, KRRP_ERRNO_BIGPAYLOAD, 0);
1030 return (-1);
1031 }
1032
1033 remaining_sz = pdu->hdr->payload_sz;
1034 dblk = pdu->dblk;
1035 while (remaining_sz != 0) {
1036 int rc;
1037 size_t need_to_recv;
1038
1039 /* Something wrong in our PDU Engine */
1040 ASSERT(dblk != NULL);
1041
1042 if (remaining_sz > dblk->max_data_sz)
1043 need_to_recv = dblk->max_data_sz;
1044 else
1045 need_to_recv = remaining_sz;
1046
1047 #ifdef KRRP_CONN_DEBUG
1048 cmn_err(CE_NOTE, "RX dblk #[%lu] [%lu]",
1049 cnt, remaining_sz);
1050 #endif
1051
1052 rc = krrp_conn_rx(conn->ks, dblk->data, need_to_recv, error);
1053 if (rc != 0)
1054 return (rc);
1055
1056 conn->bytes_rx += need_to_recv;
1057 dblk->cur_data_sz = need_to_recv;
1058 remaining_sz -= need_to_recv;
1059 dblk = dblk->next;
1060 cnt++;
1061 }
1062
1063 pdu->cur_data_sz = pdu->hdr->payload_sz;
1064
1065 return (0);
1066 }
1067
1068 static int
1069 krrp_conn_rx(ksocket_t ks, void *buff, size_t buff_sz, krrp_error_t *error)
1070 {
1071 int rc;
1072 size_t received = 0;
1073
1074 rc = ksocket_recv(ks, buff, buff_sz, MSG_WAITALL,
1075 &received, CRED());
1076
1077 if ((rc != 0) || (received != buff_sz)) {
1078 if (rc == 0) {
1079 if (received == 0)
1080 krrp_error_set(error, KRRP_ERRNO_UNEXPCLOSE, 0);
1081 else
1082 krrp_error_set(error, KRRP_ERRNO_UNEXPEND, 0);
1083 } else {
1084 if (rc == EAGAIN)
1085 rc = ETIMEDOUT;
1086
1087 krrp_error_set(error, KRRP_ERRNO_RECVFAIL, rc);
1088 }
1089
1090 rc = -1;
1091 }
1092
1093 return (rc);
1094 }
1095
1096 static void
1097 krrp_conn_process_received_pdu(krrp_conn_t *conn, krrp_pdu_t *pdu)
1098 {
1099 krrp_conn_cb_ev_t ev;
1100
1101 if (krrp_pdu_type(pdu) == KRRP_PT_DATA)
1102 ev = KRRP_CONN_DATA_PDU;
1103 else
1104 ev = KRRP_CONN_CTRL_PDU;
1105
1106 krrp_conn_callback(conn, ev, pdu);
1107 }