23 #include <assert.h>
24 #include <umem.h>
25 #include <errno.h>
26 #include <strings.h>
27 #include <unistd.h>
28 #include <stddef.h>
29 #include <sys/uio.h>
30 #include <sys/debug.h>
31
32 #include <libvarpd_svp.h>
33
/*
 * Tunable query timeout. NOTE(review): the unit is not visible in this part
 * of the file -- presumably seconds; confirm against the query timer code.
 */
int svp_conn_query_timeout = 30;

/*
 * Reconnect backoff schedule. The backoff path loads one of these values into
 * sc_btimer.st_value, indexed by the number of consecutive backoffs and
 * clamped to the final entry once the table is exhausted.
 */
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };

/* Number of entries in svp_conn_backoff_tbl. */
static int svp_conn_nbackoff =
    sizeof (svp_conn_backoff_tbl) / sizeof (svp_conn_backoff_tbl[0]);
37
/*
 * Follow-up action that a step of the connection state machine hands back to
 * the connection handler. The handler acts on it after dropping sc_lock and
 * re-taking both the remote's sr_lock and sc_lock.
 */
typedef enum svp_conn_act {
	/* Nothing further to do; the handler simply returns. */
	SVP_RA_NONE = 0,
	/* The handler calls svp_conn_degrade(). */
	SVP_RA_DEGRADE = 1,
	/* The handler calls svp_conn_restore(). */
	SVP_RA_RESTORE = 2,
	/*
	 * The handler calls svp_conn_reset() and then acts on whatever
	 * action that returns.
	 */
	SVP_RA_ERROR = 3,
	/*
	 * The handler calls svp_conn_remove(), sets SVP_CF_REAP and injects
	 * a user event.
	 */
	SVP_RA_CLEANUP = 4
} svp_conn_act_t;
45
46 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
47
48 static void
49 svp_conn_inject(svp_conn_t *scp)
50 {
51 int ret;
52 assert(MUTEX_HELD(&scp->sc_lock));
53
54 if (scp->sc_flags & SVP_CF_USER)
55 return;
56 scp->sc_flags |= SVP_CF_USER;
57 if ((ret = svp_event_inject(&scp->sc_event)) != 0)
58 libvarpd_panic("failed to inject event: %d\n", ret);
59 }
60
61 static void
62 svp_conn_degrade(svp_conn_t *scp)
63 {
75 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
76 }
77
78 static void
79 svp_conn_restore(svp_conn_t *scp)
80 {
81 svp_remote_t *srp = scp->sc_remote;
82
83 assert(MUTEX_HELD(&srp->sr_lock));
84 assert(MUTEX_HELD(&scp->sc_lock));
85
86 if (!(scp->sc_flags & SVP_CF_DEGRADED))
87 return;
88
89 scp->sc_flags &= ~SVP_CF_DEGRADED;
90 if (srp->sr_ndconns == srp->sr_tconns)
91 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
92 srp->sr_ndconns--;
93 }
94
95 static void
96 svp_conn_add(svp_conn_t *scp)
97 {
98 svp_remote_t *srp = scp->sc_remote;
99
100 assert(MUTEX_HELD(&srp->sr_lock));
101 assert(MUTEX_HELD(&scp->sc_lock));
102
103 if (scp->sc_flags & SVP_CF_ADDED)
104 return;
105
106 list_insert_tail(&srp->sr_conns, scp);
107 scp->sc_flags |= SVP_CF_ADDED;
108 srp->sr_tconns++;
109 }
110
111 static void
112 svp_conn_remove(svp_conn_t *scp)
113 {
114 svp_remote_t *srp = scp->sc_remote;
115
152 libvarpd_panic("failed to close socket %d: %d\n",
153 scp->sc_socket, errno);
154 scp->sc_socket = -1;
155
156 scp->sc_cstate = SVP_CS_BACKOFF;
157 scp->sc_nbackoff++;
158 if (scp->sc_nbackoff >= svp_conn_nbackoff) {
159 scp->sc_btimer.st_value =
160 svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
161 } else {
162 scp->sc_btimer.st_value =
163 svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
164 }
165 svp_timer_add(&scp->sc_btimer);
166
167 if (scp->sc_nbackoff > svp_conn_nbackoff)
168 return (SVP_RA_DEGRADE);
169 return (SVP_RA_NONE);
170 }
171
172 /*
173 * Think of this as an extension to the connect() call in svp_conn_connect().
174 * Send a message, receive it, and set the version here. If the response is
175 * too slow or the socket throws an error, indicate a socket error, which
176 * will cause the caller to backoff (i.e. close the socket and try again).
177 *
178 * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
179 * own error type.
180 */
181 static svp_conn_error_t
182 svp_conn_version_set(svp_conn_t *scp)
183 {
184 svp_req_t ping;
185 ssize_t ret;
186 uint32_t save_crc;
187 uint16_t peer_version;
188 int ntries = 3; /* One second between tries. 3secs should be enough. */
189
190 ping.svp_ver = htons(SVP_CURRENT_VERSION);
191 ping.svp_op = htons(SVP_R_PING);
192 ping.svp_size = 0; /* Header-only... */
193 ping.svp_id = 0;
194 /* 0-length data... just use the req buffer for the pointer. */
195 svp_query_crc32(&ping, &ping, 0);
196
197 ret = write(scp->sc_socket, &ping, sizeof (ping));
198 if (ret == -1) {
199 /*
200 * A failed write() call right after connect probably
201 * indicates a larger connection failure. Restart the
202 * connection from scratch.
203 */
204 return (SVP_CE_SOCKET);
205 }
206 assert(ret == sizeof (ping));
207 do {
208 /*
209 * Asynch read. We may loop here once or twice.
210 * Wait a bit, but don't loop too many times...
211 */
212 (void) sleep(1);
213 ret = read(scp->sc_socket, &ping, sizeof (ping));
214 } while (--ntries > 0 &&
215 ret == -1 && (errno == EINTR || errno == EAGAIN));
216 if (ret == -1) {
217 /*
218 * This is actually a failed read() call. Restart the
219 * connection from scratch.
220 */
221 return (SVP_CE_SOCKET);
222 }
223
224 save_crc = ping.svp_crc32;
225 svp_query_crc32(&ping, &ping, 0);
226 peer_version = htons(ping.svp_ver);
227 if (ping.svp_op != htons(SVP_R_PONG) ||
228 ping.svp_size != 0 || ping.svp_id != 0 ||
229 ping.svp_crc32 != save_crc ||
230 peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
231 return (SVP_CE_VERSION_PONG);
232 }
233
234 /* This connection now has a version! */
235 scp->sc_version = peer_version;
236 return (SVP_CE_NONE);
237 }
238
239 static svp_conn_act_t
240 svp_conn_connect(svp_conn_t *scp)
241 {
242 int ret;
243 struct sockaddr_in6 in6;
244
245 assert(MUTEX_HELD(&scp->sc_lock));
246 assert(scp->sc_cstate == SVP_CS_BACKOFF ||
247 scp->sc_cstate == SVP_CS_INITIAL);
248 assert(scp->sc_socket == -1);
249 if (scp->sc_cstate == SVP_CS_INITIAL)
250 scp->sc_nbackoff = 0;
251
252 /* New connect means we need to know the version. */
253 scp->sc_version = 0;
254
255 scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
256 if (scp->sc_socket == -1) {
257 scp->sc_error = SVP_CE_SOCKET;
258 scp->sc_errno = errno;
307 } else {
308 /*
309 * This call failed, which means that we obtained one of
310 * the following:
311 *
312 * EADDRNOTAVAIL
313 * ECONNREFUSED
314 * EIO
315 * ENETUNREACH
316 * EHOSTUNREACH
317 * ENXIO
318 * ETIMEDOUT
319 *
320 * Therefore we need to set ourselves into backoff and
321 * wait for that to clear up.
322 */
323 return (svp_conn_backoff(scp));
324 }
325 }
326
327 return (svp_conn_poll_connect(NULL, scp));
328 }
329
330 /*
331 * This should be the first call we get after a successful synchronous
332 * connect, or a completed (failed or successful) asynchronous connect. A
333 * non-NULL port-event indicates asynchronous completion, a NULL port-event
334 * indicates a successful synchronous connect.
335 *
336 * If we have successfully connected, we should see a writeable event. In the
337 * asynchronous case, we may also see an error or a hang up. For either hang
338 * up or error, we transition to error mode. If there is also a readable event
339 * (i.e. incoming data), we ignore it at the moment and just let a
340 * reassociation pick it up so we can simplify the set of state transitions
341 * that we have.
342 */
343 static svp_conn_act_t
344 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
345 {
346 int ret;
353 /*
354 * These bits only matter if we're notified of an
355 * asynchronous connection completion.
356 */
357 if (!(pe->portev_events & POLLOUT)) {
358 scp->sc_errno = 0;
359 scp->sc_error = SVP_CE_NOPOLLOUT;
360 scp->sc_cstate = SVP_CS_ERROR;
361 return (SVP_RA_DEGRADE);
362 }
363
364 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
365 &sl);
366 if (ret != 0)
367 libvarpd_panic("unanticipated getsockopt error");
368 if (err != 0) {
369 return (svp_conn_backoff(scp));
370 }
371 }
372
373 /* Use a single SVP_R_PING to determine the version. */
374 version_error = svp_conn_version_set(scp);
375 switch (version_error) {
376 case SVP_CE_SOCKET:
377 /* Use this to signal read/write errors... */
378 return (svp_conn_backoff(scp));
379 case SVP_CE_NONE:
380 assert(scp->sc_version > 0 &&
381 scp->sc_version <= SVP_CURRENT_VERSION);
382 break;
383 default:
384 scp->sc_error = version_error;
385 scp->sc_cstate = SVP_CS_ERROR;
386 scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */
387 return (SVP_RA_DEGRADE);
388 }
389
390 scp->sc_cstate = SVP_CS_ACTIVE;
391 scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
392 ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
393 if (ret == 0)
394 return (SVP_RA_RESTORE);
395 scp->sc_error = SVP_CE_ASSOCIATE;
396 scp->sc_errno = ret;
397 scp->sc_cstate = SVP_CS_ERROR;
398 return (SVP_RA_DEGRADE);
399 }
400
401 static svp_conn_act_t
402 svp_conn_pollout(svp_conn_t *scp)
403 {
404 svp_query_t *sqp;
405 svp_req_t *req;
406 size_t off;
407 struct iovec iov[2];
408 int nvecs = 0;
409 ssize_t ret;
410
411 assert(MUTEX_HELD(&scp->sc_lock));
412
413 /*
414 * We need to find a query and start writing it out.
415 */
416 if (scp->sc_output.sco_query == NULL) {
417 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
418 sqp = list_next(&scp->sc_queries, sqp)) {
463 default:
464 libvarpd_panic("unexpected errno: %d", errno);
465 }
466 }
467
468 sqp->sq_acttime = gethrtime();
469 scp->sc_output.sco_offset += ret;
470 if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
471 sqp->sq_state = SVP_QUERY_READING;
472 scp->sc_output.sco_query = NULL;
473 scp->sc_output.sco_offset = 0;
474 scp->sc_event.se_events |= POLLOUT;
475 }
476 return (SVP_RA_NONE);
477 }
478
479 static boolean_t
480 svp_conn_pollin_validate(svp_conn_t *scp)
481 {
482 svp_query_t *sqp;
483 uint32_t nsize;
484 uint16_t nvers, nop;
485 svp_req_t *resp = &scp->sc_input.sci_req;
486
487 assert(MUTEX_HELD(&scp->sc_lock));
488
489 nvers = ntohs(resp->svp_ver);
490 nop = ntohs(resp->svp_op);
491 nsize = ntohl(resp->svp_size);
492
493 /*
494 * A peer that's messing with post-connection version changes is
495 * likely a broken peer.
496 */
497 if (nvers != scp->sc_version) {
498 (void) bunyan_warn(svp_bunyan, "version mismatch",
499 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
500 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
501 BUNYAN_T_INT32, "peer version", nvers,
502 BUNYAN_T_INT32, "our version", scp->sc_version,
503 BUNYAN_T_INT32, "operation", nop,
504 BUNYAN_T_INT32, "response_id", resp->svp_id,
505 BUNYAN_T_END);
506 return (B_FALSE);
507 }
508
509 if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
510 nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
511 (void) bunyan_warn(svp_bunyan, "unsupported operation",
512 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
513 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
514 BUNYAN_T_INT32, "version", nvers,
515 BUNYAN_T_INT32, "operation", nop,
516 BUNYAN_T_INT32, "response_id", resp->svp_id,
517 BUNYAN_T_END);
518 return (B_FALSE);
519 }
520
521 sqp = svp_conn_query_find(scp, resp->svp_id);
522 if (sqp == NULL) {
523 (void) bunyan_warn(svp_bunyan, "unknown response id",
524 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
525 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
526 BUNYAN_T_INT32, "version", nvers,
527 BUNYAN_T_INT32, "operation", nop,
528 BUNYAN_T_INT32, "response_id", resp->svp_id,
529 BUNYAN_T_END);
530 return (B_FALSE);
531 }
532
533 if (sqp->sq_state != SVP_QUERY_READING) {
534 (void) bunyan_warn(svp_bunyan,
535 "got response for unexpecting query",
536 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
537 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
538 BUNYAN_T_INT32, "version", nvers,
539 BUNYAN_T_INT32, "operation", nop,
540 BUNYAN_T_INT32, "response_id", resp->svp_id,
541 BUNYAN_T_INT32, "query_state", sqp->sq_state,
542 BUNYAN_T_END);
543 return (B_FALSE);
544 }
545
546 if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
547 (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
548 (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
549 (void) bunyan_warn(svp_bunyan, "response size too large",
550 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
551 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
552 BUNYAN_T_INT32, "version", nvers,
553 BUNYAN_T_INT32, "operation", nop,
554 BUNYAN_T_INT32, "response_id", resp->svp_id,
555 BUNYAN_T_INT32, "response_size", nsize,
556 BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
557 sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
558 BUNYAN_T_INT32, "query_state", sqp->sq_state,
559 BUNYAN_T_END);
560 return (B_FALSE);
561 }
562
563 /*
564 * The valid size is anything <= to what the user requested, but at
565 * least svp_log_ack_t bytes large.
566 */
567 if (nop == SVP_R_LOG_ACK) {
568 const char *msg = NULL;
573 if (msg != NULL) {
574 (void) bunyan_warn(svp_bunyan, msg,
575 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
576 BUNYAN_T_INT32, "remote_port",
577 scp->sc_remote->sr_rport,
578 BUNYAN_T_INT32, "version", nvers,
579 BUNYAN_T_INT32, "operation", nop,
580 BUNYAN_T_INT32, "response_id", resp->svp_id,
581 BUNYAN_T_INT32, "response_size", nsize,
582 BUNYAN_T_INT32, "expected_size",
583 ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
584 BUNYAN_T_INT32, "query_state", sqp->sq_state,
585 BUNYAN_T_END);
586 return (B_FALSE);
587 }
588 }
589
590 sqp->sq_size = nsize;
591 scp->sc_input.sci_query = sqp;
592 if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
593 nop == SVP_R_LOG_RM_ACK) {
594 sqp->sq_wdata = &sqp->sq_wdun;
595 sqp->sq_wsize = sizeof (svp_query_data_t);
596 } else {
597 VERIFY(nop == SVP_R_LOG_ACK);
598 assert(sqp->sq_wdata != NULL);
599 assert(sqp->sq_wsize != 0);
600 }
601
602 return (B_TRUE);
603 }
604
605 static svp_conn_act_t
606 svp_conn_pollin(svp_conn_t *scp)
607 {
608 size_t off, total;
609 ssize_t ret;
610 svp_query_t *sqp;
611 uint32_t crc;
612 uint16_t nop;
613
663 sqp->sq_acttime = gethrtime();
664 total = ntohl(scp->sc_input.sci_req.svp_size);
665 do {
666 ret = read(scp->sc_socket,
667 (void *)((uintptr_t)sqp->sq_wdata + off),
668 total - off);
669 } while (ret == -1 && errno == EINTR);
670
671 if (ret == -1) {
672 switch (errno) {
673 case EAGAIN:
674 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
675 return (SVP_RA_NONE);
676 case EIO:
677 case ECONNRESET:
678 return (SVP_RA_ERROR);
679 break;
680 default:
681 libvarpd_panic("unexpeted read errno: %d", errno);
682 }
683 } else if (ret == 0) {
684 /* Try to reconnect to the remote host */
685 return (SVP_RA_ERROR);
686 }
687
688 if (ret + off < total) {
689 scp->sc_input.sci_offset += ret;
690 return (SVP_RA_NONE);
691 }
692
693 nop = ntohs(scp->sc_input.sci_req.svp_op);
694 crc = scp->sc_input.sci_req.svp_crc32;
695 svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
696 if (crc != scp->sc_input.sci_req.svp_crc32) {
697 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
698 BUNYAN_T_IP, "remote ip", &scp->sc_addr,
699 BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
700 BUNYAN_T_INT32, "version",
701 ntohs(scp->sc_input.sci_req.svp_ver),
702 BUNYAN_T_INT32, "operation", nop,
703 BUNYAN_T_INT32, "response id",
707 BUNYAN_T_UINT32, "calc_crc",
708 ntohl(scp->sc_input.sci_req.svp_crc32),
709 BUNYAN_T_END);
710 return (SVP_RA_ERROR);
711 }
712 scp->sc_input.sci_query = NULL;
713 scp->sc_input.sci_offset = 0;
714
715 if (nop == SVP_R_VL2_ACK) {
716 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
717 sqp->sq_status = ntohl(sl2a->sl2a_status);
718 } else if (nop == SVP_R_VL3_ACK) {
719 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
720 sqp->sq_status = ntohl(sl3a->sl3a_status);
721 } else if (nop == SVP_R_LOG_ACK) {
722 svp_log_ack_t *svla = sqp->sq_wdata;
723 sqp->sq_status = ntohl(svla->svla_status);
724 } else if (nop == SVP_R_LOG_RM_ACK) {
725 svp_lrm_ack_t *svra = sqp->sq_wdata;
726 sqp->sq_status = ntohl(svra->svra_status);
727 } else {
728 libvarpd_panic("unhandled nop: %d", nop);
729 }
730
731 list_remove(&scp->sc_queries, sqp);
732 mutex_exit(&scp->sc_lock);
733
734 /*
735 * We have to release all of our resources associated with this entry
736 * before we call the callback. After we call it, the memory will be
737 * lost to time.
738 */
739 svp_query_release(sqp);
740 sqp->sq_func(sqp, sqp->sq_arg);
741 mutex_enter(&scp->sc_lock);
742 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
743
744 return (SVP_RA_NONE);
745 }
746
818 return;
819 }
820
821 /* Check if this needs to be reset */
822 if (scp->sc_flags & SVP_CF_TEARDOWN) {
823 /* Make sure any other users of this are disassociated */
824 ret = SVP_RA_ERROR;
825 goto out;
826 }
827
828 switch (scp->sc_cstate) {
829 case SVP_CS_INITIAL:
830 case SVP_CS_BACKOFF:
831 assert(pe == NULL);
832 ret = svp_conn_connect(scp);
833 break;
834 case SVP_CS_CONNECTING:
835 assert(pe != NULL);
836 ret = svp_conn_poll_connect(pe, scp);
837 break;
838 case SVP_CS_ACTIVE:
839 case SVP_CS_WINDDOWN:
840 assert(pe != NULL);
841 oldstate = scp->sc_cstate;
842 if (pe->portev_events & POLLOUT)
843 ret = svp_conn_pollout(scp);
844 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
845 ret = svp_conn_pollin(scp);
846
847 if (oldstate == SVP_CS_WINDDOWN &&
848 (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
849 ret = SVP_RA_CLEANUP;
850 }
851
852 if (ret == SVP_RA_NONE) {
853 int err;
854 if ((err = svp_event_associate(&scp->sc_event,
855 scp->sc_socket)) != 0) {
856 scp->sc_error = SVP_CE_ASSOCIATE;
857 scp->sc_errno = err;
858 scp->sc_cstate = SVP_CS_ERROR;
859 ret = SVP_RA_DEGRADE;
860 }
861 }
862 break;
863 default:
864 libvarpd_panic("svp_conn_handler encountered unexpected "
865 "state: %d", scp->sc_cstate);
866 }
867 out:
868 mutex_exit(&scp->sc_lock);
869
870 if (ret == SVP_RA_NONE)
871 return;
872
873 mutex_enter(&srp->sr_lock);
874 mutex_enter(&scp->sc_lock);
875 if (ret == SVP_RA_ERROR)
876 ret = svp_conn_reset(scp);
877
878 if (ret == SVP_RA_DEGRADE)
879 svp_conn_degrade(scp);
880 if (ret == SVP_RA_RESTORE)
881 svp_conn_restore(scp);
882
883 if (ret == SVP_RA_CLEANUP) {
884 svp_conn_remove(scp);
885 scp->sc_flags |= SVP_CF_REAP;
886 svp_conn_inject(scp);
887 }
888 mutex_exit(&scp->sc_lock);
889 mutex_exit(&srp->sr_lock);
890 }
891
892 static void
893 svp_conn_backtimer(void *arg)
894 {
1096 "associated: %d", ret);
1097 }
1098 mutex_exit(&scp->sc_lock);
1099
1100 /* Verify our timers are killed */
1101 svp_timer_remove(&scp->sc_btimer);
1102 svp_timer_remove(&scp->sc_qtimer);
1103
1104 if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
1105 libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
1106 "%d: %d", scp->sc_socket, errno);
1107
1108 list_destroy(&scp->sc_queries);
1109 umem_free(scp, sizeof (svp_conn_t));
1110 }
1111
1112 void
1113 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1114 {
1115 assert(MUTEX_HELD(&scp->sc_lock));
1116 assert(scp->sc_cstate == SVP_CS_ACTIVE);
1117
1118 sqp->sq_acttime = -1;
1119 list_insert_tail(&scp->sc_queries, sqp);
1120 if (!(scp->sc_event.se_events & POLLOUT)) {
1121 scp->sc_event.se_events |= POLLOUT;
1122 /*
1123 * If this becomes frequent, we should instead give up on this
1124 * set of connections instead of aborting.
1125 */
1126 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1127 libvarpd_panic("svp_event_associate failed somehow");
1128 }
1129 }
|
23 #include <assert.h>
24 #include <umem.h>
25 #include <errno.h>
26 #include <strings.h>
27 #include <unistd.h>
28 #include <stddef.h>
29 #include <sys/uio.h>
30 #include <sys/debug.h>
31
32 #include <libvarpd_svp.h>
33
/*
 * Tunable query timeout. NOTE(review): the unit is not visible in this part
 * of the file -- presumably seconds; confirm against the query timer code.
 */
int svp_conn_query_timeout = 30;

/*
 * Reconnect backoff schedule. The backoff path loads one of these values into
 * sc_btimer.st_value, indexed by the number of consecutive backoffs and
 * clamped to the final entry once the table is exhausted.
 */
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };

/* Number of entries in svp_conn_backoff_tbl. */
static int svp_conn_nbackoff =
    sizeof (svp_conn_backoff_tbl) / sizeof (svp_conn_backoff_tbl[0]);
37
/*
 * Follow-up action that a step of the connection state machine hands back to
 * its caller.
 *
 * NOTE(review): the per-value notes below describe what each action asks for;
 * the code that dispatches on them is outside this view -- confirm against
 * svp_conn_handler().
 */
typedef enum svp_conn_act {
	/* Nothing further to do. */
	SVP_RA_NONE = 0,
	/* Mark the connection degraded (presumably svp_conn_degrade()). */
	SVP_RA_DEGRADE = 1,
	/* Clear the degraded state (presumably svp_conn_restore()). */
	SVP_RA_RESTORE = 2,
	/* The connection hit an error and needs to be reset. */
	SVP_RA_ERROR = 3,
	/* The connection is finished and should be removed and reaped. */
	SVP_RA_CLEANUP = 4,
	/*
	 * Returned by svp_conn_poll_connect() once the connect has completed
	 * without error: the peer's protocol version still has to be
	 * discovered (presumably via svp_conn_ping_version()).
	 */
	SVP_RA_FIND_VERSION = 5
} svp_conn_act_t;
46
47 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
48
49 static void
50 svp_conn_inject(svp_conn_t *scp)
51 {
52 int ret;
53 assert(MUTEX_HELD(&scp->sc_lock));
54
55 if (scp->sc_flags & SVP_CF_USER)
56 return;
57 scp->sc_flags |= SVP_CF_USER;
58 if ((ret = svp_event_inject(&scp->sc_event)) != 0)
59 libvarpd_panic("failed to inject event: %d\n", ret);
60 }
61
62 static void
63 svp_conn_degrade(svp_conn_t *scp)
64 {
76 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
77 }
78
79 static void
80 svp_conn_restore(svp_conn_t *scp)
81 {
82 svp_remote_t *srp = scp->sc_remote;
83
84 assert(MUTEX_HELD(&srp->sr_lock));
85 assert(MUTEX_HELD(&scp->sc_lock));
86
87 if (!(scp->sc_flags & SVP_CF_DEGRADED))
88 return;
89
90 scp->sc_flags &= ~SVP_CF_DEGRADED;
91 if (srp->sr_ndconns == srp->sr_tconns)
92 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
93 srp->sr_ndconns--;
94 }
95
96 static svp_conn_act_t
97 svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp)
98 {
99 uint16_t remote_version = ntohs(scp->sc_input.sci_req.svp_ver);
100
101 if (scp->sc_cstate == SVP_CS_VERSIONING) {
102 /* Transition VERSIONING -> ACTIVE. */
103 assert(scp->sc_version == 0);
104 if (remote_version == 0 || remote_version > SVP_CURRENT_VERSION)
105 return (SVP_RA_ERROR);
106 scp->sc_version = remote_version;
107 scp->sc_cstate = SVP_CS_ACTIVE;
108 }
109
110 return (SVP_RA_NONE);
111 }
112
113 static void
114 svp_conn_ping_cb(svp_query_t *sqp, void *arg)
115 {
116 size_t len = (size_t)arg;
117
118 assert(len == sizeof (svp_query_t));
119 umem_free(sqp, len);
120 }
121
122 static svp_conn_act_t
123 svp_conn_ping_version(svp_conn_t *scp)
124 {
125 svp_remote_t *srp = scp->sc_remote;
126 svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT);
127 int ret;
128
129 assert(MUTEX_HELD(&srp->sr_lock));
130 assert(MUTEX_HELD(&scp->sc_lock));
131 assert(scp->sc_cstate == SVP_CS_CONNECTING);
132
133 if (sqp == NULL)
134 return (SVP_RA_ERROR);
135
136 /* Only set things that need to be non-0/non-NULL. */
137 sqp->sq_state = SVP_QUERY_INIT;
138 sqp->sq_func = svp_conn_ping_cb;
139 sqp->sq_arg = (void *)sizeof (svp_query_t);
140 sqp->sq_header.svp_op = htons(SVP_R_PING);
141 sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION);
142 sqp->sq_header.svp_id = svp_id_alloc();
143 if (sqp->sq_header.svp_id == -1) {
144 umem_free(sqp, sizeof (svp_query_t));
145 return (SVP_RA_ERROR);
146 }
147
148 scp->sc_cstate = SVP_CS_VERSIONING;
149 /* Set the event flags now... */
150 scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT;
151 /* ...so I can just queue it up directly... */
152 svp_conn_queue(scp, sqp);
153 /* ... and then associate the event port myself. */
154 ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
155 if (ret == 0)
156 return (SVP_RA_RESTORE);
157 scp->sc_error = SVP_CE_ASSOCIATE;
158 scp->sc_errno = ret;
159 scp->sc_cstate = SVP_CS_ERROR;
160 umem_free(sqp, sizeof (svp_query_t));
161 return (SVP_RA_DEGRADE);
162 }
163
164 static void
165 svp_conn_add(svp_conn_t *scp)
166 {
167 svp_remote_t *srp = scp->sc_remote;
168
169 assert(MUTEX_HELD(&srp->sr_lock));
170 assert(MUTEX_HELD(&scp->sc_lock));
171
172 if (scp->sc_flags & SVP_CF_ADDED)
173 return;
174
175 list_insert_tail(&srp->sr_conns, scp);
176 scp->sc_flags |= SVP_CF_ADDED;
177 srp->sr_tconns++;
178 }
179
180 static void
181 svp_conn_remove(svp_conn_t *scp)
182 {
183 svp_remote_t *srp = scp->sc_remote;
184
221 libvarpd_panic("failed to close socket %d: %d\n",
222 scp->sc_socket, errno);
223 scp->sc_socket = -1;
224
225 scp->sc_cstate = SVP_CS_BACKOFF;
226 scp->sc_nbackoff++;
227 if (scp->sc_nbackoff >= svp_conn_nbackoff) {
228 scp->sc_btimer.st_value =
229 svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
230 } else {
231 scp->sc_btimer.st_value =
232 svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
233 }
234 svp_timer_add(&scp->sc_btimer);
235
236 if (scp->sc_nbackoff > svp_conn_nbackoff)
237 return (SVP_RA_DEGRADE);
238 return (SVP_RA_NONE);
239 }
240
241 static svp_conn_act_t
242 svp_conn_connect(svp_conn_t *scp)
243 {
244 int ret;
245 struct sockaddr_in6 in6;
246
247 assert(MUTEX_HELD(&scp->sc_lock));
248 assert(scp->sc_cstate == SVP_CS_BACKOFF ||
249 scp->sc_cstate == SVP_CS_INITIAL);
250 assert(scp->sc_socket == -1);
251 if (scp->sc_cstate == SVP_CS_INITIAL)
252 scp->sc_nbackoff = 0;
253
254 /* New connect means we need to know the version. */
255 scp->sc_version = 0;
256
257 scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
258 if (scp->sc_socket == -1) {
259 scp->sc_error = SVP_CE_SOCKET;
260 scp->sc_errno = errno;
309 } else {
310 /*
311 * This call failed, which means that we obtained one of
312 * the following:
313 *
314 * EADDRNOTAVAIL
315 * ECONNREFUSED
316 * EIO
317 * ENETUNREACH
318 * EHOSTUNREACH
319 * ENXIO
320 * ETIMEDOUT
321 *
322 * Therefore we need to set ourselves into backoff and
323 * wait for that to clear up.
324 */
325 return (svp_conn_backoff(scp));
326 }
327 }
328
329 /* Immediately successful connection, move to SVP_CS_VERSIONING. */
330 return (svp_conn_poll_connect(NULL, scp));
331 }
332
333 /*
334 * This should be the first call we get after a successful synchronous
335 * connect, or a completed (failed or successful) asynchronous connect. A
336 * non-NULL port-event indicates asynchronous completion, a NULL port-event
337 * indicates a successful synchronous connect.
338 *
339 * If we have successfully connected, we should see a writeable event. In the
340 * asynchronous case, we may also see an error or a hang up. For either hang
341 * up or error, we transition to error mode. If there is also a readable event
342 * (i.e. incoming data), we ignore it at the moment and just let a
343 * reassociation pick it up so we can simplify the set of state transitions
344 * that we have.
345 */
346 static svp_conn_act_t
347 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
348 {
349 int ret;
356 /*
357 * These bits only matter if we're notified of an
358 * asynchronous connection completion.
359 */
360 if (!(pe->portev_events & POLLOUT)) {
361 scp->sc_errno = 0;
362 scp->sc_error = SVP_CE_NOPOLLOUT;
363 scp->sc_cstate = SVP_CS_ERROR;
364 return (SVP_RA_DEGRADE);
365 }
366
367 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
368 &sl);
369 if (ret != 0)
370 libvarpd_panic("unanticipated getsockopt error");
371 if (err != 0) {
372 return (svp_conn_backoff(scp));
373 }
374 }
375
376 return (SVP_RA_FIND_VERSION);
377 }
378
379 static svp_conn_act_t
380 svp_conn_pollout(svp_conn_t *scp)
381 {
382 svp_query_t *sqp;
383 svp_req_t *req;
384 size_t off;
385 struct iovec iov[2];
386 int nvecs = 0;
387 ssize_t ret;
388
389 assert(MUTEX_HELD(&scp->sc_lock));
390
391 /*
392 * We need to find a query and start writing it out.
393 */
394 if (scp->sc_output.sco_query == NULL) {
395 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
396 sqp = list_next(&scp->sc_queries, sqp)) {
441 default:
442 libvarpd_panic("unexpected errno: %d", errno);
443 }
444 }
445
446 sqp->sq_acttime = gethrtime();
447 scp->sc_output.sco_offset += ret;
448 if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
449 sqp->sq_state = SVP_QUERY_READING;
450 scp->sc_output.sco_query = NULL;
451 scp->sc_output.sco_offset = 0;
452 scp->sc_event.se_events |= POLLOUT;
453 }
454 return (SVP_RA_NONE);
455 }
456
457 static boolean_t
458 svp_conn_pollin_validate(svp_conn_t *scp)
459 {
460 svp_query_t *sqp;
461 uint32_t nsize, expected_size = 0;
462 uint16_t nvers, nop;
463 svp_req_t *resp = &scp->sc_input.sci_req;
464
465 assert(MUTEX_HELD(&scp->sc_lock));
466
467 nvers = ntohs(resp->svp_ver);
468 nop = ntohs(resp->svp_op);
469 nsize = ntohl(resp->svp_size);
470
471 /*
472 * A peer that's messing with post-connection version changes is
473 * likely a broken peer.
474 */
475 if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) {
476 (void) bunyan_warn(svp_bunyan, "version mismatch",
477 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
478 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
479 BUNYAN_T_INT32, "peer version", nvers,
480 BUNYAN_T_INT32, "our version", scp->sc_version,
481 BUNYAN_T_INT32, "operation", nop,
482 BUNYAN_T_INT32, "response_id", resp->svp_id,
483 BUNYAN_T_END);
484 return (B_FALSE);
485 }
486
487 switch (nop) {
488 case SVP_R_VL2_ACK:
489 expected_size = sizeof (svp_vl2_ack_t);
490 break;
491 case SVP_R_VL3_ACK:
492 expected_size = sizeof (svp_vl3_ack_t);
493 break;
494 case SVP_R_LOG_RM_ACK:
495 expected_size = sizeof (svp_lrm_ack_t);
496 break;
497 case SVP_R_ROUTE_ACK:
498 expected_size = sizeof (svp_route_ack_t);
499 break;
500 case SVP_R_LOG_ACK:
501 case SVP_R_PONG:
502 /* No expected size (LOG_ACK) or size is 0 (PONG). */
503 break;
504 default:
505 (void) bunyan_warn(svp_bunyan, "unsupported operation",
506 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
507 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
508 BUNYAN_T_INT32, "version", nvers,
509 BUNYAN_T_INT32, "operation", nop,
510 BUNYAN_T_INT32, "response_id", resp->svp_id,
511 BUNYAN_T_END);
512 return (B_FALSE);
513 }
514
515 sqp = svp_conn_query_find(scp, resp->svp_id);
516 if (sqp == NULL) {
517 (void) bunyan_warn(svp_bunyan, "unknown response id",
518 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
519 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
520 BUNYAN_T_INT32, "version", nvers,
521 BUNYAN_T_INT32, "operation", nop,
522 BUNYAN_T_INT32, "response_id", resp->svp_id,
523 BUNYAN_T_END);
524 return (B_FALSE);
525 }
526
527 if (sqp->sq_state != SVP_QUERY_READING) {
528 (void) bunyan_warn(svp_bunyan,
529 "got response for unexpecting query",
530 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
531 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
532 BUNYAN_T_INT32, "version", nvers,
533 BUNYAN_T_INT32, "operation", nop,
534 BUNYAN_T_INT32, "response_id", resp->svp_id,
535 BUNYAN_T_INT32, "query_state", sqp->sq_state,
536 BUNYAN_T_END);
537 return (B_FALSE);
538 }
539
540 if (nop != SVP_R_LOG_RM_ACK && nsize != expected_size) {
541 (void) bunyan_warn(svp_bunyan, "response size too large",
542 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
543 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
544 BUNYAN_T_INT32, "version", nvers,
545 BUNYAN_T_INT32, "operation", nop,
546 BUNYAN_T_INT32, "response_id", resp->svp_id,
547 BUNYAN_T_INT32, "response_size", nsize,
548 BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
549 sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
550 BUNYAN_T_INT32, "query_state", sqp->sq_state,
551 BUNYAN_T_END);
552 return (B_FALSE);
553 }
554
555 /*
556 * The valid size is anything <= to what the user requested, but at
557 * least svp_log_ack_t bytes large.
558 */
559 if (nop == SVP_R_LOG_ACK) {
560 const char *msg = NULL;
565 if (msg != NULL) {
566 (void) bunyan_warn(svp_bunyan, msg,
567 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
568 BUNYAN_T_INT32, "remote_port",
569 scp->sc_remote->sr_rport,
570 BUNYAN_T_INT32, "version", nvers,
571 BUNYAN_T_INT32, "operation", nop,
572 BUNYAN_T_INT32, "response_id", resp->svp_id,
573 BUNYAN_T_INT32, "response_size", nsize,
574 BUNYAN_T_INT32, "expected_size",
575 ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
576 BUNYAN_T_INT32, "query_state", sqp->sq_state,
577 BUNYAN_T_END);
578 return (B_FALSE);
579 }
580 }
581
582 sqp->sq_size = nsize;
583 scp->sc_input.sci_query = sqp;
584 if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
585 nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK ||
586 nop == SVP_R_PONG) {
587 sqp->sq_wdata = &sqp->sq_wdun;
588 sqp->sq_wsize = sizeof (svp_query_data_t);
589 } else {
590 VERIFY(nop == SVP_R_LOG_ACK);
591 assert(sqp->sq_wdata != NULL);
592 assert(sqp->sq_wsize != 0);
593 }
594
595 return (B_TRUE);
596 }
597
/*
 * svp_conn_pollin: read response payload bytes for the in-progress query
 * from the connection socket and, once the whole payload has arrived,
 * validate it and complete the query.
 *
 * Only the statements visible here are described:
 *
 *  - Bytes are read into sqp->sq_wdata starting at offset off, up to total,
 *    which is sci_req.svp_size converted from network byte order.
 *  - EAGAIN re-arms POLLIN | POLLRDNORM and returns SVP_RA_NONE. EIO,
 *    ECONNRESET, and a zero-length read while bytes are still outstanding
 *    return SVP_RA_ERROR. Any other errno panics.
 *  - A short read advances sci_offset and returns SVP_RA_NONE.
 *  - A CRC mismatch is logged and returns SVP_RA_ERROR.
 *  - Otherwise the status is extracted for the received op (or the PONG
 *    handler is run), the query is unlinked from sc_queries, and its
 *    callback is invoked with sc_lock dropped.
 *
 * Returns SVP_RA_NONE, SVP_RA_ERROR, or whatever non-NONE action
 * svp_conn_pong_handler() returns.
 *
 * NOTE(review): the statements that assign sqp and off are not shown next
 * to this code. off presumably mirrors scp->sc_input.sci_offset and sqp the
 * query being received -- confirm. The function calls mutex_exit() and then
 * mutex_enter() on scp->sc_lock, so it presumably expects the caller to hold
 * that lock on entry -- confirm.
 */
598 static svp_conn_act_t
599 svp_conn_pollin(svp_conn_t *scp)
600 {
601 size_t off, total;
602 ssize_t ret;
603 svp_query_t *sqp;
604 uint32_t crc;
605 uint16_t nop;
606
    /* Stamp the query with the current high-resolution time. */
656 sqp->sq_acttime = gethrtime();
657 total = ntohl(scp->sc_input.sci_req.svp_size);
    /* Read the outstanding bytes, retrying when the call fails with EINTR. */
658 do {
659 ret = read(scp->sc_socket,
660 (void *)((uintptr_t)sqp->sq_wdata + off),
661 total - off);
662 } while (ret == -1 && errno == EINTR);
663
664 if (ret == -1) {
665 switch (errno) {
    /* No data available right now: request input events again and wait. */
666 case EAGAIN:
667 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
668 return (SVP_RA_NONE);
    /* I/O error or reset by peer: report an error action to the caller. */
669 case EIO:
670 case ECONNRESET:
671 return (SVP_RA_ERROR);
    /* This break is unreachable; the return above always leaves the switch. */
672 break;
673 default:
674 libvarpd_panic("unexpeted read errno: %d", errno);
675 }
676 } else if (ret == 0 && total - off > 0) {
677 /* Try to reconnect to the remote host */
678 return (SVP_RA_ERROR);
679 }
680
    /* Short read: record the progress and return until more data arrives. */
681 if (ret + off < total) {
682 scp->sc_input.sci_offset += ret;
683 return (SVP_RA_NONE);
684 }
685
    /*
     * Whole payload received. Save the CRC from the header, then call
     * svp_query_crc32() over the header and payload. Presumably that call
     * stores the recomputed value back into sci_req.svp_crc32, since the
     * saved value is compared against that field and the field is logged
     * as "calc_crc" -- confirm.
     */
686 nop = ntohs(scp->sc_input.sci_req.svp_op);
687 crc = scp->sc_input.sci_req.svp_crc32;
688 svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
689 if (crc != scp->sc_input.sci_req.svp_crc32) {
690 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
691 BUNYAN_T_IP, "remote ip", &scp->sc_addr,
692 BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
693 BUNYAN_T_INT32, "version",
694 ntohs(scp->sc_input.sci_req.svp_ver),
695 BUNYAN_T_INT32, "operation", nop,
696 BUNYAN_T_INT32, "response id",
700 BUNYAN_T_UINT32, "calc_crc",
701 ntohl(scp->sc_input.sci_req.svp_crc32),
702 BUNYAN_T_END);
703 return (SVP_RA_ERROR);
704 }
    /* Clear the in-progress input state now that this response is complete. */
705 scp->sc_input.sci_query = NULL;
706 scp->sc_input.sci_offset = 0;
707
    /*
     * Extract the status from the ack structure matching the received op,
     * converting it from network byte order. PONG is passed to its own
     * handler instead, and an unrecognized op panics.
     */
708 if (nop == SVP_R_VL2_ACK) {
709 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
710 sqp->sq_status = ntohl(sl2a->sl2a_status);
711 } else if (nop == SVP_R_VL3_ACK) {
712 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
713 sqp->sq_status = ntohl(sl3a->sl3a_status);
714 } else if (nop == SVP_R_LOG_ACK) {
715 svp_log_ack_t *svla = sqp->sq_wdata;
716 sqp->sq_status = ntohl(svla->svla_status);
717 } else if (nop == SVP_R_LOG_RM_ACK) {
718 svp_lrm_ack_t *svra = sqp->sq_wdata;
719 sqp->sq_status = ntohl(svra->svra_status);
720 } else if (nop == SVP_R_ROUTE_ACK) {
721 svp_route_ack_t *sra = sqp->sq_wdata;
722 sqp->sq_status = ntohl(sra->sra_status);
723 } else if (nop == SVP_R_PONG) {
724 /*
725 * Handle the PONG versioning-capture here, as we need
726 * the version number, the scp_lock held, and the ability
727 * to error out.
728 */
729 svp_conn_act_t cbret;
730
731 cbret = svp_conn_pong_handler(scp, sqp);
732 if (cbret != SVP_RA_NONE)
733 return (cbret);
734 } else {
735 libvarpd_panic("unhandled nop: %d", nop);
736 }
737
    /* Unlink the query, then drop sc_lock so the callback runs without it. */
738 list_remove(&scp->sc_queries, sqp);
739 mutex_exit(&scp->sc_lock);
740
741 /*
742 * We have to release all of our resources associated with this entry
743 * before we call the callback. After we call it, the memory will be
744 * lost to time.
745 */
746 svp_query_release(sqp);
747 sqp->sq_func(sqp, sqp->sq_arg);
    /* Retake sc_lock and request input events again for the next response. */
748 mutex_enter(&scp->sc_lock);
749 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
750
751 return (SVP_RA_NONE);
752 }
753
825 return;
826 }
827
828 /* Check if this needs to be reset */
829 if (scp->sc_flags & SVP_CF_TEARDOWN) {
830 /* Make sure any other users of this are disassociated */
831 ret = SVP_RA_ERROR;
832 goto out;
833 }
834
835 switch (scp->sc_cstate) {
836 case SVP_CS_INITIAL:
837 case SVP_CS_BACKOFF:
838 assert(pe == NULL);
839 ret = svp_conn_connect(scp);
840 break;
841 case SVP_CS_CONNECTING:
842 assert(pe != NULL);
843 ret = svp_conn_poll_connect(pe, scp);
844 break;
845 case SVP_CS_VERSIONING:
846 case SVP_CS_ACTIVE:
847 case SVP_CS_WINDDOWN:
848 assert(pe != NULL);
849 oldstate = scp->sc_cstate;
850 if (pe->portev_events & POLLOUT)
851 ret = svp_conn_pollout(scp);
852 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
853 ret = svp_conn_pollin(scp);
854
855 if (oldstate == SVP_CS_WINDDOWN &&
856 (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
857 ret = SVP_RA_CLEANUP;
858 }
859
860 if (ret == SVP_RA_NONE) {
861 int err;
862 if ((err = svp_event_associate(&scp->sc_event,
863 scp->sc_socket)) != 0) {
864 scp->sc_error = SVP_CE_ASSOCIATE;
865 scp->sc_errno = err;
866 scp->sc_cstate = SVP_CS_ERROR;
867 ret = SVP_RA_DEGRADE;
868 }
869 }
870 break;
871 default:
872 libvarpd_panic("svp_conn_handler encountered unexpected "
873 "state: %d", scp->sc_cstate);
874 }
875 out:
876 mutex_exit(&scp->sc_lock);
877
878 if (ret == SVP_RA_NONE)
879 return;
880
881 mutex_enter(&srp->sr_lock);
882 mutex_enter(&scp->sc_lock);
883 if (ret == SVP_RA_FIND_VERSION)
884 ret = svp_conn_ping_version(scp);
885
886 if (ret == SVP_RA_ERROR)
887 ret = svp_conn_reset(scp);
888
889 if (ret == SVP_RA_DEGRADE)
890 svp_conn_degrade(scp);
891 if (ret == SVP_RA_RESTORE)
892 svp_conn_restore(scp);
893
894 if (ret == SVP_RA_CLEANUP) {
895 svp_conn_remove(scp);
896 scp->sc_flags |= SVP_CF_REAP;
897 svp_conn_inject(scp);
898 }
899 mutex_exit(&scp->sc_lock);
900 mutex_exit(&srp->sr_lock);
901 }
902
903 static void
904 svp_conn_backtimer(void *arg)
905 {
1107 "associated: %d", ret);
1108 }
1109 mutex_exit(&scp->sc_lock);
1110
1111 /* Verify our timers are killed */
1112 svp_timer_remove(&scp->sc_btimer);
1113 svp_timer_remove(&scp->sc_qtimer);
1114
1115 if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
1116 libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
1117 "%d: %d", scp->sc_socket, errno);
1118
1119 list_destroy(&scp->sc_queries);
1120 umem_free(scp, sizeof (svp_conn_t));
1121 }
1122
/*
 * svp_conn_queue: append a query to the tail of this connection's query
 * list and make sure the connection's event requests POLLOUT.
 *
 * scp - connection to queue on. Asserted to have sc_lock held and to be in
 *       the SVP_CS_ACTIVE or SVP_CS_VERSIONING state.
 * sqp - query to append to scp->sc_queries.
 *
 * If POLLOUT was not already requested, it is added to the event mask and
 * the event is re-associated with the socket. A failure to re-associate
 * panics.
 */
1123 void
1124 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1125 {
1126 assert(MUTEX_HELD(&scp->sc_lock));
1127 assert(scp->sc_cstate == SVP_CS_ACTIVE ||
1128 scp->sc_cstate == SVP_CS_VERSIONING);
1129
    /*
     * NOTE(review): -1 presumably marks a query that has seen no activity
     * yet; confirm against the code that reads sq_acttime.
     */
1130 sqp->sq_acttime = -1;
1131 list_insert_tail(&scp->sc_queries, sqp);
    /* Re-associate only when POLLOUT is not already in the event mask. */
1132 if (!(scp->sc_event.se_events & POLLOUT)) {
1133 scp->sc_event.se_events |= POLLOUT;
1134 /*
1135 * If this becomes frequent, we should give up on this
1136 * set of connections instead of aborting.
1137 */
1138 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1139 libvarpd_panic("svp_event_associate failed somehow");
1140 }
1141 }
|