/*
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source. A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 */

/*
 * Copyright 2015 Joyent, Inc.
 */

/*
 * Logic to manage an individual connection to a remote host.
 *
 * For more information, see the big theory statement in
 * lib/varpd/svp/common/libvarpd_svp.c.
 */

#include <assert.h>
#include <umem.h>
#include <errno.h>
#include <strings.h>
#include <unistd.h>
#include <stddef.h>
#include <sys/uio.h>
#include <sys/debug.h>

#include <libvarpd_svp.h>

int svp_conn_query_timeout = 30;
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);

typedef enum svp_conn_act {
	SVP_RA_NONE	= 0x00,
	SVP_RA_DEGRADE	= 0x01,
	SVP_RA_RESTORE	= 0x02,
	SVP_RA_ERROR	= 0x03,
	SVP_RA_CLEANUP	= 0x04
} svp_conn_act_t;

static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);

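/*
 * Post a user event for this connection so that svp_conn_handler() runs with
 * a PORT_SOURCE_USER event. SVP_CF_USER tracks that an injection is already
 * pending, so we only ever have one outstanding at a time.
 */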
static void
svp_conn_inject(svp_conn_t *scp)
{
	int ret;

	assert(MUTEX_HELD(&scp->sc_lock));

	if (scp->sc_flags & SVP_CF_USER)
		return;
	scp->sc_flags |= SVP_CF_USER;
	if ((ret = svp_event_inject(&scp->sc_event)) != 0)
		libvarpd_panic("failed to inject event: %d\n", ret);
}

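/*
 * Mark this connection as degraded. If every connection to this remote is
 * now degraded, the remote itself is marked as degraded.
 */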
static void
svp_conn_degrade(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (scp->sc_flags & SVP_CF_DEGRADED)
		return;

	scp->sc_flags |= SVP_CF_DEGRADED;
	srp->sr_ndconns++;
	if (srp->sr_ndconns == srp->sr_tconns)
		svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
}

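/*
 * Clear this connection's degraded flag. If every connection had been
 * degraded, then the remote can be restored as well.
 */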
static void
svp_conn_restore(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (!(scp->sc_flags & SVP_CF_DEGRADED))
		return;

	scp->sc_flags &= ~SVP_CF_DEGRADED;
	if (srp->sr_ndconns == srp->sr_tconns)
		svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
	srp->sr_ndconns--;
}

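/*
 * Add this connection to its remote's connection list and bump the total
 * connection count.
 */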
static void
svp_conn_add(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (scp->sc_flags & SVP_CF_ADDED)
		return;

	list_insert_tail(&srp->sr_conns, scp);
	scp->sc_flags |= SVP_CF_ADDED;
	srp->sr_tconns++;
}

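/*
 * Remove this connection from its remote's connection list and fix up the
 * degraded and total connection counts. If only degraded connections remain,
 * the remote is degraded.
 */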
static void
svp_conn_remove(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (!(scp->sc_flags & SVP_CF_ADDED))
		return;

	scp->sc_flags &= ~SVP_CF_ADDED;
	if (scp->sc_flags & SVP_CF_DEGRADED)
		srp->sr_ndconns--;
	srp->sr_tconns--;
	if (srp->sr_tconns == srp->sr_ndconns)
		svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
}

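/*
 * Find the outstanding query on this connection whose header carries the
 * given SVP id; returns NULL if there is none.
 */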
static svp_query_t *
svp_conn_query_find(svp_conn_t *scp, uint32_t id)
{
	svp_query_t *sqp;

	assert(MUTEX_HELD(&scp->sc_lock));

	for (sqp = list_head(&scp->sc_queries); sqp != NULL;
	    sqp = list_next(&scp->sc_queries, sqp)) {
		if (sqp->sq_header.svp_id == id)
			break;
	}

	return (sqp);
}

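/*
 * Close the socket and schedule a reconnect attempt through the backoff
 * timer. The delay doubles with each consecutive failure (1, 2, 4, ...
 * seconds) and is capped at the last entry of svp_conn_backoff_tbl; once
 * we've run past the end of the table, we also ask the caller to degrade
 * this connection.
 */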
static svp_conn_act_t
svp_conn_backoff(svp_conn_t *scp)
{
	assert(MUTEX_HELD(&scp->sc_lock));

	if (close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close socket %d: %d\n",
		    scp->sc_socket, errno);
	scp->sc_socket = -1;

	scp->sc_cstate = SVP_CS_BACKOFF;
	scp->sc_nbackoff++;
	if (scp->sc_nbackoff >= svp_conn_nbackoff) {
		scp->sc_btimer.st_value =
		    svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
	} else {
		scp->sc_btimer.st_value =
		    svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
	}
	svp_timer_add(&scp->sc_btimer);

	if (scp->sc_nbackoff > svp_conn_nbackoff)
		return (SVP_RA_DEGRADE);
	return (SVP_RA_NONE);
}

/*
 * Think of this as an extension to the connect() call in svp_conn_connect().
 * Send a message, receive it, and set the version here. If the response is
 * too slow or the socket throws an error, indicate a socket error, which
 * will cause the caller to back off (i.e. close the socket and try again).
 *
 * A version mismatch (a corrupt SVP server or a too-advanced SVP server) is
 * its own error type.
 */
static svp_conn_error_t
svp_conn_version_set(svp_conn_t *scp)
{
	svp_req_t ping;
	ssize_t ret;
	uint32_t save_crc;
	uint16_t peer_version;
	int ntries = 3;	/* One second between tries; three should suffice. */

	ping.svp_ver = htons(SVP_CURRENT_VERSION);
	ping.svp_op = htons(SVP_R_PING);
	ping.svp_size = 0;	/* Header-only... */
	ping.svp_id = 0;
	/* 0-length data... just use the req buffer for the pointer. */
	svp_query_crc32(&ping, &ping, 0);

	ret = write(scp->sc_socket, &ping, sizeof (ping));
	if (ret == -1) {
		/*
		 * A failed write() call right after connect probably
		 * indicates a larger connection failure. Restart the
		 * connection from scratch.
		 */
		return (SVP_CE_SOCKET);
	}
	assert(ret == sizeof (ping));
	do {
		/*
		 * Asynchronous read; we may loop here once or twice.
		 * Wait a bit, but don't loop too many times...
		 */
		(void) sleep(1);
		ret = read(scp->sc_socket, &ping, sizeof (ping));
	} while (--ntries > 0 &&
	    ret == -1 && (errno == EINTR || errno == EAGAIN));
	if (ret == -1) {
		/*
		 * This is an actual failed read() call. Restart the
		 * connection from scratch.
		 */
		return (SVP_CE_SOCKET);
	}

	save_crc = ping.svp_crc32;
	svp_query_crc32(&ping, &ping, 0);
	peer_version = ntohs(ping.svp_ver);
	if (ping.svp_op != htons(SVP_R_PONG) ||
	    ping.svp_size != 0 || ping.svp_id != 0 ||
	    ping.svp_crc32 != save_crc ||
	    peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
		return (SVP_CE_VERSION_PONG);
	}

	/* This connection now has a version! */
	scp->sc_version = peer_version;
	return (SVP_CE_NONE);
}

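/*
 * Begin a non-blocking connect() to the remote. A synchronous success falls
 * through to svp_conn_poll_connect() directly; an asynchronous connect
 * associates the socket with the event port and waits for POLLOUT.
 */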
static svp_conn_act_t
svp_conn_connect(svp_conn_t *scp)
{
	int ret;
	struct sockaddr_in6 in6;

	assert(MUTEX_HELD(&scp->sc_lock));
	assert(scp->sc_cstate == SVP_CS_BACKOFF ||
	    scp->sc_cstate == SVP_CS_INITIAL);
	assert(scp->sc_socket == -1);
	if (scp->sc_cstate == SVP_CS_INITIAL)
		scp->sc_nbackoff = 0;

	/* New connect means we need to know the version. */
	scp->sc_version = 0;

	scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
	if (scp->sc_socket == -1) {
		scp->sc_error = SVP_CE_SOCKET;
		scp->sc_errno = errno;
		scp->sc_cstate = SVP_CS_ERROR;
		return (SVP_RA_DEGRADE);
	}

	bzero(&in6, sizeof (struct sockaddr_in6));
	in6.sin6_family = AF_INET6;
	in6.sin6_port = htons(scp->sc_remote->sr_rport);
	bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
	ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
	    sizeof (struct sockaddr_in6));
	if (ret != 0) {
		boolean_t async = B_FALSE;

		switch (errno) {
		case EACCES:
		case EADDRINUSE:
		case EAFNOSUPPORT:
		case EALREADY:
		case EBADF:
		case EISCONN:
		case ELOOP:
		case ENOENT:
		case ENOSR:
		case EWOULDBLOCK:
			libvarpd_panic("unanticipated connect errno %d", errno);
			break;
		case EINPROGRESS:
		case EINTR:
			async = B_TRUE;
		default:
			break;
		}

		/*
		 * The connect is completing asynchronously; advance our state
		 * and make sure that we poll for the next round.
		 */
		if (async == B_TRUE) {
			scp->sc_cstate = SVP_CS_CONNECTING;
			scp->sc_event.se_events = POLLOUT | POLLHUP;
			ret = svp_event_associate(&scp->sc_event,
			    scp->sc_socket);
			if (ret == 0)
				return (SVP_RA_NONE);
			scp->sc_error = SVP_CE_ASSOCIATE;
			scp->sc_errno = ret;
			scp->sc_cstate = SVP_CS_ERROR;
			return (SVP_RA_DEGRADE);
		} else {
			/*
			 * This call failed, which means that we obtained one
			 * of the following:
			 *
			 *	EADDRNOTAVAIL
			 *	ECONNREFUSED
			 *	EIO
			 *	ENETUNREACH
			 *	EHOSTUNREACH
			 *	ENXIO
			 *	ETIMEDOUT
			 *
			 * Therefore we need to set ourselves into backoff and
			 * wait for that to clear up.
			 */
			return (svp_conn_backoff(scp));
		}
	}

	return (svp_conn_poll_connect(NULL, scp));
}

/*
 * This should be the first call we get after a successful synchronous
 * connect, or a completed (failed or successful) asynchronous connect. A
 * non-NULL port event indicates asynchronous completion; a NULL port event
 * indicates a successful synchronous connect.
 *
 * If we have successfully connected, we should see a writable event. In the
 * asynchronous case, we may also see an error or a hang up. For either hang
 * up or error, we transition to error mode. If there is also a readable event
 * (i.e. incoming data), we ignore it at the moment and just let a
 * reassociation pick it up so we can simplify the set of state transitions
 * that we have.
 */
static svp_conn_act_t
svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
{
	int ret;
	svp_conn_error_t version_error;

	if (pe != NULL) {
		int err;
		socklen_t sl = sizeof (err);

		/*
		 * These bits only matter if we're notified of an
		 * asynchronous connection completion.
		 */
		if (!(pe->portev_events & POLLOUT)) {
			scp->sc_errno = 0;
			scp->sc_error = SVP_CE_NOPOLLOUT;
			scp->sc_cstate = SVP_CS_ERROR;
			return (SVP_RA_DEGRADE);
		}

		ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
		    &sl);
		if (ret != 0)
			libvarpd_panic("unanticipated getsockopt error");
		if (err != 0) {
			return (svp_conn_backoff(scp));
		}
	}

	/* Use a single SVP_R_PING to determine the version. */
	version_error = svp_conn_version_set(scp);
	switch (version_error) {
	case SVP_CE_SOCKET:
		/* Use this to signal read/write errors... */
		return (svp_conn_backoff(scp));
	case SVP_CE_NONE:
		assert(scp->sc_version > 0 &&
		    scp->sc_version <= SVP_CURRENT_VERSION);
		break;
	default:
		scp->sc_error = version_error;
		scp->sc_cstate = SVP_CS_ERROR;
		scp->sc_errno = EPROTONOSUPPORT;	/* Protocol error... */
		return (SVP_RA_DEGRADE);
	}

	scp->sc_cstate = SVP_CS_ACTIVE;
	scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
	ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
	if (ret == 0)
		return (SVP_RA_RESTORE);
	scp->sc_error = SVP_CE_ASSOCIATE;
	scp->sc_errno = ret;
	scp->sc_cstate = SVP_CS_ERROR;
	return (SVP_RA_DEGRADE);
}

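/*
 * Write out the query that's currently in progress, or pick the next query
 * in the SVP_QUERY_INIT state and start writing it. The header and request
 * data go out through a single writev(); partial writes are resumed later
 * using the offset we track in sc_output.
 */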
static svp_conn_act_t
svp_conn_pollout(svp_conn_t *scp)
{
	svp_query_t *sqp;
	svp_req_t *req;
	size_t off;
	struct iovec iov[2];
	int nvecs = 0;
	ssize_t ret;

	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * We need to find a query and start writing it out.
	 */
	if (scp->sc_output.sco_query == NULL) {
		for (sqp = list_head(&scp->sc_queries); sqp != NULL;
		    sqp = list_next(&scp->sc_queries, sqp)) {
			if (sqp->sq_state != SVP_QUERY_INIT)
				continue;
			break;
		}

		if (sqp == NULL) {
			scp->sc_event.se_events &= ~POLLOUT;
			return (SVP_RA_NONE);
		}

		scp->sc_output.sco_query = sqp;
		scp->sc_output.sco_offset = 0;
		sqp->sq_state = SVP_QUERY_WRITING;
		svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
	}

	sqp = scp->sc_output.sco_query;
	req = &sqp->sq_header;
	off = scp->sc_output.sco_offset;
	if (off < sizeof (svp_req_t)) {
		iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
		iov[nvecs].iov_len = sizeof (svp_req_t) - off;
		nvecs++;
		off = 0;
	} else {
		off -= sizeof (svp_req_t);
	}

	iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
	iov[nvecs].iov_len = sqp->sq_rsize - off;
	nvecs++;

	do {
		ret = writev(scp->sc_socket, iov, nvecs);
	} while (ret == -1 && errno == EINTR);
	if (ret == -1) {
		switch (errno) {
		case EAGAIN:
			scp->sc_event.se_events |= POLLOUT;
			return (SVP_RA_NONE);
		case EIO:
		case ENXIO:
		case ECONNRESET:
			return (SVP_RA_ERROR);
		default:
			libvarpd_panic("unexpected errno: %d", errno);
		}
	}

	sqp->sq_acttime = gethrtime();
	scp->sc_output.sco_offset += ret;
	/*
	 * The query is fully written only once our running offset covers the
	 * header and all of the request data; comparing against this write's
	 * return value alone would never complete a resumed partial write.
	 */
	if (scp->sc_output.sco_offset == sizeof (svp_req_t) + sqp->sq_rsize) {
		sqp->sq_state = SVP_QUERY_READING;
		scp->sc_output.sco_query = NULL;
		scp->sc_output.sco_offset = 0;
		scp->sc_event.se_events |= POLLOUT;
	}
	return (SVP_RA_NONE);
}

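/*
 * We've read a complete response header; sanity check it. The version must
 * match the one we negotiated, the operation must be one we know how to
 * handle, the id must belong to an outstanding query in the
 * SVP_QUERY_READING state, and the size must make sense for the operation.
 */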
static boolean_t
svp_conn_pollin_validate(svp_conn_t *scp)
{
	svp_query_t *sqp;
	uint32_t nsize;
	uint16_t nvers, nop;
	svp_req_t *resp = &scp->sc_input.sci_req;

	assert(MUTEX_HELD(&scp->sc_lock));

	nvers = ntohs(resp->svp_ver);
	nop = ntohs(resp->svp_op);
	nsize = ntohl(resp->svp_size);

	/*
	 * A peer that's messing with post-connection version changes is
	 * likely a broken peer.
	 */
	if (nvers != scp->sc_version) {
		(void) bunyan_warn(svp_bunyan, "version mismatch",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "peer_version", nvers,
		    BUNYAN_T_INT32, "our_version", scp->sc_version,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
	    nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
		(void) bunyan_warn(svp_bunyan, "unsupported operation",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	sqp = svp_conn_query_find(scp, resp->svp_id);
	if (sqp == NULL) {
		(void) bunyan_warn(svp_bunyan, "unknown response id",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	if (sqp->sq_state != SVP_QUERY_READING) {
		(void) bunyan_warn(svp_bunyan,
		    "got response for query in unexpected state",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_INT32, "query_state", sqp->sq_state,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
	    (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
	    (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
		(void) bunyan_warn(svp_bunyan, "unexpected response size",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_INT32, "response_size", nsize,
		    BUNYAN_T_INT32, "expected_size",
		    nop == SVP_R_VL2_ACK ? sizeof (svp_vl2_ack_t) :
		    (nop == SVP_R_VL3_ACK ? sizeof (svp_vl3_ack_t) :
		    sizeof (svp_lrm_ack_t)),
		    BUNYAN_T_INT32, "query_state", sqp->sq_state,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	/*
	 * A valid size is anything less than or equal to what the user
	 * requested, but at least sizeof (svp_log_ack_t) bytes large.
	 */
	if (nop == SVP_R_LOG_ACK) {
		const char *msg = NULL;

		if (nsize < sizeof (svp_log_ack_t))
			msg = "response size too small";
		else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
			msg = "response size too large";
		if (msg != NULL) {
			(void) bunyan_warn(svp_bunyan, msg,
			    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
			    BUNYAN_T_INT32, "remote_port",
			    scp->sc_remote->sr_rport,
			    BUNYAN_T_INT32, "version", nvers,
			    BUNYAN_T_INT32, "operation", nop,
			    BUNYAN_T_INT32, "response_id", resp->svp_id,
			    BUNYAN_T_INT32, "response_size", nsize,
			    BUNYAN_T_INT32, "expected_size",
			    ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
			    BUNYAN_T_INT32, "query_state", sqp->sq_state,
			    BUNYAN_T_END);
			return (B_FALSE);
		}
	}

	sqp->sq_size = nsize;
	scp->sc_input.sci_query = sqp;
	if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
	    nop == SVP_R_LOG_RM_ACK) {
		sqp->sq_wdata = &sqp->sq_wdun;
		sqp->sq_wsize = sizeof (svp_query_data_t);
	} else {
		VERIFY(nop == SVP_R_LOG_ACK);
		assert(sqp->sq_wdata != NULL);
		assert(sqp->sq_wsize != 0);
	}

	return (B_TRUE);
}

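/*
 * Handle inbound data: read the response header, validate it, and then read
 * the response payload. Any of these reads may be partial, in which case we
 * record our progress and wait for more data. Once a complete response has
 * arrived and its CRC checks out, we complete the query and fire its
 * callback.
 */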
static svp_conn_act_t
svp_conn_pollin(svp_conn_t *scp)
{
	size_t off, total;
	ssize_t ret;
	svp_query_t *sqp;
	uint32_t crc;
	uint16_t nop;

	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * No query implies that we're reading in the header and that the
	 * offset is associated with it.
	 */
	off = scp->sc_input.sci_offset;
	sqp = scp->sc_input.sci_query;
	if (sqp == NULL) {
		svp_req_t *resp = &scp->sc_input.sci_req;

		assert(off < sizeof (svp_req_t));

		do {
			ret = read(scp->sc_socket,
			    (void *)((uintptr_t)resp + off),
			    sizeof (svp_req_t) - off);
		} while (ret == -1 && errno == EINTR);
		if (ret == -1) {
			switch (errno) {
			case EAGAIN:
				scp->sc_event.se_events |= POLLIN | POLLRDNORM;
				return (SVP_RA_NONE);
			case EIO:
			case ECONNRESET:
				return (SVP_RA_ERROR);
			default:
				libvarpd_panic("unexpected read errno: %d",
				    errno);
			}
		} else if (ret == 0) {
			/* Try to reconnect to the remote host */
			return (SVP_RA_ERROR);
		}

		/* Didn't get all the data we need */
		if (off + ret < sizeof (svp_req_t)) {
			scp->sc_input.sci_offset += ret;
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		}

		if (svp_conn_pollin_validate(scp) != B_TRUE)
			return (SVP_RA_ERROR);

		/*
		 * The header is complete; from here on the offset tracks how
		 * much of the payload we have read.
		 */
		off = scp->sc_input.sci_offset = 0;
	}

	sqp = scp->sc_input.sci_query;
	assert(sqp != NULL);
	sqp->sq_acttime = gethrtime();
	total = ntohl(scp->sc_input.sci_req.svp_size);
	do {
		ret = read(scp->sc_socket,
		    (void *)((uintptr_t)sqp->sq_wdata + off),
		    total - off);
	} while (ret == -1 && errno == EINTR);

	if (ret == -1) {
		switch (errno) {
		case EAGAIN:
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		case EIO:
		case ECONNRESET:
			return (SVP_RA_ERROR);
		default:
			libvarpd_panic("unexpected read errno: %d", errno);
		}
	} else if (ret == 0) {
		/* Try to reconnect to the remote host */
		return (SVP_RA_ERROR);
	}

	if (ret + off < total) {
		scp->sc_input.sci_offset += ret;
		return (SVP_RA_NONE);
	}

	nop = ntohs(scp->sc_input.sci_req.svp_op);
	crc = scp->sc_input.sci_req.svp_crc32;
	svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
	if (crc != scp->sc_input.sci_req.svp_crc32) {
		(void) bunyan_info(svp_bunyan, "crc32 mismatch",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version",
		    ntohs(scp->sc_input.sci_req.svp_ver),
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id",
		    ntohl(scp->sc_input.sci_req.svp_id),
		    BUNYAN_T_INT32, "query_state", sqp->sq_state,
		    BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
		    BUNYAN_T_UINT32, "calc_crc",
		    ntohl(scp->sc_input.sci_req.svp_crc32),
		    BUNYAN_T_END);
		return (SVP_RA_ERROR);
	}
	scp->sc_input.sci_query = NULL;
	scp->sc_input.sci_offset = 0;

	if (nop == SVP_R_VL2_ACK) {
		svp_vl2_ack_t *sl2a = sqp->sq_wdata;

		sqp->sq_status = ntohl(sl2a->sl2a_status);
	} else if (nop == SVP_R_VL3_ACK) {
		svp_vl3_ack_t *sl3a = sqp->sq_wdata;

		sqp->sq_status = ntohl(sl3a->sl3a_status);
	} else if (nop == SVP_R_LOG_ACK) {
		svp_log_ack_t *svla = sqp->sq_wdata;

		sqp->sq_status = ntohl(svla->svla_status);
	} else if (nop == SVP_R_LOG_RM_ACK) {
		svp_lrm_ack_t *svra = sqp->sq_wdata;

		sqp->sq_status = ntohl(svra->svra_status);
	} else {
		libvarpd_panic("unhandled nop: %d", nop);
	}

	list_remove(&scp->sc_queries, sqp);
	mutex_exit(&scp->sc_lock);

	/*
	 * We have to release all of our resources associated with this entry
	 * before we call the callback. After we call it, the memory will be
	 * lost to time.
	 */
	svp_query_release(sqp);
	sqp->sq_func(sqp, sqp->sq_arg);
	mutex_enter(&scp->sc_lock);
	scp->sc_event.se_events |= POLLIN | POLLRDNORM;

	return (SVP_RA_NONE);
}

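/*
 * Tear this connection back down to its initial state: dissociate and close
 * the socket, hand outstanding queries back to the remote for reassignment,
 * and kick off a fresh connect.
 */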
static svp_conn_act_t
svp_conn_reset(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * Use VERIFY rather than assert: the dissociate has a side effect
	 * that must happen even in non-debug builds, and it should find
	 * nothing still associated.
	 */
	VERIFY(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
	    ENOENT);
	if (close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
		    errno);
	scp->sc_flags &= ~SVP_CF_TEARDOWN;
	scp->sc_socket = -1;
	scp->sc_cstate = SVP_CS_INITIAL;
	scp->sc_input.sci_query = NULL;
	scp->sc_output.sco_query = NULL;

	svp_remote_reassign(srp, scp);

	return (svp_conn_connect(scp));
}

/*
 * This is our general state transition function. We're called here when we
 * want to advance part of our state machine as well as to re-arm ourselves.
 * We can also end up here from the standard event loop as a result of having
 * a user event posted.
 */
static void
svp_conn_handler(port_event_t *pe, void *arg)
{
	svp_conn_t *scp = arg;
	svp_remote_t *srp = scp->sc_remote;
	svp_conn_act_t ret = SVP_RA_NONE;
	svp_conn_state_t oldstate;

	mutex_enter(&scp->sc_lock);

	/*
	 * Check if one of our event interrupts is set. An event interrupt,
	 * such as a request to be reaped or torn down, is signalled by a
	 * PORT_SOURCE_USER event. However, because the event loop can still
	 * be running, we may get here before that PORT_SOURCE_USER event has
	 * been delivered. In such a case, if a PORT_SOURCE_USER event is
	 * still pending, we opt to do nothing here and wait for it to come
	 * and tear us down. That also means we have nothing to worry about
	 * as far as general timing and the like goes.
	 */
	if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
	    (scp->sc_flags & SVP_CF_USER) != 0 &&
	    pe != NULL &&
	    pe->portev_source != PORT_SOURCE_USER) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
		scp->sc_flags &= ~SVP_CF_USER;
		if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
			mutex_exit(&scp->sc_lock);
			return;
		}
	}

	/* Check if this needs to be freed */
	if (scp->sc_flags & SVP_CF_REAP) {
		mutex_exit(&scp->sc_lock);
		svp_conn_destroy(scp);
		return;
	}

	/* Check if this needs to be reset */
	if (scp->sc_flags & SVP_CF_TEARDOWN) {
		/* Make sure any other users of this are disassociated */
		ret = SVP_RA_ERROR;
		goto out;
	}

	switch (scp->sc_cstate) {
	case SVP_CS_INITIAL:
	case SVP_CS_BACKOFF:
		assert(pe == NULL);
		ret = svp_conn_connect(scp);
		break;
	case SVP_CS_CONNECTING:
		assert(pe != NULL);
		ret = svp_conn_poll_connect(pe, scp);
		break;
	case SVP_CS_ACTIVE:
	case SVP_CS_WINDDOWN:
		assert(pe != NULL);
		oldstate = scp->sc_cstate;
		if (pe->portev_events & POLLOUT)
			ret = svp_conn_pollout(scp);
		if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
			ret = svp_conn_pollin(scp);

		if (oldstate == SVP_CS_WINDDOWN &&
		    (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
			ret = SVP_RA_CLEANUP;
		}

		if (ret == SVP_RA_NONE) {
			int err;

			if ((err = svp_event_associate(&scp->sc_event,
			    scp->sc_socket)) != 0) {
				scp->sc_error = SVP_CE_ASSOCIATE;
				scp->sc_errno = err;
				scp->sc_cstate = SVP_CS_ERROR;
				ret = SVP_RA_DEGRADE;
			}
		}
		break;
	default:
		libvarpd_panic("svp_conn_handler encountered unexpected "
		    "state: %d", scp->sc_cstate);
	}
out:
	mutex_exit(&scp->sc_lock);

	if (ret == SVP_RA_NONE)
		return;

	mutex_enter(&srp->sr_lock);
	mutex_enter(&scp->sc_lock);
	if (ret == SVP_RA_ERROR)
		ret = svp_conn_reset(scp);

	if (ret == SVP_RA_DEGRADE)
		svp_conn_degrade(scp);
	if (ret == SVP_RA_RESTORE)
		svp_conn_restore(scp);

	if (ret == SVP_RA_CLEANUP) {
		svp_conn_remove(scp);
		scp->sc_flags |= SVP_CF_REAP;
		svp_conn_inject(scp);
	}
	mutex_exit(&scp->sc_lock);
	mutex_exit(&srp->sr_lock);
}

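/*
 * Backoff timer callback: drive the state machine with no port event. This
 * is what performs both the initial connect and each backoff retry.
 */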
static void
svp_conn_backtimer(void *arg)
{
	svp_conn_t *scp = arg;

	svp_conn_handler(NULL, scp);
}

/*
 * This fires every svp_conn_query_timeout seconds. Its purpose is to
 * determine whether we have failed to hear back on a request within
 * svp_conn_query_timeout seconds. If any query that has been started
 * (indicated by svp_query_t`sq_acttime != -1) is more than
 * svp_conn_query_timeout seconds old, we tear this connection down and
 * reassign its outstanding queries.
 */
static void
svp_conn_querytimer(void *arg)
{
	int ret;
	svp_query_t *sqp;
	svp_conn_t *scp = arg;
	hrtime_t now = gethrtime();

	mutex_enter(&scp->sc_lock);

	/*
	 * If we're not in the active state, then we don't care about this, as
	 * we're already either going to die or we have no queries to worry
	 * about.
	 */
	if (scp->sc_cstate != SVP_CS_ACTIVE) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	for (sqp = list_head(&scp->sc_queries); sqp != NULL;
	    sqp = list_next(&scp->sc_queries, sqp)) {
		if (sqp->sq_acttime == -1)
			continue;
		if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
			break;
	}

	/* Nothing timed out, we're good here */
	if (sqp == NULL) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	(void) bunyan_warn(svp_bunyan, "query timed out on connection",
	    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
	    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
	    BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
	    BUNYAN_T_END);

	/*
	 * Begin the teardown process for this connection. If the dissociate
	 * fails (the event loop already has an event for us in flight), then
	 * we don't inject an event. See the big theory statement in
	 * libvarpd_svp.c for more information.
	 */
	scp->sc_flags |= SVP_CF_TEARDOWN;

	ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
	if (ret == 0)
		svp_conn_inject(scp);
	else
		VERIFY(ret == ENOENT);

	mutex_exit(&scp->sc_lock);
}

/*
 * This connection has fallen out of DNS; figure out what we need to do with
 * it.
 */
void
svp_conn_fallout(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));

	mutex_enter(&scp->sc_lock);
	switch (scp->sc_cstate) {
	case SVP_CS_ERROR:
		/*
		 * Connection is already inactive, so it's safe to tear down.
		 * Fire it off through the state machine to tear down via the
		 * backoff timer.
		 */
		svp_conn_remove(scp);
		scp->sc_flags |= SVP_CF_REAP;
		svp_conn_inject(scp);
		break;
	case SVP_CS_INITIAL:
	case SVP_CS_BACKOFF:
	case SVP_CS_CONNECTING:
		/*
		 * Here, we have something actively going on, so we'll let it
		 * be cleaned up the next time we hit the event loop by the
		 * event loop itself. As it has no outstanding queries, there
		 * isn't much to really do, though we'll take this chance to
		 * go ahead and remove it from the remote.
		 */
		svp_conn_remove(scp);
		scp->sc_flags |= SVP_CF_REAP;
		svp_conn_inject(scp);
		break;
	case SVP_CS_ACTIVE:
	case SVP_CS_WINDDOWN:
		/*
		 * If there are no outstanding queries, then we should simply
		 * clean this up now, the same way we would with the others.
		 * Otherwise, as we know the event loop is ongoing, we'll make
		 * sure that these entries get cleaned up once they're done.
		 */
		scp->sc_cstate = SVP_CS_WINDDOWN;
		if (list_is_empty(&scp->sc_queries)) {
			svp_conn_remove(scp);
			scp->sc_flags |= SVP_CF_REAP;
			svp_conn_inject(scp);
		}
		break;
	default:
		libvarpd_panic("svp_conn_fallout encountered "
		    "unknown state");
	}
	mutex_exit(&scp->sc_lock);
	mutex_exit(&srp->sr_lock);
}

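/*
 * Allocate and initialize a connection to the given address, hook it up to
 * the remote, and arm the backoff and query timers; the one-second one-shot
 * backoff timer is what starts the initial connect.
 */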
int
svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
{
	int ret;
	svp_conn_t *scp;

	assert(MUTEX_HELD(&srp->sr_lock));
	scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
	if (scp == NULL)
		return (ENOMEM);

	if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
	    NULL)) != 0) {
		umem_free(scp, sizeof (svp_conn_t));
		return (ret);
	}

	scp->sc_remote = srp;
	scp->sc_event.se_func = svp_conn_handler;
	scp->sc_event.se_arg = scp;
	scp->sc_btimer.st_func = svp_conn_backtimer;
	scp->sc_btimer.st_arg = scp;
	scp->sc_btimer.st_oneshot = B_TRUE;
	scp->sc_btimer.st_value = 1;

	scp->sc_qtimer.st_func = svp_conn_querytimer;
	scp->sc_qtimer.st_arg = scp;
	scp->sc_qtimer.st_oneshot = B_FALSE;
	scp->sc_qtimer.st_value = svp_conn_query_timeout;

	scp->sc_socket = -1;

	list_create(&scp->sc_queries, sizeof (svp_query_t),
	    offsetof(svp_query_t, sq_lnode));
	scp->sc_gen = srp->sr_gen;
	bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
	scp->sc_cstate = SVP_CS_INITIAL;
	mutex_enter(&scp->sc_lock);
	svp_conn_add(scp);
	mutex_exit(&scp->sc_lock);

	/* Now that we're locked and loaded, add our timers */
	svp_timer_add(&scp->sc_qtimer);
	svp_timer_add(&scp->sc_btimer);

	return (0);
}

/*
 * At the time of calling, the entry has been removed from all lists. In
 * addition, the entry's state should be SVP_CS_ERROR; therefore, we know that
 * the fd should not be associated with the event loop. We'll double check
 * that just in case. We should also have already been removed from the
 * remote's list.
 */
void
svp_conn_destroy(svp_conn_t *scp)
{
	int ret;

	mutex_enter(&scp->sc_lock);
	if (scp->sc_cstate != SVP_CS_ERROR)
		libvarpd_panic("asked to tear down an active connection");
	if (scp->sc_flags & SVP_CF_ADDED)
		libvarpd_panic("asked to remove a connection still in "
		    "the remote list\n");
	if (!list_is_empty(&scp->sc_queries))
		libvarpd_panic("asked to remove a connection with non-empty "
		    "query list");

	if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
	    ENOENT) {
		libvarpd_panic("dissociate failed or was actually "
		    "associated: %d", ret);
	}
	mutex_exit(&scp->sc_lock);

	/* Verify our timers are killed */
	svp_timer_remove(&scp->sc_btimer);
	svp_timer_remove(&scp->sc_qtimer);

	if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
		    "%d: %d", scp->sc_socket, errno);

	list_destroy(&scp->sc_queries);
	umem_free(scp, sizeof (svp_conn_t));
}

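/*
 * Queue a query on this connection and, if we weren't already waiting for
 * POLLOUT, re-associate the socket so the query gets written out.
 */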
void
svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
{
	assert(MUTEX_HELD(&scp->sc_lock));
	assert(scp->sc_cstate == SVP_CS_ACTIVE);

	sqp->sq_acttime = -1;
	list_insert_tail(&scp->sc_queries, sqp);
	if (!(scp->sc_event.se_events & POLLOUT)) {
		scp->sc_event.se_events |= POLLOUT;
		/*
		 * If this becomes frequent, we should give up on this set of
		 * connections rather than aborting.
		 */
		if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
			libvarpd_panic("svp_event_associate failed somehow");
	}
}