/*
 * This file and its contents are supplied under the terms of the
 * Common Development and Distribution License ("CDDL"), version 1.0.
 * You may only use this file in accordance with the terms of version
 * 1.0 of the CDDL.
 *
 * A full copy of the text of the CDDL should have accompanied this
 * source. A copy of the CDDL is also available via the Internet at
 * http://www.illumos.org/license/CDDL.
 */

/*
 * Copyright 2015 Joyent, Inc.
 */

/*
 * Logic to manage an individual connection to a remote host.
 *
 * For more information, see the big theory statement in
 * lib/varpd/svp/common/libvarpd_svp.c.
 */

#include <assert.h>
#include <umem.h>
#include <errno.h>
#include <strings.h>
#include <unistd.h>
#include <stddef.h>
#include <sys/uio.h>
#include <sys/debug.h>

#include <libvarpd_svp.h>

int svp_conn_query_timeout = 30;
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);

typedef enum svp_conn_act {
	SVP_RA_NONE = 0x00,
	SVP_RA_DEGRADE = 0x01,
	SVP_RA_RESTORE = 0x02,
	SVP_RA_ERROR = 0x03,
	SVP_RA_CLEANUP = 0x04
} svp_conn_act_t;

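/*
 * Post a user event for this connection so that the event loop calls back
 * into svp_conn_handler(). The SVP_CF_USER flag ensures that only one such
 * event is outstanding at a time.
 */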
static void
svp_conn_inject(svp_conn_t *scp)
{
	int ret;
	assert(MUTEX_HELD(&scp->sc_lock));

	if (scp->sc_flags & SVP_CF_USER)
		return;
	scp->sc_flags |= SVP_CF_USER;
	if ((ret = svp_event_inject(&scp->sc_event)) != 0)
		libvarpd_panic("failed to inject event: %d\n", ret);
}

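/*
 * Mark this connection as degraded. If every connection on the remote is now
 * degraded, degrade the remote itself.
 */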
static void
svp_conn_degrade(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (scp->sc_flags & SVP_CF_DEGRADED)
		return;

	scp->sc_flags |= SVP_CF_DEGRADED;
	srp->sr_ndconns++;
	if (srp->sr_ndconns == srp->sr_tconns)
		svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
}

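/*
 * Clear the degraded flag on this connection. If all of the remote's
 * connections had been degraded, restore the remote as well.
 */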
static void
svp_conn_restore(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (!(scp->sc_flags & SVP_CF_DEGRADED))
		return;

	scp->sc_flags &= ~SVP_CF_DEGRADED;
	if (srp->sr_ndconns == srp->sr_tconns)
		svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
	srp->sr_ndconns--;
}

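/*
 * Add this connection to its remote's list of connections and bump the total
 * connection count, unless it has already been added.
 */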
static void
svp_conn_add(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (scp->sc_flags & SVP_CF_ADDED)
		return;

	list_insert_tail(&srp->sr_conns, scp);
	scp->sc_flags |= SVP_CF_ADDED;
	srp->sr_tconns++;
}

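/*
 * Remove this connection from its remote's accounting. If the connections
 * that remain are all degraded, degrade the remote.
 */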
static void
svp_conn_remove(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	if (!(scp->sc_flags & SVP_CF_ADDED))
		return;

	scp->sc_flags &= ~SVP_CF_ADDED;
	if (scp->sc_flags & SVP_CF_DEGRADED)
		srp->sr_ndconns--;
	srp->sr_tconns--;
	if (srp->sr_tconns == srp->sr_ndconns)
		svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
}

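/*
 * Find the outstanding query on this connection whose SVP id matches the
 * given id, or return NULL if there is none.
 */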
static svp_query_t *
svp_conn_query_find(svp_conn_t *scp, uint32_t id)
{
	svp_query_t *sqp;

	assert(MUTEX_HELD(&scp->sc_lock));

	for (sqp = list_head(&scp->sc_queries); sqp != NULL;
	    sqp = list_next(&scp->sc_queries, sqp)) {
		if (sqp->sq_header.svp_id == id)
			break;
	}

	return (sqp);
}

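/*
 * Close the socket and schedule a reconnection attempt via the backoff timer,
 * using the next value in svp_conn_backoff_tbl. Once we have exhausted the
 * table, ask the caller to degrade the connection.
 */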
static svp_conn_act_t
svp_conn_backoff(svp_conn_t *scp)
{
	assert(MUTEX_HELD(&scp->sc_lock));

	if (close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close socket %d: %d\n",
		    scp->sc_socket, errno);
	scp->sc_socket = -1;

	scp->sc_cstate = SVP_CS_BACKOFF;
	scp->sc_nbackoff++;
	if (scp->sc_nbackoff >= svp_conn_nbackoff) {
		scp->sc_btimer.st_value =
		    svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
	} else {
		scp->sc_btimer.st_value =
		    svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
	}
	svp_timer_add(&scp->sc_btimer);

	if (scp->sc_nbackoff > svp_conn_nbackoff)
		return (SVP_RA_DEGRADE);
	return (SVP_RA_NONE);
}

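/*
 * Create a non-blocking socket and initiate a connection to the remote host.
 * An in-progress connect is parked in SVP_CS_CONNECTING and waits for
 * POLLOUT; hard failures either send us into backoff or degrade the
 * connection.
 */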
static svp_conn_act_t
svp_conn_connect(svp_conn_t *scp)
{
	int ret;
	struct sockaddr_in6 in6;

	assert(MUTEX_HELD(&scp->sc_lock));
	assert(scp->sc_cstate == SVP_CS_BACKOFF ||
	    scp->sc_cstate == SVP_CS_INITIAL);
	assert(scp->sc_socket == -1);
	if (scp->sc_cstate == SVP_CS_INITIAL)
		scp->sc_nbackoff = 0;

	scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
	if (scp->sc_socket == -1) {
		scp->sc_error = SVP_CE_SOCKET;
		scp->sc_errno = errno;
		scp->sc_cstate = SVP_CS_ERROR;
		return (SVP_RA_DEGRADE);
	}

	bzero(&in6, sizeof (struct sockaddr_in6));
	in6.sin6_family = AF_INET6;
	in6.sin6_port = htons(scp->sc_remote->sr_rport);
	bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
	ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
	    sizeof (struct sockaddr_in6));
	if (ret != 0) {
		boolean_t async = B_FALSE;

		switch (errno) {
		case EACCES:
		case EADDRINUSE:
		case EAFNOSUPPORT:
		case EALREADY:
		case EBADF:
		case EISCONN:
		case ELOOP:
		case ENOENT:
		case ENOSR:
		case EWOULDBLOCK:
			libvarpd_panic("unanticipated connect errno %d",
			    errno);
			break;
		case EINPROGRESS:
		case EINTR:
			async = B_TRUE;
		default:
			break;
		}

		/*
		 * So, we will be connecting to this in the future, advance our
		 * state and make sure that we poll for the next round.
		 */
		if (async == B_TRUE) {
			scp->sc_cstate = SVP_CS_CONNECTING;
			scp->sc_event.se_events = POLLOUT | POLLHUP;
			ret = svp_event_associate(&scp->sc_event,
			    scp->sc_socket);
			if (ret == 0)
				return (SVP_RA_NONE);
			scp->sc_error = SVP_CE_ASSOCIATE;
			scp->sc_errno = ret;
			scp->sc_cstate = SVP_CS_ERROR;
			return (SVP_RA_DEGRADE);
		} else {
			/*
			 * This call failed, which means that we obtained one of
			 * the following:
			 *
			 * EADDRNOTAVAIL
			 * ECONNREFUSED
			 * EIO
			 * ENETUNREACH
			 * EHOSTUNREACH
			 * ENXIO
			 * ETIMEDOUT
			 *
			 * Therefore we need to set ourselves into backoff and
			 * wait for that to clear up.
			 */
			return (svp_conn_backoff(scp));
		}
	}

	/*
	 * We've connected. Successfully move ourselves to the bound
	 * state and start polling.
	 */
	scp->sc_cstate = SVP_CS_ACTIVE;
	scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
	ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
	if (ret == 0)
		return (SVP_RA_RESTORE);
	scp->sc_error = SVP_CE_ASSOCIATE;
	scp->sc_cstate = SVP_CS_ERROR;

	return (SVP_RA_DEGRADE);
}

/*
 * This should be the first call we get after a connect. If we have
 * successfully connected, we should see a writeable event. We may also see an
 * error or a hang up. In either of these cases, we transition to error mode.
 * If there is also a readable event, we ignore it at the moment and just let
 * a reassociation pick it up so we can simplify the set of state transitions
 * that we have.
 */
static svp_conn_act_t
svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
{
	int ret, err;
	socklen_t sl = sizeof (err);

	if (!(pe->portev_events & POLLOUT)) {
		scp->sc_errno = 0;
		scp->sc_error = SVP_CE_NOPOLLOUT;
		scp->sc_cstate = SVP_CS_ERROR;
		return (SVP_RA_DEGRADE);
	}

	ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl);
	if (ret != 0)
		libvarpd_panic("unanticipated getsockopt error");
	if (err != 0) {
		return (svp_conn_backoff(scp));
	}

	scp->sc_cstate = SVP_CS_ACTIVE;
	scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
	ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
	if (ret == 0)
		return (SVP_RA_RESTORE);
	scp->sc_error = SVP_CE_ASSOCIATE;
	scp->sc_errno = ret;
	scp->sc_cstate = SVP_CS_ERROR;
	return (SVP_RA_DEGRADE);
}

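/*
 * The socket is writable. Pick the next query that still needs to be written
 * (if we aren't already in the middle of one) and push out as much of its
 * header and request data as we can with writev().
 */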
static svp_conn_act_t
svp_conn_pollout(svp_conn_t *scp)
{
	svp_query_t *sqp;
	svp_req_t *req;
	size_t off;
	struct iovec iov[2];
	int nvecs = 0;
	ssize_t ret;

	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * We need to find a query and start writing it out.
	 */
	if (scp->sc_output.sco_query == NULL) {
		for (sqp = list_head(&scp->sc_queries); sqp != NULL;
		    sqp = list_next(&scp->sc_queries, sqp)) {
			if (sqp->sq_state != SVP_QUERY_INIT)
				continue;
			break;
		}

		if (sqp == NULL) {
			scp->sc_event.se_events &= ~POLLOUT;
			return (SVP_RA_NONE);
		}

		scp->sc_output.sco_query = sqp;
		scp->sc_output.sco_offset = 0;
		sqp->sq_state = SVP_QUERY_WRITING;
		svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
	}

	sqp = scp->sc_output.sco_query;
	req = &sqp->sq_header;
	off = scp->sc_output.sco_offset;
	if (off < sizeof (svp_req_t)) {
		iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
		iov[nvecs].iov_len = sizeof (svp_req_t) - off;
		nvecs++;
		off = 0;
	} else {
		off -= sizeof (svp_req_t);
	}

	iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
	iov[nvecs].iov_len = sqp->sq_rsize - off;
	nvecs++;

	do {
		ret = writev(scp->sc_socket, iov, nvecs);
	} while (ret == -1 && errno == EINTR);
	if (ret == -1) {
		switch (errno) {
		case EAGAIN:
			scp->sc_event.se_events |= POLLOUT;
			return (SVP_RA_NONE);
		case EIO:
		case ENXIO:
		case ECONNRESET:
			return (SVP_RA_ERROR);
		default:
			libvarpd_panic("unexpected errno: %d", errno);
		}
	}

	sqp->sq_acttime = gethrtime();
	scp->sc_output.sco_offset += ret;
	if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
		sqp->sq_state = SVP_QUERY_READING;
		scp->sc_output.sco_query = NULL;
		scp->sc_output.sco_offset = 0;
		scp->sc_event.se_events |= POLLIN | POLLRDNORM;
	}
	return (SVP_RA_NONE);
}

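/*
 * Validate the response header that we just read: check the protocol version,
 * the operation, the response id against our outstanding queries, and the
 * response size. On success, set up the query's buffer for reading the
 * payload.
 */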
static boolean_t
svp_conn_pollin_validate(svp_conn_t *scp)
{
	svp_query_t *sqp;
	uint32_t nsize;
	uint16_t nvers, nop;
	svp_req_t *resp = &scp->sc_input.sci_req;

	assert(MUTEX_HELD(&scp->sc_lock));

	nvers = ntohs(resp->svp_ver);
	nop = ntohs(resp->svp_op);
	nsize = ntohl(resp->svp_size);

	if (nvers != SVP_CURRENT_VERSION) {
		(void) bunyan_warn(svp_bunyan, "unsupported version",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
	    nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
		(void) bunyan_warn(svp_bunyan, "unsupported operation",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	sqp = svp_conn_query_find(scp, resp->svp_id);
	if (sqp == NULL) {
		(void) bunyan_warn(svp_bunyan, "unknown response id",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	if (sqp->sq_state != SVP_QUERY_READING) {
		(void) bunyan_warn(svp_bunyan,
		    "got response for unexpecting query",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_INT32, "query_state", sqp->sq_state,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
	    (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
	    (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
		(void) bunyan_warn(svp_bunyan, "response size too large",
		    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version", nvers,
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response_id", resp->svp_id,
		    BUNYAN_T_INT32, "response_size", nsize,
		    BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
		    sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
		    BUNYAN_T_INT32, "query_state", sqp->sq_state,
		    BUNYAN_T_END);
		return (B_FALSE);
	}

	/*
	 * The valid size is anything <= to what the user requested, but at
	 * least svp_log_ack_t bytes large.
	 */
	if (nop == SVP_R_LOG_ACK) {
		const char *msg = NULL;
		if (nsize < sizeof (svp_log_ack_t))
			msg = "response size too small";
		else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
			msg = "response size too large";
		if (msg != NULL) {
			(void) bunyan_warn(svp_bunyan, msg,
			    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
			    BUNYAN_T_INT32, "remote_port",
			    scp->sc_remote->sr_rport,
			    BUNYAN_T_INT32, "version", nvers,
			    BUNYAN_T_INT32, "operation", nop,
			    BUNYAN_T_INT32, "response_id", resp->svp_id,
			    BUNYAN_T_INT32, "response_size", nsize,
			    BUNYAN_T_INT32, "expected_size",
			    ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
			    BUNYAN_T_INT32, "query_state", sqp->sq_state,
			    BUNYAN_T_END);
			return (B_FALSE);
		}
	}

	sqp->sq_size = nsize;
	scp->sc_input.sci_query = sqp;
	if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
	    nop == SVP_R_LOG_RM_ACK) {
		sqp->sq_wdata = &sqp->sq_wdun;
		sqp->sq_wsize = sizeof (svp_query_data_t);
	} else {
		VERIFY(nop == SVP_R_LOG_ACK);
		assert(sqp->sq_wdata != NULL);
		assert(sqp->sq_wsize != 0);
	}

	return (B_TRUE);
}

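/*
 * The socket is readable. Read the response header if we don't have one yet,
 * then read the payload, verify its CRC, record the status, and fire the
 * query's completion callback.
 */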
static svp_conn_act_t
svp_conn_pollin(svp_conn_t *scp)
{
	size_t off, total;
	ssize_t ret;
	svp_query_t *sqp;
	uint32_t crc;
	uint16_t nop;

	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * No query implies that we're reading in the header and that the
	 * offset is associated with it.
	 */
	off = scp->sc_input.sci_offset;
	sqp = scp->sc_input.sci_query;
	if (scp->sc_input.sci_query == NULL) {
		svp_req_t *resp = &scp->sc_input.sci_req;

		assert(off < sizeof (svp_req_t));

		do {
			ret = read(scp->sc_socket,
			    (void *)((uintptr_t)resp + off),
			    sizeof (svp_req_t) - off);
		} while (ret == -1 && errno == EINTR);
		if (ret == -1) {
			switch (errno) {
			case EAGAIN:
				scp->sc_event.se_events |= POLLIN | POLLRDNORM;
				return (SVP_RA_NONE);
			case EIO:
			case ECONNRESET:
				return (SVP_RA_ERROR);
				break;
			default:
				libvarpd_panic("unexpected read errno: %d",
				    errno);
			}
		} else if (ret == 0) {
			/* Try to reconnect to the remote host */
			return (SVP_RA_ERROR);
		}

		/* Didn't get all the data we need */
		if (off + ret < sizeof (svp_req_t)) {
			scp->sc_input.sci_offset += ret;
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		}

		if (svp_conn_pollin_validate(scp) != B_TRUE)
			return (SVP_RA_ERROR);
	}

	sqp = scp->sc_input.sci_query;
	assert(sqp != NULL);
	sqp->sq_acttime = gethrtime();
	total = ntohl(scp->sc_input.sci_req.svp_size);
	do {
		ret = read(scp->sc_socket,
		    (void *)((uintptr_t)sqp->sq_wdata + off),
		    total - off);
	} while (ret == -1 && errno == EINTR);

	if (ret == -1) {
		switch (errno) {
		case EAGAIN:
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		case EIO:
		case ECONNRESET:
			return (SVP_RA_ERROR);
			break;
		default:
			libvarpd_panic("unexpected read errno: %d", errno);
		}
	} else if (ret == 0) {
		/* Try to reconnect to the remote host */
		return (SVP_RA_ERROR);
	}

	if (ret + off < total) {
		scp->sc_input.sci_offset += ret;
		return (SVP_RA_NONE);
	}

	nop = ntohs(scp->sc_input.sci_req.svp_op);
	crc = scp->sc_input.sci_req.svp_crc32;
	svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
	if (crc != scp->sc_input.sci_req.svp_crc32) {
		(void) bunyan_info(svp_bunyan, "crc32 mismatch",
		    BUNYAN_T_IP, "remote ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version",
		    ntohs(scp->sc_input.sci_req.svp_ver),
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response id",
		    ntohl(scp->sc_input.sci_req.svp_id),
		    BUNYAN_T_INT32, "query state", sqp->sq_state,
		    BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
		    BUNYAN_T_UINT32, "calc_crc",
		    ntohl(scp->sc_input.sci_req.svp_crc32),
		    BUNYAN_T_END);
		return (SVP_RA_ERROR);
	}
	scp->sc_input.sci_query = NULL;
	scp->sc_input.sci_offset = 0;

	if (nop == SVP_R_VL2_ACK) {
		svp_vl2_ack_t *sl2a = sqp->sq_wdata;
		sqp->sq_status = ntohl(sl2a->sl2a_status);
	} else if (nop == SVP_R_VL3_ACK) {
		svp_vl3_ack_t *sl3a = sqp->sq_wdata;
		sqp->sq_status = ntohl(sl3a->sl3a_status);
	} else if (nop == SVP_R_LOG_ACK) {
		svp_log_ack_t *svla = sqp->sq_wdata;
		sqp->sq_status = ntohl(svla->svla_status);
	} else if (nop == SVP_R_LOG_RM_ACK) {
		svp_lrm_ack_t *svra = sqp->sq_wdata;
		sqp->sq_status = ntohl(svra->svra_status);
	} else {
		libvarpd_panic("unhandled nop: %d", nop);
	}

	list_remove(&scp->sc_queries, sqp);
	mutex_exit(&scp->sc_lock);

	/*
	 * We have to release all of our resources associated with this entry
	 * before we call the callback. After we call it, the memory will be
	 * lost to time.
	 */
	svp_query_release(sqp);
	sqp->sq_func(sqp, sqp->sq_arg);
	mutex_enter(&scp->sc_lock);
	scp->sc_event.se_events |= POLLIN | POLLRDNORM;

	return (SVP_RA_NONE);
}

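/*
 * Tear down the current socket, reassign any outstanding queries back to the
 * remote, and start the connection over from the initial state.
 */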
static svp_conn_act_t
svp_conn_reset(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));
	assert(MUTEX_HELD(&scp->sc_lock));

	assert(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
	    ENOENT);
	if (close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
		    errno);
	scp->sc_flags &= ~SVP_CF_TEARDOWN;
	scp->sc_socket = -1;
	scp->sc_cstate = SVP_CS_INITIAL;
	scp->sc_input.sci_query = NULL;
	scp->sc_output.sco_query = NULL;

	svp_remote_reassign(srp, scp);

	return (svp_conn_connect(scp));
}

/*
 * This is our general state transition function. We're called here when we
 * want to advance part of our state machine as well as to re-arm ourselves.
 * We can also end up here from the standard event loop as a result of having
 * a user event posted.
 */
static void
svp_conn_handler(port_event_t *pe, void *arg)
{
	svp_conn_t *scp = arg;
	svp_remote_t *srp = scp->sc_remote;
	svp_conn_act_t ret = SVP_RA_NONE;
	svp_conn_state_t oldstate;

	mutex_enter(&scp->sc_lock);

	/*
	 * Check if one of our event interrupts is set. An event interrupt,
	 * such as needing to be reaped or torn down, is signaled by a
	 * PORT_SOURCE_USER event that takes care of this. However, because
	 * the event loop can still be ongoing, we may get here before the
	 * PORT_SOURCE_USER event has caused us to get here. In such a case,
	 * if the PORT_SOURCE_USER event is tagged, then we opt to do nothing
	 * here and wait for it to come and tear us down. That also tells us
	 * that we have nothing to worry about as far as general timing and
	 * the like goes.
	 */
	if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
	    (scp->sc_flags & SVP_CF_USER) != 0 &&
	    pe != NULL &&
	    pe->portev_source != PORT_SOURCE_USER) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
		scp->sc_flags &= ~SVP_CF_USER;
		if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
			mutex_exit(&scp->sc_lock);
			return;
		}
	}

	/* Check if this needs to be freed */
	if (scp->sc_flags & SVP_CF_REAP) {
		mutex_exit(&scp->sc_lock);
		svp_conn_destroy(scp);
		return;
	}

	/* Check if this needs to be reset */
	if (scp->sc_flags & SVP_CF_TEARDOWN) {
		/* Make sure any other users of this are disassociated */
		ret = SVP_RA_ERROR;
		goto out;
	}

	switch (scp->sc_cstate) {
	case SVP_CS_INITIAL:
	case SVP_CS_BACKOFF:
		assert(pe == NULL);
		ret = svp_conn_connect(scp);
		break;
	case SVP_CS_CONNECTING:
		assert(pe != NULL);
		ret = svp_conn_poll_connect(pe, scp);
		break;
	case SVP_CS_ACTIVE:
	case SVP_CS_WINDDOWN:
		assert(pe != NULL);
		oldstate = scp->sc_cstate;
		if (pe->portev_events & POLLOUT)
			ret = svp_conn_pollout(scp);
		if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
			ret = svp_conn_pollin(scp);

		if (oldstate == SVP_CS_WINDDOWN &&
		    (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
			ret = SVP_RA_CLEANUP;
		}

		if (ret == SVP_RA_NONE) {
			int err;
			if ((err = svp_event_associate(&scp->sc_event,
			    scp->sc_socket)) != 0) {
				scp->sc_error = SVP_CE_ASSOCIATE;
				scp->sc_errno = err;
				scp->sc_cstate = SVP_CS_ERROR;
				ret = SVP_RA_DEGRADE;
			}
		}
		break;
	default:
		libvarpd_panic("svp_conn_handler encountered unexpected "
		    "state: %d", scp->sc_cstate);
	}
out:
	mutex_exit(&scp->sc_lock);

	if (ret == SVP_RA_NONE)
		return;

	mutex_enter(&srp->sr_lock);
	mutex_enter(&scp->sc_lock);
	if (ret == SVP_RA_ERROR)
		ret = svp_conn_reset(scp);

	if (ret == SVP_RA_DEGRADE)
		svp_conn_degrade(scp);
	if (ret == SVP_RA_RESTORE)
		svp_conn_restore(scp);

	if (ret == SVP_RA_CLEANUP) {
		svp_conn_remove(scp);
		scp->sc_flags |= SVP_CF_REAP;
		svp_conn_inject(scp);
	}
	mutex_exit(&scp->sc_lock);
	mutex_exit(&srp->sr_lock);
}

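/*
 * Backoff timer callback: drive the state machine so that we attempt to
 * connect again.
 */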
static void
svp_conn_backtimer(void *arg)
{
	svp_conn_t *scp = arg;

	svp_conn_handler(NULL, scp);
}

/*
 * This fires every svp_conn_query_timeout seconds. Its purpose is to determine
 * whether we have failed to hear back on a request within
 * svp_conn_query_timeout seconds. If any of the queries that have been started
 * (indicated by svp_query_t`sq_acttime != -1) have been outstanding for more
 * than svp_conn_query_timeout seconds, we tear this connection down and
 * reassign the outstanding queries.
 */
static void
svp_conn_querytimer(void *arg)
{
	int ret;
	svp_query_t *sqp;
	svp_conn_t *scp = arg;
	hrtime_t now = gethrtime();

	mutex_enter(&scp->sc_lock);

	/*
	 * If we're not in the active state, then we don't care about this as
	 * we're already either going to die or we have no connections to worry
	 * about.
	 */
	if (scp->sc_cstate != SVP_CS_ACTIVE) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	for (sqp = list_head(&scp->sc_queries); sqp != NULL;
	    sqp = list_next(&scp->sc_queries, sqp)) {
		if (sqp->sq_acttime == -1)
			continue;
		if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
			break;
	}

	/* Nothing timed out, we're good here */
	if (sqp == NULL) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	(void) bunyan_warn(svp_bunyan, "query timed out on connection",
	    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
	    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
	    BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
	    BUNYAN_T_END);

	/*
	 * Begin the tear down process for this connection. If the dissociate
	 * fails, then we don't inject an event. See the big theory statement
	 * in libvarpd_svp.c for more information.
	 */
	scp->sc_flags |= SVP_CF_TEARDOWN;

	ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
	if (ret == 0)
		svp_conn_inject(scp);
	else
		VERIFY(ret == ENOENT);

	mutex_exit(&scp->sc_lock);
}

/*
 * This connection has fallen out of DNS, figure out what we need to do with
 * it.
 */
void
svp_conn_fallout(svp_conn_t *scp)
{
	svp_remote_t *srp = scp->sc_remote;

	assert(MUTEX_HELD(&srp->sr_lock));

	mutex_enter(&scp->sc_lock);
	switch (scp->sc_cstate) {
	case SVP_CS_ERROR:
		/*
		 * Connection is already inactive, so it's safe to tear down.
		 * Fire it off through the state machine to tear down via the
		 * backoff timer.
		 */
		svp_conn_remove(scp);
		scp->sc_flags |= SVP_CF_REAP;
		svp_conn_inject(scp);
		break;
	case SVP_CS_INITIAL:
	case SVP_CS_BACKOFF:
	case SVP_CS_CONNECTING:
		/*
		 * Here, we have something actively going on, so we'll let it
		 * be cleaned up the next time we hit the event loop, by the
		 * event loop itself. As it has no outstanding queries, there
		 * isn't much to really do, though we'll take this chance to
		 * go ahead and remove it from the remote.
		 */
		svp_conn_remove(scp);
		scp->sc_flags |= SVP_CF_REAP;
		svp_conn_inject(scp);
		break;
	case SVP_CS_ACTIVE:
	case SVP_CS_WINDDOWN:
		/*
		 * If there are no outstanding queries, then we should simply
		 * clean this up now, the same way we would with the others.
		 * Otherwise, as we know the event loop is ongoing, we'll make
		 * sure that these entries get cleaned up once they're done.
		 */
		scp->sc_cstate = SVP_CS_WINDDOWN;
		if (list_is_empty(&scp->sc_queries)) {
			svp_conn_remove(scp);
			scp->sc_flags |= SVP_CF_REAP;
			svp_conn_inject(scp);
		}
		break;
	default:
		libvarpd_panic("svp_conn_fallout encountered "
		    "unknown state");
	}
	mutex_exit(&scp->sc_lock);
	mutex_exit(&srp->sr_lock);
}

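/*
 * Allocate and initialize a connection to the given address, register it with
 * the remote, and arm its backoff and query timers.
 */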
int
svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
{
	int ret;
	svp_conn_t *scp;

	assert(MUTEX_HELD(&srp->sr_lock));
	scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
	if (scp == NULL)
		return (ENOMEM);

	if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
	    NULL)) != 0) {
		umem_free(scp, sizeof (svp_conn_t));
		return (ret);
	}

	scp->sc_remote = srp;
	scp->sc_event.se_func = svp_conn_handler;
	scp->sc_event.se_arg = scp;
	scp->sc_btimer.st_func = svp_conn_backtimer;
	scp->sc_btimer.st_arg = scp;
	scp->sc_btimer.st_oneshot = B_TRUE;
	scp->sc_btimer.st_value = 1;

	scp->sc_qtimer.st_func = svp_conn_querytimer;
	scp->sc_qtimer.st_arg = scp;
	scp->sc_qtimer.st_oneshot = B_FALSE;
	scp->sc_qtimer.st_value = svp_conn_query_timeout;

	scp->sc_socket = -1;

	list_create(&scp->sc_queries, sizeof (svp_query_t),
	    offsetof(svp_query_t, sq_lnode));
	scp->sc_gen = srp->sr_gen;
	bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
	scp->sc_cstate = SVP_CS_INITIAL;
	mutex_enter(&scp->sc_lock);
	svp_conn_add(scp);
	mutex_exit(&scp->sc_lock);

	/* Now that we're locked and loaded, add our timers */
	svp_timer_add(&scp->sc_qtimer);
	svp_timer_add(&scp->sc_btimer);

	return (0);
}

/*
 * At the time of calling, the entry has been removed from all lists. In
 * addition, the entry's state should be SVP_CS_ERROR; therefore, we know that
 * the fd should not be associated with the event loop. We'll double check that
 * just in case. We should also have already been removed from the remote's
 * list.
 */
void
svp_conn_destroy(svp_conn_t *scp)
{
	int ret;

	mutex_enter(&scp->sc_lock);
	if (scp->sc_cstate != SVP_CS_ERROR)
		libvarpd_panic("asked to tear down an active connection");
	if (scp->sc_flags & SVP_CF_ADDED)
		libvarpd_panic("asked to remove a connection still in "
		    "the remote list\n");
	if (!list_is_empty(&scp->sc_queries))
		libvarpd_panic("asked to remove a connection with non-empty "
		    "query list");

	if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
	    ENOENT) {
		libvarpd_panic("dissociate failed or was actually "
		    "associated: %d", ret);
	}
	mutex_exit(&scp->sc_lock);

	/* Verify our timers are killed */
	svp_timer_remove(&scp->sc_btimer);
	svp_timer_remove(&scp->sc_qtimer);

	if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
		    "%d: %d", scp->sc_socket, errno);

	list_destroy(&scp->sc_queries);
	umem_free(scp, sizeof (svp_conn_t));
}

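/*
 * Queue a query on an active connection and make sure that we're polling for
 * POLLOUT so that it gets written out.
 */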
void
svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
{
	assert(MUTEX_HELD(&scp->sc_lock));
	assert(scp->sc_cstate == SVP_CS_ACTIVE);

	sqp->sq_acttime = -1;
	list_insert_tail(&scp->sc_queries, sqp);
	if (!(scp->sc_event.se_events & POLLOUT)) {
		scp->sc_event.se_events |= POLLOUT;
		/*
		 * If this becomes frequent, we should give up on this set of
		 * connections instead of aborting.
		 */
		if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
			libvarpd_panic("svp_event_associate failed somehow");
	}
}