1 /*
2 * This file and its contents are supplied under the terms of the
3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 * You may only use this file in accordance with the terms of version
5 * 1.0 of the CDDL.
6 *
7 * A full copy of the text of the CDDL should have accompanied this
8 * source. A copy of the CDDL is also available via the Internet at
9 * http://www.illumos.org/license/CDDL.
10 */
11
12 /*
13 * Copyright 2015 Joyent, Inc.
14 */
15
16 /*
17 * Logic to manage an individual connection to a remote host.
18 *
19 * For more information, see the big theory statement in
20 * lib/varpd/svp/common/libvarpd_svp.c.
21 */
22
23 #include <assert.h>
24 #include <umem.h>
25 #include <errno.h>
26 #include <strings.h>
27 #include <unistd.h>
28 #include <stddef.h>
29 #include <sys/uio.h>
30 #include <sys/debug.h>
31
32 #include <libvarpd_svp.h>
33
/* Seconds after which an outstanding (sent) query is considered timed out. */
int svp_conn_query_timeout = 30;
/*
 * Reconnect backoff schedule in seconds; once exhausted, the final entry is
 * reused for every subsequent attempt (see svp_conn_backoff()).
 */
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
37
/*
 * Follow-up action that the state-machine helpers request of
 * svp_conn_handler(), which performs it after dropping and reacquiring the
 * appropriate locks.
 */
typedef enum svp_conn_act {
	SVP_RA_NONE = 0x00,		/* Nothing further to do */
	SVP_RA_DEGRADE = 0x01,		/* Mark this connection degraded */
	SVP_RA_RESTORE = 0x02,		/* Clear this conn's degraded state */
	SVP_RA_ERROR = 0x03,		/* Reset the connection and reconnect */
	SVP_RA_CLEANUP = 0x04,		/* Remove the conn and schedule reap */
	SVP_RA_FIND_VERSION = 0x05	/* Start version negotiation (ping) */
} svp_conn_act_t;
46
47 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
48
49 static void
50 svp_conn_inject(svp_conn_t *scp)
51 {
52 int ret;
53 assert(MUTEX_HELD(&scp->sc_lock));
54
55 if (scp->sc_flags & SVP_CF_USER)
56 return;
57 scp->sc_flags |= SVP_CF_USER;
58 if ((ret = svp_event_inject(&scp->sc_event)) != 0)
59 libvarpd_panic("failed to inject event: %d\n", ret);
60 }
61
62 static void
63 svp_conn_degrade(svp_conn_t *scp)
64 {
65 svp_remote_t *srp = scp->sc_remote;
66
67 assert(MUTEX_HELD(&srp->sr_lock));
68 assert(MUTEX_HELD(&scp->sc_lock));
69
70 if (scp->sc_flags & SVP_CF_DEGRADED)
71 return;
72
73 scp->sc_flags |= SVP_CF_DEGRADED;
74 srp->sr_ndconns++;
75 if (srp->sr_ndconns == srp->sr_tconns)
76 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
77 }
78
/*
 * Clear this connection's degraded status.  Note the ordering below: the
 * sr_ndconns == sr_tconns comparison is made *before* sr_ndconns is
 * decremented, so the remote is restored exactly when all connections had
 * been degraded and this one is the first to recover.
 */
static void
svp_conn_restore(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (!(scp->sc_flags & SVP_CF_DEGRADED))
		return;

	scp->sc_flags &= ~SVP_CF_DEGRADED;
	/* We were fully degraded; the remote can come back now. */
	if (srp->sr_ndconns == srp->sr_tconns)
		svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
	srp->sr_ndconns--;
}
95
96 static svp_conn_act_t
97 svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp)
98 {
99 uint16_t remote_version = ntohs(scp->sc_input.sci_req.svp_ver);
100
101 if (scp->sc_cstate == SVP_CS_VERSIONING) {
102 /* Transition VERSIONING -> ACTIVE. */
103 assert(scp->sc_version == 0);
104 if (remote_version == 0 || remote_version > SVP_CURRENT_VERSION)
105 return (SVP_RA_ERROR);
106 scp->sc_version = remote_version;
107 scp->sc_cstate = SVP_CS_ACTIVE;
108 }
109
110 return (SVP_RA_NONE);
111 }
112
113 static void
114 svp_conn_ping_cb(svp_query_t *sqp, void *arg)
115 {
116 size_t len = (size_t)arg;
117
118 assert(len == sizeof (svp_query_t));
119 umem_free(sqp, len);
120 }
121
122 static svp_conn_act_t
123 svp_conn_ping_version(svp_conn_t *scp)
124 {
125 svp_remote_t *srp = scp->sc_remote;
126 svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT);
127 int ret;
128
129 assert(MUTEX_HELD(&srp->sr_lock));
130 assert(MUTEX_HELD(&scp->sc_lock));
131 assert(scp->sc_cstate == SVP_CS_CONNECTING);
132
133 if (sqp == NULL)
134 return (SVP_RA_ERROR);
135
136 /* Only set things that need to be non-0/non-NULL. */
137 sqp->sq_state = SVP_QUERY_INIT;
138 sqp->sq_func = svp_conn_ping_cb;
139 sqp->sq_arg = (void *)sizeof (svp_query_t);
140 sqp->sq_header.svp_op = htons(SVP_R_PING);
141 sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION);
142 sqp->sq_header.svp_id = svp_id_alloc();
143 if (sqp->sq_header.svp_id == -1) {
144 umem_free(sqp, sizeof (svp_query_t));
145 return (SVP_RA_ERROR);
146 }
147
148 scp->sc_cstate = SVP_CS_VERSIONING;
149 /* Set the event flags now... */
150 scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT;
151 /* ...so I can just queue it up directly... */
152 svp_conn_queue(scp, sqp);
153 /* ... and then associate the event port myself. */
154 ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
155 if (ret == 0)
156 return (SVP_RA_RESTORE);
157 scp->sc_error = SVP_CE_ASSOCIATE;
158 scp->sc_errno = ret;
159 scp->sc_cstate = SVP_CS_ERROR;
160 umem_free(sqp, sizeof (svp_query_t));
161 return (SVP_RA_DEGRADE);
162 }
163
164 static void
165 svp_conn_add(svp_conn_t *scp)
166 {
167 svp_remote_t *srp = scp->sc_remote;
168
169 assert(MUTEX_HELD(&srp->sr_lock));
170 assert(MUTEX_HELD(&scp->sc_lock));
171
172 if (scp->sc_flags & SVP_CF_ADDED)
173 return;
174
175 list_insert_tail(&srp->sr_conns, scp);
176 scp->sc_flags |= SVP_CF_ADDED;
177 srp->sr_tconns++;
178 }
179
/*
 * Drop this connection from the remote's accounting.  If its removal leaves
 * the remote with only degraded connections (including none at all), the
 * remote as a whole is marked failed.
 *
 * NOTE(review): unlike svp_conn_add(), this does not list_remove() the
 * connection from sr_conns -- presumably teardown elsewhere handles the
 * actual unlinking; confirm against the remote-management code.
 */
static void
svp_conn_remove(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (!(scp->sc_flags & SVP_CF_ADDED))
		return;

	scp->sc_flags &= ~SVP_CF_ADDED;
	/* A degraded connection takes its degraded count with it. */
	if (scp->sc_flags & SVP_CF_DEGRADED)
		srp->sr_ndconns--;
	srp->sr_tconns--;
	/* Everything left (possibly nothing) is degraded. */
	if (srp->sr_tconns == srp->sr_ndconns)
		svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
}
198
199 static svp_query_t *
200 svp_conn_query_find(svp_conn_t *scp, uint32_t id)
201 {
202 svp_query_t *sqp;
203
204 assert(MUTEX_HELD(&scp->sc_lock));
205
206 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
207 sqp = list_next(&scp->sc_queries, sqp)) {
208 if (sqp->sq_header.svp_id == id)
209 break;
210 }
211
212 return (sqp);
213 }
214
215 static svp_conn_act_t
216 svp_conn_backoff(svp_conn_t *scp)
217 {
218 assert(MUTEX_HELD(&scp->sc_lock));
219
220 if (close(scp->sc_socket) != 0)
221 libvarpd_panic("failed to close socket %d: %d\n",
222 scp->sc_socket, errno);
223 scp->sc_socket = -1;
224
225 scp->sc_cstate = SVP_CS_BACKOFF;
226 scp->sc_nbackoff++;
227 if (scp->sc_nbackoff >= svp_conn_nbackoff) {
228 scp->sc_btimer.st_value =
229 svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
230 } else {
231 scp->sc_btimer.st_value =
232 svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
233 }
234 svp_timer_add(&scp->sc_btimer);
235
236 if (scp->sc_nbackoff > svp_conn_nbackoff)
237 return (SVP_RA_DEGRADE);
238 return (SVP_RA_NONE);
239 }
240
/*
 * Begin a non-blocking connect to the remote's address.  There are three
 * possible outcomes: a hard failure puts us into backoff (or error state if
 * we can't even create the socket); asynchronous progress (EINPROGRESS or
 * EINTR) arms the event port for POLLOUT and waits; an immediate success
 * proceeds straight to version discovery via svp_conn_poll_connect().
 */
static svp_conn_act_t
svp_conn_connect(svp_conn_t *scp)
{
	int ret;
	struct sockaddr_in6 in6;

	assert(MUTEX_HELD(&scp->sc_lock));
	assert(scp->sc_cstate == SVP_CS_BACKOFF ||
	    scp->sc_cstate == SVP_CS_INITIAL);
	assert(scp->sc_socket == -1);
	/* A brand new connection starts its backoff schedule from scratch. */
	if (scp->sc_cstate == SVP_CS_INITIAL)
		scp->sc_nbackoff = 0;

	/* New connect means we need to know the version. */
	scp->sc_version = 0;

	scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
	if (scp->sc_socket == -1) {
		scp->sc_error = SVP_CE_SOCKET;
		scp->sc_errno = errno;
		scp->sc_cstate = SVP_CS_ERROR;
		return (SVP_RA_DEGRADE);
	}

	bzero(&in6, sizeof (struct sockaddr_in6));
	in6.sin6_family = AF_INET6;
	in6.sin6_port = htons(scp->sc_remote->sr_rport);
	bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
	ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
	    sizeof (struct sockaddr_in6));
	if (ret != 0) {
		boolean_t async = B_FALSE;

		switch (errno) {
		/* These all indicate a programming error, not a bad peer. */
		case EACCES:
		case EADDRINUSE:
		case EAFNOSUPPORT:
		case EALREADY:
		case EBADF:
		case EISCONN:
		case ELOOP:
		case ENOENT:
		case ENOSR:
		case EWOULDBLOCK:
			libvarpd_panic("unanticipated connect errno %d", errno);
			break;
		case EINPROGRESS:
		case EINTR:
			async = B_TRUE;
			/* FALLTHROUGH */
		default:
			break;
		}

		/*
		 * So, we will be connecting to this in the future, advance our
		 * state and make sure that we poll for the next round.
		 */
		if (async == B_TRUE) {
			scp->sc_cstate = SVP_CS_CONNECTING;
			scp->sc_event.se_events = POLLOUT | POLLHUP;
			ret = svp_event_associate(&scp->sc_event,
			    scp->sc_socket);
			if (ret == 0)
				return (SVP_RA_NONE);
			scp->sc_error = SVP_CE_ASSOCIATE;
			scp->sc_errno = ret;
			scp->sc_cstate = SVP_CS_ERROR;
			return (SVP_RA_DEGRADE);
		} else {
			/*
			 * This call failed, which means that we obtained one of
			 * the following:
			 *
			 * EADDRNOTAVAIL
			 * ECONNREFUSED
			 * EIO
			 * ENETUNREACH
			 * EHOSTUNREACH
			 * ENXIO
			 * ETIMEDOUT
			 *
			 * Therefore we need to set ourselves into backoff and
			 * wait for that to clear up.
			 */
			return (svp_conn_backoff(scp));
		}
	}

	/* Immediately successful connection, move to SVP_CS_VERSIONING. */
	return (svp_conn_poll_connect(NULL, scp));
}
332
333 /*
334 * This should be the first call we get after a successful synchronous
335 * connect, or a completed (failed or successful) asynchronous connect. A
336 * non-NULL port-event indicates asynchronous completion, a NULL port-event
337 * indicates a successful synchronous connect.
338 *
339 * If we have successfully connected, we should see a writeable event. In the
340 * asynchronous case, we may also see an error or a hang up. For either hang
341 * up or error, we transition to error mode. If there is also a readable event
342 * (i.e. incoming data), we ignore it at the moment and just let a
343 * reassociation pick it up so we can simplify the set of state transitions
344 * that we have.
345 */
346 static svp_conn_act_t
347 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
348 {
349 int ret;
350 svp_conn_error_t version_error;
351
352 if (pe != NULL) {
353 int err;
354 socklen_t sl = sizeof (err);
355
356 /*
357 * These bits only matter if we're notified of an
358 * asynchronous connection completion.
359 */
360 if (!(pe->portev_events & POLLOUT)) {
361 scp->sc_errno = 0;
362 scp->sc_error = SVP_CE_NOPOLLOUT;
363 scp->sc_cstate = SVP_CS_ERROR;
364 return (SVP_RA_DEGRADE);
365 }
366
367 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
368 &sl);
369 if (ret != 0)
370 libvarpd_panic("unanticipated getsockopt error");
371 if (err != 0) {
372 return (svp_conn_backoff(scp));
373 }
374 }
375
376 return (SVP_RA_FIND_VERSION);
377 }
378
379 static svp_conn_act_t
380 svp_conn_pollout(svp_conn_t *scp)
381 {
382 svp_query_t *sqp;
383 svp_req_t *req;
384 size_t off;
385 struct iovec iov[2];
386 int nvecs = 0;
387 ssize_t ret;
388
389 assert(MUTEX_HELD(&scp->sc_lock));
390
391 /*
392 * We need to find a query and start writing it out.
393 */
394 if (scp->sc_output.sco_query == NULL) {
395 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
396 sqp = list_next(&scp->sc_queries, sqp)) {
397 if (sqp->sq_state != SVP_QUERY_INIT)
398 continue;
399 break;
400 }
401
402 if (sqp == NULL) {
403 scp->sc_event.se_events &= ~POLLOUT;
404 return (SVP_RA_NONE);
405 }
406
407 scp->sc_output.sco_query = sqp;
408 scp->sc_output.sco_offset = 0;
409 sqp->sq_state = SVP_QUERY_WRITING;
410 svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
411 }
412
413 sqp = scp->sc_output.sco_query;
414 req = &sqp->sq_header;
415 off = scp->sc_output.sco_offset;
416 if (off < sizeof (svp_req_t)) {
417 iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
418 iov[nvecs].iov_len = sizeof (svp_req_t) - off;
419 nvecs++;
420 off = 0;
421 } else {
422 off -= sizeof (svp_req_t);
423 }
424
425 iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
426 iov[nvecs].iov_len = sqp->sq_rsize - off;
427 nvecs++;
428
429 do {
430 ret = writev(scp->sc_socket, iov, nvecs);
431 } while (ret == -1 && errno == EINTR);
432 if (ret == -1) {
433 switch (errno) {
434 case EAGAIN:
435 scp->sc_event.se_events |= POLLOUT;
436 return (SVP_RA_NONE);
437 case EIO:
438 case ENXIO:
439 case ECONNRESET:
440 return (SVP_RA_ERROR);
441 default:
442 libvarpd_panic("unexpected errno: %d", errno);
443 }
444 }
445
446 sqp->sq_acttime = gethrtime();
447 scp->sc_output.sco_offset += ret;
448 if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
449 sqp->sq_state = SVP_QUERY_READING;
450 scp->sc_output.sco_query = NULL;
451 scp->sc_output.sco_offset = 0;
452 scp->sc_event.se_events |= POLLOUT;
453 }
454 return (SVP_RA_NONE);
455 }
456
457 static boolean_t
458 svp_conn_pollin_validate(svp_conn_t *scp)
459 {
460 svp_query_t *sqp;
461 uint32_t nsize, expected_size = 0;
462 uint16_t nvers, nop;
463 svp_req_t *resp = &scp->sc_input.sci_req;
464
465 assert(MUTEX_HELD(&scp->sc_lock));
466
467 nvers = ntohs(resp->svp_ver);
468 nop = ntohs(resp->svp_op);
469 nsize = ntohl(resp->svp_size);
470
471 /*
472 * A peer that's messing with post-connection version changes is
473 * likely a broken peer.
474 */
475 if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) {
476 (void) bunyan_warn(svp_bunyan, "version mismatch",
477 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
478 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
479 BUNYAN_T_INT32, "peer version", nvers,
480 BUNYAN_T_INT32, "our version", scp->sc_version,
481 BUNYAN_T_INT32, "operation", nop,
482 BUNYAN_T_INT32, "response_id", resp->svp_id,
483 BUNYAN_T_END);
484 return (B_FALSE);
485 }
486
487 switch (nop) {
488 case SVP_R_VL2_ACK:
489 expected_size = sizeof (svp_vl2_ack_t);
490 break;
491 case SVP_R_VL3_ACK:
492 expected_size = sizeof (svp_vl3_ack_t);
493 break;
494 case SVP_R_LOG_RM_ACK:
495 expected_size = sizeof (svp_lrm_ack_t);
496 break;
497 case SVP_R_ROUTE_ACK:
498 expected_size = sizeof (svp_route_ack_t);
499 break;
500 case SVP_R_LOG_ACK:
501 case SVP_R_PONG:
502 /* No expected size (LOG_ACK) or size is 0 (PONG). */
503 break;
504 default:
505 (void) bunyan_warn(svp_bunyan, "unsupported operation",
506 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
507 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
508 BUNYAN_T_INT32, "version", nvers,
509 BUNYAN_T_INT32, "operation", nop,
510 BUNYAN_T_INT32, "response_id", resp->svp_id,
511 BUNYAN_T_END);
512 return (B_FALSE);
513 }
514
515 sqp = svp_conn_query_find(scp, resp->svp_id);
516 if (sqp == NULL) {
517 (void) bunyan_warn(svp_bunyan, "unknown response id",
518 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
519 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
520 BUNYAN_T_INT32, "version", nvers,
521 BUNYAN_T_INT32, "operation", nop,
522 BUNYAN_T_INT32, "response_id", resp->svp_id,
523 BUNYAN_T_END);
524 return (B_FALSE);
525 }
526
527 if (sqp->sq_state != SVP_QUERY_READING) {
528 (void) bunyan_warn(svp_bunyan,
529 "got response for unexpecting query",
530 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
531 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
532 BUNYAN_T_INT32, "version", nvers,
533 BUNYAN_T_INT32, "operation", nop,
534 BUNYAN_T_INT32, "response_id", resp->svp_id,
535 BUNYAN_T_INT32, "query_state", sqp->sq_state,
536 BUNYAN_T_END);
537 return (B_FALSE);
538 }
539
540 if (nop != SVP_R_LOG_RM_ACK && nsize != expected_size) {
541 (void) bunyan_warn(svp_bunyan, "response size too large",
542 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
543 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
544 BUNYAN_T_INT32, "version", nvers,
545 BUNYAN_T_INT32, "operation", nop,
546 BUNYAN_T_INT32, "response_id", resp->svp_id,
547 BUNYAN_T_INT32, "response_size", nsize,
548 BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
549 sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
550 BUNYAN_T_INT32, "query_state", sqp->sq_state,
551 BUNYAN_T_END);
552 return (B_FALSE);
553 }
554
555 /*
556 * The valid size is anything <= to what the user requested, but at
557 * least svp_log_ack_t bytes large.
558 */
559 if (nop == SVP_R_LOG_ACK) {
560 const char *msg = NULL;
561 if (nsize < sizeof (svp_log_ack_t))
562 msg = "response size too small";
563 else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
564 msg = "response size too large";
565 if (msg != NULL) {
566 (void) bunyan_warn(svp_bunyan, msg,
567 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
568 BUNYAN_T_INT32, "remote_port",
569 scp->sc_remote->sr_rport,
570 BUNYAN_T_INT32, "version", nvers,
571 BUNYAN_T_INT32, "operation", nop,
572 BUNYAN_T_INT32, "response_id", resp->svp_id,
573 BUNYAN_T_INT32, "response_size", nsize,
574 BUNYAN_T_INT32, "expected_size",
575 ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
576 BUNYAN_T_INT32, "query_state", sqp->sq_state,
577 BUNYAN_T_END);
578 return (B_FALSE);
579 }
580 }
581
582 sqp->sq_size = nsize;
583 scp->sc_input.sci_query = sqp;
584 if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
585 nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK ||
586 nop == SVP_R_PONG) {
587 sqp->sq_wdata = &sqp->sq_wdun;
588 sqp->sq_wsize = sizeof (svp_query_data_t);
589 } else {
590 VERIFY(nop == SVP_R_LOG_ACK);
591 assert(sqp->sq_wdata != NULL);
592 assert(sqp->sq_wsize != 0);
593 }
594
595 return (B_TRUE);
596 }
597
598 static svp_conn_act_t
599 svp_conn_pollin(svp_conn_t *scp)
600 {
601 size_t off, total;
602 ssize_t ret;
603 svp_query_t *sqp;
604 uint32_t crc;
605 uint16_t nop;
606
607 assert(MUTEX_HELD(&scp->sc_lock));
608
609 /*
610 * No query implies that we're reading in the header and that the offset
611 * is associted with it.
612 */
613 off = scp->sc_input.sci_offset;
614 sqp = scp->sc_input.sci_query;
615 if (scp->sc_input.sci_query == NULL) {
616 svp_req_t *resp = &scp->sc_input.sci_req;
617
618 assert(off < sizeof (svp_req_t));
619
620 do {
621 ret = read(scp->sc_socket,
622 (void *)((uintptr_t)resp + off),
623 sizeof (svp_req_t) - off);
624 } while (ret == -1 && errno == EINTR);
625 if (ret == -1) {
626 switch (errno) {
627 case EAGAIN:
628 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
629 return (SVP_RA_NONE);
630 case EIO:
631 case ECONNRESET:
632 return (SVP_RA_ERROR);
633 break;
634 default:
635 libvarpd_panic("unexpeted read errno: %d",
636 errno);
637 }
638 } else if (ret == 0) {
639 /* Try to reconnect to the remote host */
640 return (SVP_RA_ERROR);
641 }
642
643 /* Didn't get all the data we need */
644 if (off + ret < sizeof (svp_req_t)) {
645 scp->sc_input.sci_offset += ret;
646 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
647 return (SVP_RA_NONE);
648 }
649
650 if (svp_conn_pollin_validate(scp) != B_TRUE)
651 return (SVP_RA_ERROR);
652 }
653
654 sqp = scp->sc_input.sci_query;
655 assert(sqp != NULL);
656 sqp->sq_acttime = gethrtime();
657 total = ntohl(scp->sc_input.sci_req.svp_size);
658 do {
659 ret = read(scp->sc_socket,
660 (void *)((uintptr_t)sqp->sq_wdata + off),
661 total - off);
662 } while (ret == -1 && errno == EINTR);
663
664 if (ret == -1) {
665 switch (errno) {
666 case EAGAIN:
667 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
668 return (SVP_RA_NONE);
669 case EIO:
670 case ECONNRESET:
671 return (SVP_RA_ERROR);
672 break;
673 default:
674 libvarpd_panic("unexpeted read errno: %d", errno);
675 }
676 } else if (ret == 0 && total - off > 0) {
677 /* Try to reconnect to the remote host */
678 return (SVP_RA_ERROR);
679 }
680
681 if (ret + off < total) {
682 scp->sc_input.sci_offset += ret;
683 return (SVP_RA_NONE);
684 }
685
686 nop = ntohs(scp->sc_input.sci_req.svp_op);
687 crc = scp->sc_input.sci_req.svp_crc32;
688 svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
689 if (crc != scp->sc_input.sci_req.svp_crc32) {
690 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
691 BUNYAN_T_IP, "remote ip", &scp->sc_addr,
692 BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
693 BUNYAN_T_INT32, "version",
694 ntohs(scp->sc_input.sci_req.svp_ver),
695 BUNYAN_T_INT32, "operation", nop,
696 BUNYAN_T_INT32, "response id",
697 ntohl(scp->sc_input.sci_req.svp_id),
698 BUNYAN_T_INT32, "query state", sqp->sq_state,
699 BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
700 BUNYAN_T_UINT32, "calc_crc",
701 ntohl(scp->sc_input.sci_req.svp_crc32),
702 BUNYAN_T_END);
703 return (SVP_RA_ERROR);
704 }
705 scp->sc_input.sci_query = NULL;
706 scp->sc_input.sci_offset = 0;
707
708 if (nop == SVP_R_VL2_ACK) {
709 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
710 sqp->sq_status = ntohl(sl2a->sl2a_status);
711 } else if (nop == SVP_R_VL3_ACK) {
712 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
713 sqp->sq_status = ntohl(sl3a->sl3a_status);
714 } else if (nop == SVP_R_LOG_ACK) {
715 svp_log_ack_t *svla = sqp->sq_wdata;
716 sqp->sq_status = ntohl(svla->svla_status);
717 } else if (nop == SVP_R_LOG_RM_ACK) {
718 svp_lrm_ack_t *svra = sqp->sq_wdata;
719 sqp->sq_status = ntohl(svra->svra_status);
720 } else if (nop == SVP_R_ROUTE_ACK) {
721 svp_route_ack_t *sra = sqp->sq_wdata;
722 sqp->sq_status = ntohl(sra->sra_status);
723 } else if (nop == SVP_R_PONG) {
724 /*
725 * Handle the PONG versioning-capture here, as we need
726 * the version number, the scp_lock held, and the ability
727 * to error out.
728 */
729 svp_conn_act_t cbret;
730
731 cbret = svp_conn_pong_handler(scp, sqp);
732 if (cbret != SVP_RA_NONE)
733 return (cbret);
734 } else {
735 libvarpd_panic("unhandled nop: %d", nop);
736 }
737
738 list_remove(&scp->sc_queries, sqp);
739 mutex_exit(&scp->sc_lock);
740
741 /*
742 * We have to release all of our resources associated with this entry
743 * before we call the callback. After we call it, the memory will be
744 * lost to time.
745 */
746 svp_query_release(sqp);
747 sqp->sq_func(sqp, sqp->sq_arg);
748 mutex_enter(&scp->sc_lock);
749 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
750
751 return (SVP_RA_NONE);
752 }
753
/*
 * Tear down the current socket and drive this connection back through
 * SVP_CS_INITIAL, handing outstanding queries back to the remote for
 * reassignment before attempting a fresh connect.
 */
static svp_conn_act_t
svp_conn_reset(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * Use VERIFY, not assert: the dissociate call must still run in
	 * NDEBUG builds, where assert() compiles away its argument.  ENOENT
	 * tells us the fd was already dissociated, which is what we expect.
	 */
	VERIFY(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
	    ENOENT);
	if (close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
		    errno);
	scp->sc_flags &= ~SVP_CF_TEARDOWN;
	scp->sc_socket = -1;
	scp->sc_cstate = SVP_CS_INITIAL;
	/*
	 * Clear any partially-read/-written message state so the new
	 * connection starts its I/O offsets from scratch.
	 */
	scp->sc_input.sci_query = NULL;
	scp->sc_input.sci_offset = 0;
	scp->sc_output.sco_query = NULL;
	scp->sc_output.sco_offset = 0;

	svp_remote_reassign(srp, scp);

	return (svp_conn_connect(scp));
}
777
778 /*
779 * This is our general state transition function. We're called here when we want
780 * to advance part of our state machine as well as to re-arm ourselves. We can
781 * also end up here from the standard event loop as a result of having a user
782 * event posted.
783 */
784 static void
785 svp_conn_handler(port_event_t *pe, void *arg)
786 {
787 svp_conn_t *scp = arg;
788 svp_remote_t *srp = scp->sc_remote;
789 svp_conn_act_t ret = SVP_RA_NONE;
790 svp_conn_state_t oldstate;
791
792 mutex_enter(&scp->sc_lock);
793
794 /*
795 * Check if one of our event interrupts is set. An event interrupt, such
796 * as having to be reaped or be torndown is notified by a
797 * PORT_SOURCE_USER event that tries to take care of this. However,
798 * because of the fact that the event loop can be ongoing despite this,
799 * we may get here before the PORT_SOURCE_USER has casued us to get
800 * here. In such a case, if the PORT_SOURCE_USER event is tagged, then
801 * we're going to opt to do nothing here and wait for it to come and
802 * tear us down. That will also indicate to us that we have nothing to
803 * worry about as far as general timing and the like goes.
804 */
805 if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
806 (scp->sc_flags & SVP_CF_USER) != 0 &&
807 pe != NULL &&
808 pe->portev_source != PORT_SOURCE_USER) {
809 mutex_exit(&scp->sc_lock);
810 return;
811 }
812
813 if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
814 scp->sc_flags &= ~SVP_CF_USER;
815 if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
816 mutex_exit(&scp->sc_lock);
817 return;
818 }
819 }
820
821 /* Check if this needs to be freed */
822 if (scp->sc_flags & SVP_CF_REAP) {
823 mutex_exit(&scp->sc_lock);
824 svp_conn_destroy(scp);
825 return;
826 }
827
828 /* Check if this needs to be reset */
829 if (scp->sc_flags & SVP_CF_TEARDOWN) {
830 /* Make sure any other users of this are disassociated */
831 ret = SVP_RA_ERROR;
832 goto out;
833 }
834
835 switch (scp->sc_cstate) {
836 case SVP_CS_INITIAL:
837 case SVP_CS_BACKOFF:
838 assert(pe == NULL);
839 ret = svp_conn_connect(scp);
840 break;
841 case SVP_CS_CONNECTING:
842 assert(pe != NULL);
843 ret = svp_conn_poll_connect(pe, scp);
844 break;
845 case SVP_CS_VERSIONING:
846 case SVP_CS_ACTIVE:
847 case SVP_CS_WINDDOWN:
848 assert(pe != NULL);
849 oldstate = scp->sc_cstate;
850 if (pe->portev_events & POLLOUT)
851 ret = svp_conn_pollout(scp);
852 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
853 ret = svp_conn_pollin(scp);
854
855 if (oldstate == SVP_CS_WINDDOWN &&
856 (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
857 ret = SVP_RA_CLEANUP;
858 }
859
860 if (ret == SVP_RA_NONE) {
861 int err;
862 if ((err = svp_event_associate(&scp->sc_event,
863 scp->sc_socket)) != 0) {
864 scp->sc_error = SVP_CE_ASSOCIATE;
865 scp->sc_errno = err;
866 scp->sc_cstate = SVP_CS_ERROR;
867 ret = SVP_RA_DEGRADE;
868 }
869 }
870 break;
871 default:
872 libvarpd_panic("svp_conn_handler encountered unexpected "
873 "state: %d", scp->sc_cstate);
874 }
875 out:
876 mutex_exit(&scp->sc_lock);
877
878 if (ret == SVP_RA_NONE)
879 return;
880
881 mutex_enter(&srp->sr_lock);
882 mutex_enter(&scp->sc_lock);
883 if (ret == SVP_RA_FIND_VERSION)
884 ret = svp_conn_ping_version(scp);
885
886 if (ret == SVP_RA_ERROR)
887 ret = svp_conn_reset(scp);
888
889 if (ret == SVP_RA_DEGRADE)
890 svp_conn_degrade(scp);
891 if (ret == SVP_RA_RESTORE)
892 svp_conn_restore(scp);
893
894 if (ret == SVP_RA_CLEANUP) {
895 svp_conn_remove(scp);
896 scp->sc_flags |= SVP_CF_REAP;
897 svp_conn_inject(scp);
898 }
899 mutex_exit(&scp->sc_lock);
900 mutex_exit(&srp->sr_lock);
901 }
902
/*
 * Backoff timer callback: re-enter the state machine with no port event,
 * which (in SVP_CS_INITIAL or SVP_CS_BACKOFF) triggers a connect attempt.
 */
static void
svp_conn_backtimer(void *arg)
{
	svp_conn_t *scp = arg;

	svp_conn_handler(NULL, scp);
}
910
911 /*
912 * This fires every svp_conn_query_timeout seconds. Its purpos is to determine
913 * if we haven't heard back on a request with in svp_conn_query_timeout seconds.
914 * If any of the svp_conn_query_t's that have been started (indicated by
915 * svp_query_t`sq_acttime != -1), and more than svp_conn_query_timeout seconds
916 * have passed, we basically tear this connection down and reassign outstanding
917 * queries.
918 */
static void
svp_conn_querytimer(void *arg)
{
	int ret;
	svp_query_t *sqp;
	svp_conn_t *scp = arg;
	hrtime_t now = gethrtime();

	mutex_enter(&scp->sc_lock);

	/*
	 * If we're not in the active state, then we don't care about this as
	 * we're already either going to die or we have no connections to worry
	 * about.
	 */
	if (scp->sc_cstate != SVP_CS_ACTIVE) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	/* Look for the first started query older than the timeout. */
	for (sqp = list_head(&scp->sc_queries); sqp != NULL;
	    sqp = list_next(&scp->sc_queries, sqp)) {
		/* sq_acttime == -1 means the query hasn't been sent yet. */
		if (sqp->sq_acttime == -1)
			continue;
		if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
			break;
	}

	/* Nothing timed out, we're good here */
	if (sqp == NULL) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	(void) bunyan_warn(svp_bunyan, "query timed out on connection",
	    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
	    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
	    BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
	    BUNYAN_T_END);

	/*
	 * Begin the tear down process for this connect. If we lose the
	 * disassociate, then we don't inject an event. See the big theory
	 * statement in libvarpd_svp.c for more information.
	 */
	scp->sc_flags |= SVP_CF_TEARDOWN;

	ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
	if (ret == 0)
		svp_conn_inject(scp);
	else
		VERIFY(ret == ENOENT);

	mutex_exit(&scp->sc_lock);
}
974
975 /*
976 * This connection has fallen out of DNS, figure out what we need to do with it.
977 */
978 void
979 svp_conn_fallout(svp_conn_t *scp)
980 {
981 svp_remote_t *srp = scp->sc_remote;
982
983 assert(MUTEX_HELD(&srp->sr_lock));
984
985 mutex_enter(&scp->sc_lock);
986 switch (scp->sc_cstate) {
987 case SVP_CS_ERROR:
988 /*
989 * Connection is already inactive, so it's safe to tear down.
990 * Fire it off through the state machine to tear down via the
991 * backoff timer.
992 */
993 svp_conn_remove(scp);
994 scp->sc_flags |= SVP_CF_REAP;
995 svp_conn_inject(scp);
996 break;
997 case SVP_CS_INITIAL:
998 case SVP_CS_BACKOFF:
999 case SVP_CS_CONNECTING:
1000 /*
1001 * Here, we have something actively going on, so we'll let it be
1002 * clean up the next time we hit the event loop by the event
1003 * loop itself. As it has no connections, there isn't much to
1004 * really do, though we'll take this chance to go ahead and
1005 * remove it from the remote.
1006 */
1007 svp_conn_remove(scp);
1008 scp->sc_flags |= SVP_CF_REAP;
1009 svp_conn_inject(scp);
1010 break;
1011 case SVP_CS_ACTIVE:
1012 case SVP_CS_WINDDOWN:
1013 /*
1014 * If there are no outstanding queries, then we should simply
1015 * clean this up now,t he same way we would with the others.
1016 * Othewrise, as we know the event loop is ongoing, we'll make
1017 * sure that these entries get cleaned up once they're done.
1018 */
1019 scp->sc_cstate = SVP_CS_WINDDOWN;
1020 if (list_is_empty(&scp->sc_queries)) {
1021 svp_conn_remove(scp);
1022 scp->sc_flags |= SVP_CF_REAP;
1023 svp_conn_inject(scp);
1024 }
1025 break;
1026 default:
1027 libvarpd_panic("svp_conn_fallout encountered"
1028 "unkonwn state");
1029 }
1030 mutex_exit(&scp->sc_lock);
1031 mutex_exit(&srp->sr_lock);
1032 }
1033
/*
 * Allocate and initialize a new connection to "addr" for the remote "srp",
 * register it with the remote's accounting, and start its timers.  The
 * one-second backoff timer drives the initial connect attempt.  Returns 0
 * on success or an errno value on allocation/mutex failure.
 */
int
svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
{
	int ret;
	svp_conn_t *scp;

	assert(MUTEX_HELD(&srp->sr_lock));
	scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
	if (scp == NULL)
		return (ENOMEM);

	if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
	    NULL)) != 0) {
		umem_free(scp, sizeof (svp_conn_t));
		return (ret);
	}

	scp->sc_remote = srp;
	scp->sc_event.se_func = svp_conn_handler;
	scp->sc_event.se_arg = scp;
	/* One-shot backoff timer; the initial connect fires after 1 second. */
	scp->sc_btimer.st_func = svp_conn_backtimer;
	scp->sc_btimer.st_arg = scp;
	scp->sc_btimer.st_oneshot = B_TRUE;
	scp->sc_btimer.st_value = 1;

	/* Periodic timer that checks for timed-out queries. */
	scp->sc_qtimer.st_func = svp_conn_querytimer;
	scp->sc_qtimer.st_arg = scp;
	scp->sc_qtimer.st_oneshot = B_FALSE;
	scp->sc_qtimer.st_value = svp_conn_query_timeout;

	scp->sc_socket = -1;

	list_create(&scp->sc_queries, sizeof (svp_query_t),
	    offsetof(svp_query_t, sq_lnode));
	scp->sc_gen = srp->sr_gen;
	bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
	scp->sc_cstate = SVP_CS_INITIAL;
	mutex_enter(&scp->sc_lock);
	svp_conn_add(scp);
	mutex_exit(&scp->sc_lock);

	/* Now that we're locked and loaded, add our timers */
	svp_timer_add(&scp->sc_qtimer);
	svp_timer_add(&scp->sc_btimer);

	return (0);
}
1081
1082 /*
1083 * At the time of calling, the entry has been removed from all lists. In
1084 * addition, the entries state should be SVP_CS_ERROR, therefore, we know that
1085 * the fd should not be associated with the event loop. We'll double check that
1086 * just in case. We should also have already been removed from the remote's
1087 * list.
1088 */
void
svp_conn_destroy(svp_conn_t *scp)
{
	int ret;

	/* Re-verify the caller's contract before freeing anything. */
	mutex_enter(&scp->sc_lock);
	if (scp->sc_cstate != SVP_CS_ERROR)
		libvarpd_panic("asked to tear down an active connection");
	if (scp->sc_flags & SVP_CF_ADDED)
		libvarpd_panic("asked to remove a connection still in "
		    "the remote list\n");
	if (!list_is_empty(&scp->sc_queries))
		libvarpd_panic("asked to remove a connection with non-empty "
		    "query list");

	/* ENOENT proves the fd was not associated with the event loop. */
	if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
	    ENOENT) {
		libvarpd_panic("dissociate failed or was actually "
		    "associated: %d", ret);
	}
	mutex_exit(&scp->sc_lock);

	/* Verify our timers are killed */
	svp_timer_remove(&scp->sc_btimer);
	svp_timer_remove(&scp->sc_qtimer);

	if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
		    "%d: %d", scp->sc_socket, errno);

	list_destroy(&scp->sc_queries);
	umem_free(scp, sizeof (svp_conn_t));
}
1122
1123 void
1124 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1125 {
1126 assert(MUTEX_HELD(&scp->sc_lock));
1127 assert(scp->sc_cstate == SVP_CS_ACTIVE ||
1128 scp->sc_cstate == SVP_CS_VERSIONING);
1129
1130 sqp->sq_acttime = -1;
1131 list_insert_tail(&scp->sc_queries, sqp);
1132 if (!(scp->sc_event.se_events & POLLOUT)) {
1133 scp->sc_event.se_events |= POLLOUT;
1134 /*
1135 * If this becomes frequent, we should instead give up on this
1136 * set of connections instead of aborting.
1137 */
1138 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1139 libvarpd_panic("svp_event_associate failed somehow");
1140 }
1141 }