1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2015 Joyent, Inc.
  14  */
  15 
  16 /*
  17  * Logic to manage an individual connection to a remote host.
  18  *
  19  * For more information, see the big theory statement in
  20  * lib/varpd/svp/common/libvarpd_svp.c.
  21  */
  22 
  23 #include <assert.h>
  24 #include <umem.h>
  25 #include <errno.h>
  26 #include <strings.h>
  27 #include <unistd.h>
  28 #include <stddef.h>
  29 #include <sys/uio.h>
  30 #include <sys/debug.h>
  31 
  32 #include <libvarpd_svp.h>
  33 
/*
 * Tunable: seconds of inactivity before an outstanding query is considered
 * timed out (presumably consumed by the connection's query timer — the timer
 * itself is not visible in this part of the file; confirm against its user).
 */
int svp_conn_query_timeout = 30;
/* Reconnect backoff schedule in seconds; svp_conn_backoff() pins at the end. */
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
  37 
/*
 * Follow-up actions that the per-state logic hands back to its caller
 * (the connection event handler) once a state transition has been made.
 */
typedef enum svp_conn_act {
	SVP_RA_NONE	= 0x00,	/* Nothing further to do */
	SVP_RA_DEGRADE	= 0x01,	/* Mark this connection as degraded */
	SVP_RA_RESTORE	= 0x02,	/* Clear this connection's degraded state */
	SVP_RA_ERROR	= 0x03,	/* Tear the connection down and reset it */
	SVP_RA_CLEANUP	= 0x04,	/* Connection should be cleaned up */
	SVP_RA_FIND_VERSION = 0x05	/* Kick off version discovery (ping) */
} svp_conn_act_t;
  46 
  47 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
  48 
  49 static void
  50 svp_conn_inject(svp_conn_t *scp)
  51 {
  52         int ret;
  53         assert(MUTEX_HELD(&scp->sc_lock));
  54 
  55         if (scp->sc_flags & SVP_CF_USER)
  56                 return;
  57         scp->sc_flags |= SVP_CF_USER;
  58         if ((ret = svp_event_inject(&scp->sc_event)) != 0)
  59                 libvarpd_panic("failed to inject event: %d\n", ret);
  60 }
  61 
  62 static void
  63 svp_conn_degrade(svp_conn_t *scp)
  64 {
  65         svp_remote_t *srp = scp->sc_remote;
  66 
  67         assert(MUTEX_HELD(&srp->sr_lock));
  68         assert(MUTEX_HELD(&scp->sc_lock));
  69 
  70         if (scp->sc_flags & SVP_CF_DEGRADED)
  71                 return;
  72 
  73         scp->sc_flags |= SVP_CF_DEGRADED;
  74         srp->sr_ndconns++;
  75         if (srp->sr_ndconns == srp->sr_tconns)
  76                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
  77 }
  78 
  79 static void
  80 svp_conn_restore(svp_conn_t *scp)
  81 {
  82         svp_remote_t *srp = scp->sc_remote;
  83 
  84         assert(MUTEX_HELD(&srp->sr_lock));
  85         assert(MUTEX_HELD(&scp->sc_lock));
  86 
  87         if (!(scp->sc_flags & SVP_CF_DEGRADED))
  88                 return;
  89 
  90         scp->sc_flags &= ~SVP_CF_DEGRADED;
  91         if (srp->sr_ndconns == srp->sr_tconns)
  92                 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
  93         srp->sr_ndconns--;
  94 }
  95 
  96 static svp_conn_act_t
  97 svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp)
  98 {
  99         uint16_t remote_version = ntohs(scp->sc_input.sci_req.svp_ver);
 100 
 101         if (scp->sc_cstate == SVP_CS_VERSIONING) {
 102                 /* Transition VERSIONING -> ACTIVE. */
 103                 assert(scp->sc_version == 0);
 104                 if (remote_version == 0 || remote_version > SVP_CURRENT_VERSION)
 105                         return (SVP_RA_ERROR);
 106                 scp->sc_version = remote_version;
 107                 scp->sc_cstate = SVP_CS_ACTIVE;
 108         }
 109 
 110         return (SVP_RA_NONE);
 111 }
 112 
 113 static void
 114 svp_conn_ping_cb(svp_query_t *sqp, void *arg)
 115 {
 116         size_t len = (size_t)arg;
 117 
 118         assert(len == sizeof (svp_query_t));
 119         umem_free(sqp, len);
 120 }
 121 
 122 static svp_conn_act_t
 123 svp_conn_ping_version(svp_conn_t *scp)
 124 {
 125         svp_remote_t *srp = scp->sc_remote;
 126         svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT);
 127         int ret;
 128 
 129         assert(MUTEX_HELD(&srp->sr_lock));
 130         assert(MUTEX_HELD(&scp->sc_lock));
 131         assert(scp->sc_cstate == SVP_CS_CONNECTING);
 132 
 133         if (sqp == NULL)
 134                 return (SVP_RA_ERROR);
 135 
 136         /* Only set things that need to be non-0/non-NULL. */
 137         sqp->sq_state = SVP_QUERY_INIT;
 138         sqp->sq_func = svp_conn_ping_cb;
 139         sqp->sq_arg = (void *)sizeof (svp_query_t);
 140         sqp->sq_header.svp_op = htons(SVP_R_PING);
 141         sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION);
 142         sqp->sq_header.svp_id = svp_id_alloc();
 143         if (sqp->sq_header.svp_id == -1) {
 144                 umem_free(sqp, sizeof (svp_query_t));
 145                 return (SVP_RA_ERROR);
 146         }
 147 
 148         scp->sc_cstate = SVP_CS_VERSIONING;
 149         /* Set the event flags now... */
 150         scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT;
 151         /* ...so I can just queue it up directly... */
 152         svp_conn_queue(scp, sqp);
 153         /* ... and then associate the event port myself. */
 154         ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 155         if (ret == 0)
 156                 return (SVP_RA_RESTORE);
 157         scp->sc_error = SVP_CE_ASSOCIATE;
 158         scp->sc_errno = ret;
 159         scp->sc_cstate = SVP_CS_ERROR;
 160         umem_free(sqp, sizeof (svp_query_t));
 161         return (SVP_RA_DEGRADE);
 162 }
 163 
 164 static void
 165 svp_conn_add(svp_conn_t *scp)
 166 {
 167         svp_remote_t *srp = scp->sc_remote;
 168 
 169         assert(MUTEX_HELD(&srp->sr_lock));
 170         assert(MUTEX_HELD(&scp->sc_lock));
 171 
 172         if (scp->sc_flags & SVP_CF_ADDED)
 173                 return;
 174 
 175         list_insert_tail(&srp->sr_conns, scp);
 176         scp->sc_flags |= SVP_CF_ADDED;
 177         srp->sr_tconns++;
 178 }
 179 
 180 static void
 181 svp_conn_remove(svp_conn_t *scp)
 182 {
 183         svp_remote_t *srp = scp->sc_remote;
 184 
 185         assert(MUTEX_HELD(&srp->sr_lock));
 186         assert(MUTEX_HELD(&scp->sc_lock));
 187 
 188         if (!(scp->sc_flags & SVP_CF_ADDED))
 189                 return;
 190 
 191         scp->sc_flags &= ~SVP_CF_ADDED;
 192         if (scp->sc_flags & SVP_CF_DEGRADED)
 193                 srp->sr_ndconns--;
 194         srp->sr_tconns--;
 195         if (srp->sr_tconns == srp->sr_ndconns)
 196                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
 197 }
 198 
 199 static svp_query_t *
 200 svp_conn_query_find(svp_conn_t *scp, uint32_t id)
 201 {
 202         svp_query_t *sqp;
 203 
 204         assert(MUTEX_HELD(&scp->sc_lock));
 205 
 206         for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 207             sqp = list_next(&scp->sc_queries, sqp)) {
 208                 if (sqp->sq_header.svp_id == id)
 209                         break;
 210         }
 211 
 212         return (sqp);
 213 }
 214 
 215 static svp_conn_act_t
 216 svp_conn_backoff(svp_conn_t *scp)
 217 {
 218         assert(MUTEX_HELD(&scp->sc_lock));
 219 
 220         if (close(scp->sc_socket) != 0)
 221                 libvarpd_panic("failed to close socket %d: %d\n",
 222                     scp->sc_socket, errno);
 223         scp->sc_socket = -1;
 224 
 225         scp->sc_cstate = SVP_CS_BACKOFF;
 226         scp->sc_nbackoff++;
 227         if (scp->sc_nbackoff >= svp_conn_nbackoff) {
 228                 scp->sc_btimer.st_value =
 229                     svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
 230         } else {
 231                 scp->sc_btimer.st_value =
 232                     svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
 233         }
 234         svp_timer_add(&scp->sc_btimer);
 235 
 236         if (scp->sc_nbackoff > svp_conn_nbackoff)
 237                 return (SVP_RA_DEGRADE);
 238         return (SVP_RA_NONE);
 239 }
 240 
/*
 * Start a (re)connection attempt to the remote.  Called from the INITIAL or
 * BACKOFF states with the connection lock held and no open socket.  A
 * synchronous success or asynchronous in-progress connect advances the
 * state machine; hard failures either degrade the connection or put it
 * back into backoff.
 */
static svp_conn_act_t
svp_conn_connect(svp_conn_t *scp)
{
	int ret;
	struct sockaddr_in6 in6;

	assert(MUTEX_HELD(&scp->sc_lock));
	assert(scp->sc_cstate == SVP_CS_BACKOFF ||
	    scp->sc_cstate == SVP_CS_INITIAL);
	assert(scp->sc_socket == -1);
	/* A brand-new connection starts its backoff schedule from scratch. */
	if (scp->sc_cstate == SVP_CS_INITIAL)
		scp->sc_nbackoff = 0;

	/* New connect means we need to know the version. */
	scp->sc_version = 0;

	scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
	if (scp->sc_socket == -1) {
		scp->sc_error = SVP_CE_SOCKET;
		scp->sc_errno = errno;
		scp->sc_cstate = SVP_CS_ERROR;
		return (SVP_RA_DEGRADE);
	}

	bzero(&in6, sizeof (struct sockaddr_in6));
	in6.sin6_family = AF_INET6;
	in6.sin6_port = htons(scp->sc_remote->sr_rport);
	bcopy(&scp->sc_addr, &in6.sin6_addr,  sizeof (struct in6_addr));
	ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
	    sizeof (struct sockaddr_in6));
	if (ret != 0) {
		boolean_t async = B_FALSE;

		/*
		 * Classify the errno: the first group indicates a programming
		 * error for a fresh non-blocking AF_INET6 socket, so panic.
		 * EINPROGRESS/EINTR mean the connect continues in the
		 * background; everything else is a (possibly transient)
		 * network failure handled via backoff below.
		 */
		switch (errno) {
		case EACCES:
		case EADDRINUSE:
		case EAFNOSUPPORT:
		case EALREADY:
		case EBADF:
		case EISCONN:
		case ELOOP:
		case ENOENT:
		case ENOSR:
		case EWOULDBLOCK:
			libvarpd_panic("unanticipated connect errno %d", errno);
			break;
		case EINPROGRESS:
		case EINTR:
			async = B_TRUE;
			/* FALLTHROUGH */
		default:
			break;
		}

		/*
		 * So, we will be connecting to this in the future, advance our
		 * state and make sure that we poll for the next round.
		 */
		if (async == B_TRUE) {
			scp->sc_cstate = SVP_CS_CONNECTING;
			scp->sc_event.se_events = POLLOUT | POLLHUP;
			ret = svp_event_associate(&scp->sc_event,
			    scp->sc_socket);
			if (ret == 0)
				return (SVP_RA_NONE);
			scp->sc_error = SVP_CE_ASSOCIATE;
			scp->sc_errno = ret;
			scp->sc_cstate = SVP_CS_ERROR;
			return (SVP_RA_DEGRADE);
		} else {
			/*
			 * This call failed, which means that we obtained one of
			 * the following:
			 *
			 * EADDRNOTAVAIL
			 * ECONNREFUSED
			 * EIO
			 * ENETUNREACH
			 * EHOSTUNREACH
			 * ENXIO
			 * ETIMEDOUT
			 *
			 * Therefore we need to set ourselves into backoff and
			 * wait for that to clear up.
			 */
			return (svp_conn_backoff(scp));
		}
	}

	/* Immediately successful connection, move to SVP_CS_VERSIONING. */
	return (svp_conn_poll_connect(NULL, scp));
}
 332 
 333 /*
 334  * This should be the first call we get after a successful synchronous
 335  * connect, or a completed (failed or successful) asynchronous connect.  A
 336  * non-NULL port-event indicates asynchronous completion, a NULL port-event
 337  * indicates a successful synchronous connect.
 338  * 
 339  * If we have successfully connected, we should see a writeable event.  In the
 340  * asynchronous case, we may also see an error or a hang up. For either hang
 341  * up or error, we transition to error mode. If there is also a readable event
 342  * (i.e. incoming data), we ignore it at the moment and just let a
 343  * reassociation pick it up so we can simplify the set of state transitions
 344  * that we have.
 345  */
 346 static svp_conn_act_t
 347 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
 348 {
 349         int ret;
 350         svp_conn_error_t version_error;
 351 
 352         if (pe != NULL) {
 353                 int err;
 354                 socklen_t sl = sizeof (err);
 355 
 356                 /*
 357                  * These bits only matter if we're notified of an
 358                  * asynchronous connection completion.
 359                  */
 360                 if (!(pe->portev_events & POLLOUT)) {
 361                         scp->sc_errno = 0;
 362                         scp->sc_error = SVP_CE_NOPOLLOUT;
 363                         scp->sc_cstate = SVP_CS_ERROR;
 364                         return (SVP_RA_DEGRADE);
 365                 }
 366 
 367                 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
 368                     &sl);
 369                 if (ret != 0)
 370                         libvarpd_panic("unanticipated getsockopt error");
 371                 if (err != 0) {
 372                         return (svp_conn_backoff(scp));
 373                 }
 374         }
 375 
 376         return (SVP_RA_FIND_VERSION);
 377 }
 378 
 379 static svp_conn_act_t
 380 svp_conn_pollout(svp_conn_t *scp)
 381 {
 382         svp_query_t *sqp;
 383         svp_req_t *req;
 384         size_t off;
 385         struct iovec iov[2];
 386         int nvecs = 0;
 387         ssize_t ret;
 388 
 389         assert(MUTEX_HELD(&scp->sc_lock));
 390 
 391         /*
 392          * We need to find a query and start writing it out.
 393          */
 394         if (scp->sc_output.sco_query == NULL) {
 395                 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 396                     sqp = list_next(&scp->sc_queries, sqp)) {
 397                         if (sqp->sq_state != SVP_QUERY_INIT)
 398                                 continue;
 399                         break;
 400                 }
 401 
 402                 if (sqp == NULL) {
 403                         scp->sc_event.se_events &= ~POLLOUT;
 404                         return (SVP_RA_NONE);
 405                 }
 406 
 407                 scp->sc_output.sco_query = sqp;
 408                 scp->sc_output.sco_offset = 0;
 409                 sqp->sq_state = SVP_QUERY_WRITING;
 410                 svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
 411         }
 412 
 413         sqp = scp->sc_output.sco_query;
 414         req = &sqp->sq_header;
 415         off = scp->sc_output.sco_offset;
 416         if (off < sizeof (svp_req_t)) {
 417                 iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
 418                 iov[nvecs].iov_len = sizeof (svp_req_t) - off;
 419                 nvecs++;
 420                 off = 0;
 421         } else {
 422                 off -= sizeof (svp_req_t);
 423         }
 424 
 425         iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
 426         iov[nvecs].iov_len = sqp->sq_rsize - off;
 427         nvecs++;
 428 
 429         do {
 430                 ret = writev(scp->sc_socket, iov, nvecs);
 431         } while (ret == -1 && errno == EINTR);
 432         if (ret == -1) {
 433                 switch (errno) {
 434                 case EAGAIN:
 435                         scp->sc_event.se_events |= POLLOUT;
 436                         return (SVP_RA_NONE);
 437                 case EIO:
 438                 case ENXIO:
 439                 case ECONNRESET:
 440                         return (SVP_RA_ERROR);
 441                 default:
 442                         libvarpd_panic("unexpected errno: %d", errno);
 443                 }
 444         }
 445 
 446         sqp->sq_acttime = gethrtime();
 447         scp->sc_output.sco_offset += ret;
 448         if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
 449                 sqp->sq_state = SVP_QUERY_READING;
 450                 scp->sc_output.sco_query = NULL;
 451                 scp->sc_output.sco_offset = 0;
 452                 scp->sc_event.se_events |= POLLOUT;
 453         }
 454         return (SVP_RA_NONE);
 455 }
 456 
 457 static boolean_t
 458 svp_conn_pollin_validate(svp_conn_t *scp)
 459 {
 460         svp_query_t *sqp;
 461         uint32_t nsize, expected_size = 0;
 462         uint16_t nvers, nop;
 463         svp_req_t *resp = &scp->sc_input.sci_req;
 464 
 465         assert(MUTEX_HELD(&scp->sc_lock));
 466 
 467         nvers = ntohs(resp->svp_ver);
 468         nop = ntohs(resp->svp_op);
 469         nsize = ntohl(resp->svp_size);
 470 
 471         /*
 472          * A peer that's messing with post-connection version changes is
 473          * likely a broken peer.
 474          */
 475         if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) {
 476                 (void) bunyan_warn(svp_bunyan, "version mismatch",
 477                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 478                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 479                     BUNYAN_T_INT32, "peer version", nvers,
 480                     BUNYAN_T_INT32, "our version", scp->sc_version,
 481                     BUNYAN_T_INT32, "operation", nop,
 482                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 483                     BUNYAN_T_END);
 484                 return (B_FALSE);
 485         }
 486 
 487         switch (nop) {
 488         case SVP_R_VL2_ACK:
 489                 expected_size = sizeof (svp_vl2_ack_t);
 490                 break;
 491         case SVP_R_VL3_ACK:
 492                 expected_size = sizeof (svp_vl3_ack_t);
 493                 break;
 494         case SVP_R_LOG_RM_ACK:
 495                 expected_size = sizeof (svp_lrm_ack_t);
 496                 break;
 497         case SVP_R_ROUTE_ACK:
 498                 expected_size = sizeof (svp_route_ack_t);
 499                 break;
 500         case SVP_R_LOG_ACK:
 501         case SVP_R_PONG:
 502                 /* No expected size (LOG_ACK) or size is 0 (PONG). */
 503                 break;
 504         default:
 505                 (void) bunyan_warn(svp_bunyan, "unsupported operation",
 506                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 507                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 508                     BUNYAN_T_INT32, "version", nvers,
 509                     BUNYAN_T_INT32, "operation", nop,
 510                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 511                     BUNYAN_T_END);
 512                 return (B_FALSE);
 513         }
 514 
 515         sqp = svp_conn_query_find(scp, resp->svp_id);
 516         if (sqp == NULL) {
 517                 (void) bunyan_warn(svp_bunyan, "unknown response id",
 518                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 519                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 520                     BUNYAN_T_INT32, "version", nvers,
 521                     BUNYAN_T_INT32, "operation", nop,
 522                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 523                     BUNYAN_T_END);
 524                 return (B_FALSE);
 525         }
 526 
 527         if (sqp->sq_state != SVP_QUERY_READING) {
 528                 (void) bunyan_warn(svp_bunyan,
 529                     "got response for unexpecting query",
 530                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 531                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 532                     BUNYAN_T_INT32, "version", nvers,
 533                     BUNYAN_T_INT32, "operation", nop,
 534                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 535                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 536                     BUNYAN_T_END);
 537                 return (B_FALSE);
 538         }
 539 
 540         if (nop != SVP_R_LOG_RM_ACK && nsize != expected_size) {
 541                 (void) bunyan_warn(svp_bunyan, "response size too large",
 542                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 543                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 544                     BUNYAN_T_INT32, "version", nvers,
 545                     BUNYAN_T_INT32, "operation", nop,
 546                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 547                     BUNYAN_T_INT32, "response_size", nsize,
 548                     BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
 549                     sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
 550                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 551                     BUNYAN_T_END);
 552                 return (B_FALSE);
 553         }
 554 
 555         /*
 556          * The valid size is anything <= to what the user requested, but at
 557          * least svp_log_ack_t bytes large.
 558          */
 559         if (nop == SVP_R_LOG_ACK) {
 560                 const char *msg = NULL;
 561                 if (nsize < sizeof (svp_log_ack_t))
 562                         msg = "response size too small";
 563                 else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
 564                         msg = "response size too large";
 565                 if (msg != NULL) {
 566                         (void) bunyan_warn(svp_bunyan, msg,
 567                             BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 568                             BUNYAN_T_INT32, "remote_port",
 569                             scp->sc_remote->sr_rport,
 570                             BUNYAN_T_INT32, "version", nvers,
 571                             BUNYAN_T_INT32, "operation", nop,
 572                             BUNYAN_T_INT32, "response_id", resp->svp_id,
 573                             BUNYAN_T_INT32, "response_size", nsize,
 574                             BUNYAN_T_INT32, "expected_size",
 575                             ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
 576                             BUNYAN_T_INT32, "query_state", sqp->sq_state,
 577                             BUNYAN_T_END);
 578                         return (B_FALSE);
 579                 }
 580         }
 581 
 582         sqp->sq_size = nsize;
 583         scp->sc_input.sci_query = sqp;
 584         if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
 585             nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK ||
 586             nop == SVP_R_PONG) {
 587                 sqp->sq_wdata = &sqp->sq_wdun;
 588                 sqp->sq_wsize = sizeof (svp_query_data_t);
 589         } else {
 590                 VERIFY(nop == SVP_R_LOG_ACK);
 591                 assert(sqp->sq_wdata != NULL);
 592                 assert(sqp->sq_wsize != 0);
 593         }
 594 
 595         return (B_TRUE);
 596 }
 597 
/*
 * Read (part of) a response from the socket.  First the fixed-size header is
 * assembled (possibly across several calls, tracked by sci_offset); once
 * complete and validated, the payload is read into the query's write buffer,
 * the CRC is checked, the per-op status is extracted, and the query's
 * completion callback is invoked with sc_lock dropped.
 */
static svp_conn_act_t
svp_conn_pollin(svp_conn_t *scp)
{
	size_t off, total;
	ssize_t ret;
	svp_query_t *sqp;
	uint32_t crc;
	uint16_t nop;

	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * No query implies that we're reading in the header and that the
	 * offset is associated with it.
	 */
	off = scp->sc_input.sci_offset;
	sqp = scp->sc_input.sci_query;
	if (scp->sc_input.sci_query == NULL) {
		svp_req_t *resp = &scp->sc_input.sci_req;

		assert(off < sizeof (svp_req_t));

		do {
			ret = read(scp->sc_socket,
			    (void *)((uintptr_t)resp + off),
			    sizeof (svp_req_t) - off);
		} while (ret == -1 && errno == EINTR);
		if (ret == -1) {
			switch (errno) {
			case EAGAIN:
				scp->sc_event.se_events |= POLLIN | POLLRDNORM;
				return (SVP_RA_NONE);
			case EIO:
			case ECONNRESET:
				return (SVP_RA_ERROR);
				break;
			default:
				libvarpd_panic("unexpeted read errno: %d",
				    errno);
			}
		} else if (ret == 0) {
			/* Try to reconnect to the remote host */
			return (SVP_RA_ERROR);
		}

		/* Didn't get all the data we need */
		if (off + ret < sizeof (svp_req_t)) {
			scp->sc_input.sci_offset += ret;
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		}

		if (svp_conn_pollin_validate(scp) != B_TRUE)
			return (SVP_RA_ERROR);
	}

	/*
	 * NOTE(review): when the header just completed in this same call,
	 * `off` (and sci_offset) still hold the header offset rather than 0.
	 * If the header ever arrives split across reads, the payload read
	 * below would then start at a non-zero offset into sq_wdata and
	 * under-read by that amount — confirm whether sci_offset should be
	 * reset to 0 once the header is complete.
	 */
	sqp = scp->sc_input.sci_query;
	assert(sqp != NULL);
	sqp->sq_acttime = gethrtime();
	total = ntohl(scp->sc_input.sci_req.svp_size);
	do {
		ret = read(scp->sc_socket,
		    (void *)((uintptr_t)sqp->sq_wdata + off),
		    total - off);
	} while (ret == -1 && errno == EINTR);

	if (ret == -1) {
		switch (errno) {
		case EAGAIN:
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		case EIO:
		case ECONNRESET:
			return (SVP_RA_ERROR);
			break;
		default:
			libvarpd_panic("unexpeted read errno: %d", errno);
		}
	} else if (ret == 0 && total - off > 0) {
		/* Try to reconnect to the remote host */
		return (SVP_RA_ERROR);
	}

	/* Payload not complete yet; remember how far we got. */
	if (ret + off < total) {
		scp->sc_input.sci_offset += ret;
		return (SVP_RA_NONE);
	}

	/*
	 * Whole payload is in.  Verify the CRC: svp_query_crc32() recomputes
	 * the checksum into the header in place, so save the wire value
	 * first and compare afterwards.
	 */
	nop = ntohs(scp->sc_input.sci_req.svp_op);
	crc = scp->sc_input.sci_req.svp_crc32;
	svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
	if (crc != scp->sc_input.sci_req.svp_crc32) {
		(void) bunyan_info(svp_bunyan, "crc32 mismatch",
		    BUNYAN_T_IP, "remote ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version",
		    ntohs(scp->sc_input.sci_req.svp_ver),
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response id",
		    ntohl(scp->sc_input.sci_req.svp_id),
		    BUNYAN_T_INT32, "query state", sqp->sq_state,
		    BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
		    BUNYAN_T_UINT32, "calc_crc",
		    ntohl(scp->sc_input.sci_req.svp_crc32),
		    BUNYAN_T_END);
		return (SVP_RA_ERROR);
	}
	/* Reset the input state for the next response. */
	scp->sc_input.sci_query = NULL;
	scp->sc_input.sci_offset = 0;

	/* Extract the per-operation status from the payload. */
	if (nop == SVP_R_VL2_ACK) {
		svp_vl2_ack_t *sl2a = sqp->sq_wdata;
		sqp->sq_status = ntohl(sl2a->sl2a_status);
	} else if (nop == SVP_R_VL3_ACK) {
		svp_vl3_ack_t *sl3a = sqp->sq_wdata;
		sqp->sq_status = ntohl(sl3a->sl3a_status);
	} else if (nop == SVP_R_LOG_ACK) {
		svp_log_ack_t *svla = sqp->sq_wdata;
		sqp->sq_status = ntohl(svla->svla_status);
	} else if (nop == SVP_R_LOG_RM_ACK) {
		svp_lrm_ack_t *svra = sqp->sq_wdata;
		sqp->sq_status = ntohl(svra->svra_status);
	} else if (nop == SVP_R_ROUTE_ACK) {
		svp_route_ack_t *sra = sqp->sq_wdata;
		sqp->sq_status = ntohl(sra->sra_status);
	} else if (nop == SVP_R_PONG) {
		/*
		 * Handle the PONG versioning-capture here, as we need
		 * the version number, the scp_lock held, and the ability
		 * to error out.
		 */
		svp_conn_act_t cbret;

		cbret = svp_conn_pong_handler(scp, sqp);
		if (cbret != SVP_RA_NONE)
			return (cbret);
	} else {
		libvarpd_panic("unhandled nop: %d", nop);
	}

	list_remove(&scp->sc_queries, sqp);
	mutex_exit(&scp->sc_lock);

	/*
	 * We have to release all of our resources associated with this entry
	 * before we call the callback. After we call it, the memory will be
	 * lost to time.
	 */
	svp_query_release(sqp);
	sqp->sq_func(sqp, sqp->sq_arg);
	mutex_enter(&scp->sc_lock);
	scp->sc_event.se_events |= POLLIN | POLLRDNORM;

	return (SVP_RA_NONE);
}
 753 
 754 static svp_conn_act_t
 755 svp_conn_reset(svp_conn_t *scp)
 756 {
 757         svp_remote_t *srp = scp->sc_remote;
 758 
 759         assert(MUTEX_HELD(&srp->sr_lock));
 760         assert(MUTEX_HELD(&scp->sc_lock));
 761 
 762         assert(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
 763             ENOENT);
 764         if (close(scp->sc_socket) != 0)
 765                 libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
 766                     errno);
 767         scp->sc_flags &= ~SVP_CF_TEARDOWN;
 768         scp->sc_socket = -1;
 769         scp->sc_cstate = SVP_CS_INITIAL;
 770         scp->sc_input.sci_query = NULL;
 771         scp->sc_output.sco_query = NULL;
 772 
 773         svp_remote_reassign(srp, scp);
 774 
 775         return (svp_conn_connect(scp));
 776 }
 777 
 778 /*
 779  * This is our general state transition function. We're called here when we want
 780  * to advance part of our state machine as well as to re-arm ourselves. We can
 781  * also end up here from the standard event loop as a result of having a user
 782  * event posted.
 783  */
 784 static void
 785 svp_conn_handler(port_event_t *pe, void *arg)
 786 {
 787         svp_conn_t *scp = arg;
 788         svp_remote_t *srp = scp->sc_remote;
 789         svp_conn_act_t ret = SVP_RA_NONE;
 790         svp_conn_state_t oldstate;
 791 
 792         mutex_enter(&scp->sc_lock);
 793 
 794         /*
 795          * Check if one of our event interrupts is set. An event interrupt, such
 796          * as having to be reaped or be torndown is notified by a
 797          * PORT_SOURCE_USER event that tries to take care of this. However,
 798          * because of the fact that the event loop can be ongoing despite this,
 799          * we may get here before the PORT_SOURCE_USER has casued us to get
 800          * here. In such a case, if the PORT_SOURCE_USER event is tagged, then
 801          * we're going to opt to do nothing here and wait for it to come and
 802          * tear us down. That will also indicate to us that we have nothing to
 803          * worry about as far as general timing and the like goes.
 804          */
 805         if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
 806             (scp->sc_flags & SVP_CF_USER) != 0 &&
 807             pe != NULL &&
 808             pe->portev_source != PORT_SOURCE_USER) {
 809                 mutex_exit(&scp->sc_lock);
 810                 return;
 811         }
 812 
 813         if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
 814                 scp->sc_flags &= ~SVP_CF_USER;
 815                 if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
 816                         mutex_exit(&scp->sc_lock);
 817                         return;
 818                 }
 819         }
 820 
 821         /* Check if this needs to be freed */
 822         if (scp->sc_flags & SVP_CF_REAP) {
 823                 mutex_exit(&scp->sc_lock);
 824                 svp_conn_destroy(scp);
 825                 return;
 826         }
 827 
 828         /* Check if this needs to be reset */
 829         if (scp->sc_flags & SVP_CF_TEARDOWN) {
 830                 /* Make sure any other users of this are disassociated */
 831                 ret = SVP_RA_ERROR;
 832                 goto out;
 833         }
 834 
 835         switch (scp->sc_cstate) {
 836         case SVP_CS_INITIAL:
 837         case SVP_CS_BACKOFF:
 838                 assert(pe == NULL);
 839                 ret = svp_conn_connect(scp);
 840                 break;
 841         case SVP_CS_CONNECTING:
 842                 assert(pe != NULL);
 843                 ret = svp_conn_poll_connect(pe, scp);
 844                 break;
 845         case SVP_CS_VERSIONING:
 846         case SVP_CS_ACTIVE:
 847         case SVP_CS_WINDDOWN:
 848                 assert(pe != NULL);
 849                 oldstate = scp->sc_cstate;
 850                 if (pe->portev_events & POLLOUT)
 851                         ret = svp_conn_pollout(scp);
 852                 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
 853                         ret = svp_conn_pollin(scp);
 854 
 855                 if (oldstate == SVP_CS_WINDDOWN &&
 856                     (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
 857                         ret = SVP_RA_CLEANUP;
 858                 }
 859 
 860                 if (ret == SVP_RA_NONE) {
 861                         int err;
 862                         if ((err = svp_event_associate(&scp->sc_event,
 863                             scp->sc_socket)) != 0) {
 864                                 scp->sc_error = SVP_CE_ASSOCIATE;
 865                                 scp->sc_errno = err;
 866                                 scp->sc_cstate = SVP_CS_ERROR;
 867                                 ret = SVP_RA_DEGRADE;
 868                         }
 869                 }
 870                 break;
 871         default:
 872                 libvarpd_panic("svp_conn_handler encountered unexpected "
 873                     "state: %d", scp->sc_cstate);
 874         }
 875 out:
 876         mutex_exit(&scp->sc_lock);
 877 
 878         if (ret == SVP_RA_NONE)
 879                 return;
 880 
 881         mutex_enter(&srp->sr_lock);
 882         mutex_enter(&scp->sc_lock);
 883         if (ret == SVP_RA_FIND_VERSION)
 884                 ret = svp_conn_ping_version(scp);
 885 
 886         if (ret == SVP_RA_ERROR)
 887                 ret = svp_conn_reset(scp);
 888 
 889         if (ret == SVP_RA_DEGRADE)
 890                 svp_conn_degrade(scp);
 891         if (ret == SVP_RA_RESTORE)
 892                 svp_conn_restore(scp);
 893 
 894         if (ret == SVP_RA_CLEANUP) {
 895                 svp_conn_remove(scp);
 896                 scp->sc_flags |= SVP_CF_REAP;
 897                 svp_conn_inject(scp);
 898         }
 899         mutex_exit(&scp->sc_lock);
 900         mutex_exit(&srp->sr_lock);
 901 }
 902 
 903 static void
 904 svp_conn_backtimer(void *arg)
 905 {
 906         svp_conn_t *scp = arg;
 907 
 908         svp_conn_handler(NULL, scp);
 909 }
 910 
 911 /*
 912  * This fires every svp_conn_query_timeout seconds. Its purpos is to determine
 913  * if we haven't heard back on a request with in svp_conn_query_timeout seconds.
 914  * If any of the svp_conn_query_t's that have been started (indicated by
 915  * svp_query_t`sq_acttime != -1), and more than svp_conn_query_timeout seconds
 916  * have passed, we basically tear this connection down and reassign outstanding
 917  * queries.
 918  */
static void
svp_conn_querytimer(void *arg)
{
	int ret;
	svp_query_t *sqp;
	svp_conn_t *scp = arg;
	hrtime_t now = gethrtime();

	mutex_enter(&scp->sc_lock);

	/*
	 * If we're not in the active state, then we don't care about this as
	 * we're already either going to die or we have no connections to worry
	 * about.
	 */
	if (scp->sc_cstate != SVP_CS_ACTIVE) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	/*
	 * Scan for the first started query (sq_acttime != -1) that has been
	 * outstanding for more than svp_conn_query_timeout seconds.
	 */
	for (sqp = list_head(&scp->sc_queries); sqp != NULL;
	    sqp = list_next(&scp->sc_queries, sqp)) {
		if (sqp->sq_acttime == -1)
			continue;
		if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
			break;
	}

	/* Nothing timed out, we're good here */
	if (sqp == NULL) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	(void) bunyan_warn(svp_bunyan, "query timed out on connection",
	    BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
	    BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
	    BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
	    BUNYAN_T_END);

	/*
	 * Begin the tear down process for this connection. If we lose the
	 * disassociate, then we don't inject an event. See the big theory
	 * statement in libvarpd_svp.c for more information.
	 */
	scp->sc_flags |= SVP_CF_TEARDOWN;

	/*
	 * ENOENT from the dissociate means the event loop already holds us;
	 * it will observe SVP_CF_TEARDOWN itself, so no injection is needed.
	 */
	ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
	if (ret == 0)
		svp_conn_inject(scp);
	else
		VERIFY(ret == ENOENT);

	mutex_exit(&scp->sc_lock);
}
 974 
 975 /*
 976  * This connection has fallen out of DNS, figure out what we need to do with it.
 977  */
 978 void
 979 svp_conn_fallout(svp_conn_t *scp)
 980 {
 981         svp_remote_t *srp = scp->sc_remote;
 982 
 983         assert(MUTEX_HELD(&srp->sr_lock));
 984 
 985         mutex_enter(&scp->sc_lock);
 986         switch (scp->sc_cstate) {
 987         case SVP_CS_ERROR:
 988                 /*
 989                  * Connection is already inactive, so it's safe to tear down.
 990                  * Fire it off through the state machine to tear down via the
 991                  * backoff timer.
 992                  */
 993                 svp_conn_remove(scp);
 994                 scp->sc_flags |= SVP_CF_REAP;
 995                 svp_conn_inject(scp);
 996                 break;
 997         case SVP_CS_INITIAL:
 998         case SVP_CS_BACKOFF:
 999         case SVP_CS_CONNECTING:
1000                 /*
1001                  * Here, we have something actively going on, so we'll let it be
1002                  * clean up the next time we hit the event loop by the event
1003                  * loop itself. As it has no connections, there isn't much to
1004                  * really do, though we'll take this chance to go ahead and
1005                  * remove it from the remote.
1006                  */
1007                 svp_conn_remove(scp);
1008                 scp->sc_flags |= SVP_CF_REAP;
1009                 svp_conn_inject(scp);
1010                 break;
1011         case SVP_CS_ACTIVE:
1012         case SVP_CS_WINDDOWN:
1013                 /*
1014                  * If there are no outstanding queries, then we should simply
1015                  * clean this up now,t he same way we would with the others.
1016                  * Othewrise, as we know the event loop is ongoing, we'll make
1017                  * sure that these entries get cleaned up once they're done.
1018                  */
1019                 scp->sc_cstate = SVP_CS_WINDDOWN;
1020                 if (list_is_empty(&scp->sc_queries)) {
1021                         svp_conn_remove(scp);
1022                         scp->sc_flags |= SVP_CF_REAP;
1023                         svp_conn_inject(scp);
1024                 }
1025                 break;
1026         default:
1027                 libvarpd_panic("svp_conn_fallout encountered"
1028                     "unkonwn state");
1029         }
1030         mutex_exit(&scp->sc_lock);
1031         mutex_exit(&srp->sr_lock);
1032 }
1033 
1034 int
1035 svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
1036 {
1037         int ret;
1038         svp_conn_t *scp;
1039 
1040         assert(MUTEX_HELD(&srp->sr_lock));
1041         scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
1042         if (scp == NULL)
1043                 return (ENOMEM);
1044 
1045         if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
1046             NULL)) != 0) {
1047                 umem_free(scp, sizeof (svp_conn_t));
1048                 return (ret);
1049         }
1050 
1051         scp->sc_remote = srp;
1052         scp->sc_event.se_func = svp_conn_handler;
1053         scp->sc_event.se_arg = scp;
1054         scp->sc_btimer.st_func = svp_conn_backtimer;
1055         scp->sc_btimer.st_arg = scp;
1056         scp->sc_btimer.st_oneshot = B_TRUE;
1057         scp->sc_btimer.st_value = 1;
1058 
1059         scp->sc_qtimer.st_func = svp_conn_querytimer;
1060         scp->sc_qtimer.st_arg = scp;
1061         scp->sc_qtimer.st_oneshot = B_FALSE;
1062         scp->sc_qtimer.st_value = svp_conn_query_timeout;
1063 
1064         scp->sc_socket = -1;
1065 
1066         list_create(&scp->sc_queries, sizeof (svp_query_t),
1067             offsetof(svp_query_t, sq_lnode));
1068         scp->sc_gen = srp->sr_gen;
1069         bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
1070         scp->sc_cstate = SVP_CS_INITIAL;
1071         mutex_enter(&scp->sc_lock);
1072         svp_conn_add(scp);
1073         mutex_exit(&scp->sc_lock);
1074 
1075         /* Now that we're locked and loaded, add our timers */
1076         svp_timer_add(&scp->sc_qtimer);
1077         svp_timer_add(&scp->sc_btimer);
1078 
1079         return (0);
1080 }
1081 
1082 /*
1083  * At the time of calling, the entry has been removed from all lists. In
1084  * addition, the entries state should be SVP_CS_ERROR, therefore, we know that
1085  * the fd should not be associated with the event loop. We'll double check that
1086  * just in case. We should also have already been removed from the remote's
1087  * list.
1088  */
void
svp_conn_destroy(svp_conn_t *scp)
{
	int ret;

	mutex_enter(&scp->sc_lock);
	/* Sanity-check the invariants described above before freeing. */
	if (scp->sc_cstate != SVP_CS_ERROR)
		libvarpd_panic("asked to tear down an active connection");
	if (scp->sc_flags & SVP_CF_ADDED)
		libvarpd_panic("asked to remove a connection still in "
		    "the remote list\n");
	if (!list_is_empty(&scp->sc_queries))
		libvarpd_panic("asked to remove a connection with non-empty "
		    "query list");

	/* ENOENT is the only acceptable answer: we must not still be armed. */
	if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
	    ENOENT) {
		libvarpd_panic("dissociate failed or was actually "
		    "associated: %d", ret);
	}
	mutex_exit(&scp->sc_lock);

	/* Verify our timers are killed */
	svp_timer_remove(&scp->sc_btimer);
	svp_timer_remove(&scp->sc_qtimer);

	if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
		libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
		    "%d: %d", scp->sc_socket, errno);

	list_destroy(&scp->sc_queries);
	umem_free(scp, sizeof (svp_conn_t));
}
1122 
1123 void
1124 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1125 {
1126         assert(MUTEX_HELD(&scp->sc_lock));
1127         assert(scp->sc_cstate == SVP_CS_ACTIVE ||
1128             scp->sc_cstate == SVP_CS_VERSIONING);
1129 
1130         sqp->sq_acttime = -1;
1131         list_insert_tail(&scp->sc_queries, sqp);
1132         if (!(scp->sc_event.se_events & POLLOUT)) {
1133                 scp->sc_event.se_events |= POLLOUT;
1134                 /*
1135                  * If this becomes frequent, we should instead give up on this
1136                  * set of connections instead of aborting.
1137                  */
1138                 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1139                         libvarpd_panic("svp_event_associate failed somehow");
1140         }
1141 }