1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2015 Joyent, Inc.
  14  */
  15 
  16 /*
  17  * Logic to manage an individual connection to a remote host.
  18  *
  19  * For more information, see the big theory statement in
  20  * lib/varpd/svp/common/libvarpd_svp.c.
  21  */
  22 
  23 #include <assert.h>
  24 #include <umem.h>
  25 #include <errno.h>
  26 #include <strings.h>
  27 #include <unistd.h>
  28 #include <stddef.h>
  29 #include <sys/uio.h>
  30 #include <sys/debug.h>
  31 
  32 #include <libvarpd_svp.h>
  33 
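/*
 * svp_conn_query_timeout is the number of seconds an outstanding query may go
 * unanswered before we tear the connection down. svp_conn_backoff_tbl gives
 * the number of seconds to wait between reconnection attempts; we walk it
 * forward on each consecutive failure and stay at its last entry thereafter.
 */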
  34 int svp_conn_query_timeout = 30;
  35 static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
  36 static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
  37 
  38 typedef enum svp_conn_act {
  39         SVP_RA_NONE     = 0x00,
  40         SVP_RA_DEGRADE  = 0x01,
  41         SVP_RA_RESTORE  = 0x02,
  42         SVP_RA_ERROR    = 0x03,
  43         SVP_RA_CLEANUP  = 0x04
  44 } svp_conn_act_t;
  45 
  46 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
  47 
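/*
 * Post a user event for this connection so that svp_conn_handler() runs even
 * without socket activity, e.g. to reap or tear down the connection. The
 * SVP_CF_USER flag ensures that at most one injected event is outstanding.
 */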
  48 static void
  49 svp_conn_inject(svp_conn_t *scp)
  50 {
  51         int ret;
  52         assert(MUTEX_HELD(&scp->sc_lock));
  53 
  54         if (scp->sc_flags & SVP_CF_USER)
  55                 return;
  56         scp->sc_flags |= SVP_CF_USER;
  57         if ((ret = svp_event_inject(&scp->sc_event)) != 0)
  58                 libvarpd_panic("failed to inject event: %d\n", ret);
  59 }
  60 
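/*
 * Mark this connection as degraded. If every connection to the remote is now
 * degraded, degrade the remote itself.
 */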
  61 static void
  62 svp_conn_degrade(svp_conn_t *scp)
  63 {
  64         svp_remote_t *srp = scp->sc_remote;
  65 
  66         assert(MUTEX_HELD(&srp->sr_lock));
  67         assert(MUTEX_HELD(&scp->sc_lock));
  68 
  69         if (scp->sc_flags & SVP_CF_DEGRADED)
  70                 return;
  71 
  72         scp->sc_flags |= SVP_CF_DEGRADED;
  73         srp->sr_ndconns++;
  74         if (srp->sr_ndconns == srp->sr_tconns)
  75                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
  76 }
  77 
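/*
 * Clear this connection's degraded state. If the remote had been fully
 * degraded, restore it before dropping the degraded connection count.
 */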
  78 static void
  79 svp_conn_restore(svp_conn_t *scp)
  80 {
  81         svp_remote_t *srp = scp->sc_remote;
  82 
  83         assert(MUTEX_HELD(&srp->sr_lock));
  84         assert(MUTEX_HELD(&scp->sc_lock));
  85 
  86         if (!(scp->sc_flags & SVP_CF_DEGRADED))
  87                 return;
  88 
  89         scp->sc_flags &= ~SVP_CF_DEGRADED;
  90         if (srp->sr_ndconns == srp->sr_tconns)
  91                 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
  92         srp->sr_ndconns--;
  93 }
  94 
  95 static void
  96 svp_conn_add(svp_conn_t *scp)
  97 {
  98         svp_remote_t *srp = scp->sc_remote;
  99 
 100         assert(MUTEX_HELD(&srp->sr_lock));
 101         assert(MUTEX_HELD(&scp->sc_lock));
 102 
 103         if (scp->sc_flags & SVP_CF_ADDED)
 104                 return;
 105 
 106         list_insert_tail(&srp->sr_conns, scp);
 107         scp->sc_flags |= SVP_CF_ADDED;
 108         srp->sr_tconns++;
 109 }
 110 
 111 static void
 112 svp_conn_remove(svp_conn_t *scp)
 113 {
 114         svp_remote_t *srp = scp->sc_remote;
 115 
 116         assert(MUTEX_HELD(&srp->sr_lock));
 117         assert(MUTEX_HELD(&scp->sc_lock));
 118 
 119         if (!(scp->sc_flags & SVP_CF_ADDED))
 120                 return;
 121 
 122         scp->sc_flags &= ~SVP_CF_ADDED;
 123         if (scp->sc_flags & SVP_CF_DEGRADED)
 124                 srp->sr_ndconns--;
 125         srp->sr_tconns--;
 126         if (srp->sr_tconns == srp->sr_ndconns)
 127                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
 128 }
 129 
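/*
 * Find the outstanding query on this connection whose id matches the given
 * response id.
 */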
 130 static svp_query_t *
 131 svp_conn_query_find(svp_conn_t *scp, uint32_t id)
 132 {
 133         svp_query_t *sqp;
 134 
 135         assert(MUTEX_HELD(&scp->sc_lock));
 136 
 137         for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 138             sqp = list_next(&scp->sc_queries, sqp)) {
 139                 if (sqp->sq_header.svp_id == id)
 140                         break;
 141         }
 142 
 143         return (sqp);
 144 }
 145 
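/*
 * Close the socket and arm the backoff timer for the next connection attempt.
 * The wait comes from svp_conn_backoff_tbl; once we have gone past the end of
 * the table we also ask the caller to degrade this connection.
 */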
 146 static svp_conn_act_t
 147 svp_conn_backoff(svp_conn_t *scp)
 148 {
 149         assert(MUTEX_HELD(&scp->sc_lock));
 150 
 151         if (close(scp->sc_socket) != 0)
 152                 libvarpd_panic("failed to close socket %d: %d\n",
 153                     scp->sc_socket, errno);
 154         scp->sc_socket = -1;
 155 
 156         scp->sc_cstate = SVP_CS_BACKOFF;
 157         scp->sc_nbackoff++;
 158         if (scp->sc_nbackoff >= svp_conn_nbackoff) {
 159                 scp->sc_btimer.st_value =
 160                     svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
 161         } else {
 162                 scp->sc_btimer.st_value =
 163                     svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
 164         }
 165         svp_timer_add(&scp->sc_btimer);
 166 
 167         if (scp->sc_nbackoff > svp_conn_nbackoff)
 168                 return (SVP_RA_DEGRADE);
 169         return (SVP_RA_NONE);
 170 }
 171 
 172 /*
 173  * Think of this as an extension to the connect() call in svp_conn_connect().
 174  * Send a ping, receive its response, and set the version here.  If the
 175  * response is too slow or the socket returns an error, indicate a socket
 176  * error, which causes the caller to back off (close the socket and retry).
 177  *
 178  * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
 179  * own error type.
 180  */
 181 static svp_conn_error_t
 182 svp_conn_version_set(svp_conn_t *scp)
 183 {
 184         svp_req_t ping;
 185         ssize_t ret;
 186         uint32_t save_crc;
 187         uint16_t peer_version;
 188         int ntries = 3; /* One second between tries. 3secs should be enough. */
 189 
 190         ping.svp_ver = htons(SVP_CURRENT_VERSION);
 191         ping.svp_op = htons(SVP_R_PING);
 192         ping.svp_size = 0;      /* Header-only... */
 193         ping.svp_id = 0;
 194         /* 0-length data... just use the req buffer for the pointer. */
 195         svp_query_crc32(&ping, &ping, 0);
 196 
 197         ret = write(scp->sc_socket, &ping, sizeof (ping));
 198         if (ret == -1) {
 199                 /*
 200                  * A failed write() call right after connect probably
 201                  * indicates a larger connection failure.  Restart the
 202                  * connection from scratch.
 203                  */
 204                 return (SVP_CE_SOCKET);
 205         }
 206         assert(ret == sizeof (ping));
 207         do {
 208                 /*
 209                  * Asynch read.  We may loop here once or twice.
 210                  * Wait a bit, but don't loop too many times...
 211                  */
 212                 (void) sleep(1);
 213                 ret = read(scp->sc_socket, &ping, sizeof (ping));
 214         } while (--ntries > 0 &&
 215             ret == -1 && (errno == EINTR || errno == EAGAIN));
 216         if (ret == -1) {
 217                 /*
 218                  * This is actually a failed read() call.  Restart the
 219                  * connection from scratch.
 220                  */
 221                 return (SVP_CE_SOCKET);
 222         }
 223 
 224         save_crc = ping.svp_crc32;
 225         svp_query_crc32(&ping, &ping, 0);
 226         peer_version = ntohs(ping.svp_ver);
 227         if (ping.svp_op != htons(SVP_R_PONG) ||
 228             ping.svp_size != 0 || ping.svp_id != 0 ||
 229             ping.svp_crc32 != save_crc ||
 230             peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
 231                 return (SVP_CE_VERSION_PONG);
 232         }
 233 
 234         /* This connection now has a version! */
 235         scp->sc_version = peer_version;
 236         return (SVP_CE_NONE);
 237 }
 238 
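/*
 * Begin a non-blocking connect() to the remote. If the connect cannot
 * complete immediately, we move to SVP_CS_CONNECTING and finish up in
 * svp_conn_poll_connect() once the socket becomes writable.
 */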
 239 static svp_conn_act_t
 240 svp_conn_connect(svp_conn_t *scp)
 241 {
 242         int ret;
 243         struct sockaddr_in6 in6;
 244 
 245         assert(MUTEX_HELD(&scp->sc_lock));
 246         assert(scp->sc_cstate == SVP_CS_BACKOFF ||
 247             scp->sc_cstate == SVP_CS_INITIAL);
 248         assert(scp->sc_socket == -1);
 249         if (scp->sc_cstate == SVP_CS_INITIAL)
 250                 scp->sc_nbackoff = 0;
 251 
 252         /* New connect means we need to know the version. */
 253         scp->sc_version = 0;
 254 
 255         scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
 256         if (scp->sc_socket == -1) {
 257                 scp->sc_error = SVP_CE_SOCKET;
 258                 scp->sc_errno = errno;
 259                 scp->sc_cstate = SVP_CS_ERROR;
 260                 return (SVP_RA_DEGRADE);
 261         }
 262 
 263         bzero(&in6, sizeof (struct sockaddr_in6));
 264         in6.sin6_family = AF_INET6;
 265         in6.sin6_port = htons(scp->sc_remote->sr_rport);
 266         bcopy(&scp->sc_addr, &in6.sin6_addr,  sizeof (struct in6_addr));
 267         ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
 268             sizeof (struct sockaddr_in6));
 269         if (ret != 0) {
 270                 boolean_t async = B_FALSE;
 271 
 272                 switch (errno) {
 273                 case EACCES:
 274                 case EADDRINUSE:
 275                 case EAFNOSUPPORT:
 276                 case EALREADY:
 277                 case EBADF:
 278                 case EISCONN:
 279                 case ELOOP:
 280                 case ENOENT:
 281                 case ENOSR:
 282                 case EWOULDBLOCK:
 283                         libvarpd_panic("unanticipated connect errno %d", errno);
 284                         break;
 285                 case EINPROGRESS:
 286                 case EINTR:
 287                         async = B_TRUE;
 288                 default:
 289                         break;
 290                 }
 291 
 292                 /*
 293                  * The connect will complete in the future, so advance our
 294                  * state and make sure that we poll for the next round.
 295                  */
 296                 if (async == B_TRUE) {
 297                         scp->sc_cstate = SVP_CS_CONNECTING;
 298                         scp->sc_event.se_events = POLLOUT | POLLHUP;
 299                         ret = svp_event_associate(&scp->sc_event,
 300                             scp->sc_socket);
 301                         if (ret == 0)
 302                                 return (SVP_RA_NONE);
 303                         scp->sc_error = SVP_CE_ASSOCIATE;
 304                         scp->sc_errno = ret;
 305                         scp->sc_cstate = SVP_CS_ERROR;
 306                         return (SVP_RA_DEGRADE);
 307                 } else {
 308                         /*
 309                          * This call failed, which means that we obtained one of
 310                          * the following:
 311                          *
 312                          * EADDRNOTAVAIL
 313                          * ECONNREFUSED
 314                          * EIO
 315                          * ENETUNREACH
 316                          * EHOSTUNREACH
 317                          * ENXIO
 318                          * ETIMEDOUT
 319                          *
 320                          * Therefore we need to set ourselves into backoff and
 321                          * wait for that to clear up.
 322                          */
 323                         return (svp_conn_backoff(scp));
 324                 }
 325         }
 326 
 327         return (svp_conn_poll_connect(NULL, scp));
 328 }
 329 
 330 /*
 331  * This should be the first call we get after a successful synchronous
 332  * connect, or a completed (failed or successful) asynchronous connect.  A
 333  * non-NULL port event indicates asynchronous completion; a NULL port event
 334  * indicates a successful synchronous connect.
 335  *
 336  * If we have successfully connected, we should see a writeable event.  In the
 337  * asynchronous case, we may also see an error or a hang up. For either hang
 338  * up or error, we transition to error mode. If there is also a readable event
 339  * (i.e. incoming data), we ignore it at the moment and just let a
 340  * reassociation pick it up so we can simplify the set of state transitions
 341  * that we have.
 342  */
 343 static svp_conn_act_t
 344 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
 345 {
 346         int ret;
 347         svp_conn_error_t version_error;
 348 
 349         if (pe != NULL) {
 350                 int err;
 351                 socklen_t sl = sizeof (err);
 352 
 353                 /*
 354                  * These bits only matter if we're notified of an
 355                  * asynchronous connection completion.
 356                  */
 357                 if (!(pe->portev_events & POLLOUT)) {
 358                         scp->sc_errno = 0;
 359                         scp->sc_error = SVP_CE_NOPOLLOUT;
 360                         scp->sc_cstate = SVP_CS_ERROR;
 361                         return (SVP_RA_DEGRADE);
 362                 }
 363 
 364                 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
 365                     &sl);
 366                 if (ret != 0)
 367                         libvarpd_panic("unanticipated getsockopt error");
 368                 if (err != 0) {
 369                         return (svp_conn_backoff(scp));
 370                 }
 371         }
 372 
 373         /* Use a single SVP_R_PING to determine the version. */
 374         version_error = svp_conn_version_set(scp);
 375         switch (version_error) {
 376         case SVP_CE_SOCKET:
 377                 /* Use this to signal read/write errors... */
 378                 return (svp_conn_backoff(scp));
 379         case SVP_CE_NONE:
 380                 assert(scp->sc_version > 0 &&
 381                     scp->sc_version <= SVP_CURRENT_VERSION);
 382                 break;
 383         default:
 384                 scp->sc_error = version_error;
 385                 scp->sc_cstate = SVP_CS_ERROR;
 386                 scp->sc_errno = EPROTONOSUPPORT;     /* Protocol error... */
 387                 return (SVP_RA_DEGRADE);
 388         }
 389 
 390         scp->sc_cstate = SVP_CS_ACTIVE;
 391         scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
 392         ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 393         if (ret == 0)
 394                 return (SVP_RA_RESTORE);
 395         scp->sc_error = SVP_CE_ASSOCIATE;
 396         scp->sc_errno = ret;
 397         scp->sc_cstate = SVP_CS_ERROR;
 398         return (SVP_RA_DEGRADE);
 399 }
 400 
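/*
 * The socket is writable. If we are not already in the middle of a request,
 * pick the next query in SVP_QUERY_INIT and CRC it, then write out as much of
 * the header and request body as we can, tracking our progress in sc_output.
 */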
 401 static svp_conn_act_t
 402 svp_conn_pollout(svp_conn_t *scp)
 403 {
 404         svp_query_t *sqp;
 405         svp_req_t *req;
 406         size_t off;
 407         struct iovec iov[2];
 408         int nvecs = 0;
 409         ssize_t ret;
 410 
 411         assert(MUTEX_HELD(&scp->sc_lock));
 412 
 413         /*
 414          * We need to find a query and start writing it out.
 415          */
 416         if (scp->sc_output.sco_query == NULL) {
 417                 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 418                     sqp = list_next(&scp->sc_queries, sqp)) {
 419                         if (sqp->sq_state != SVP_QUERY_INIT)
 420                                 continue;
 421                         break;
 422                 }
 423 
 424                 if (sqp == NULL) {
 425                         scp->sc_event.se_events &= ~POLLOUT;
 426                         return (SVP_RA_NONE);
 427                 }
 428 
 429                 scp->sc_output.sco_query = sqp;
 430                 scp->sc_output.sco_offset = 0;
 431                 sqp->sq_state = SVP_QUERY_WRITING;
 432                 svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
 433         }
 434 
 435         sqp = scp->sc_output.sco_query;
 436         req = &sqp->sq_header;
 437         off = scp->sc_output.sco_offset;
 438         if (off < sizeof (svp_req_t)) {
 439                 iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
 440                 iov[nvecs].iov_len = sizeof (svp_req_t) - off;
 441                 nvecs++;
 442                 off = 0;
 443         } else {
 444                 off -= sizeof (svp_req_t);
 445         }
 446 
 447         iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
 448         iov[nvecs].iov_len = sqp->sq_rsize - off;
 449         nvecs++;
 450 
 451         do {
 452                 ret = writev(scp->sc_socket, iov, nvecs);
 453         } while (ret == -1 && errno == EINTR);
 454         if (ret == -1) {
 455                 switch (errno) {
 456                 case EAGAIN:
 457                         scp->sc_event.se_events |= POLLOUT;
 458                         return (SVP_RA_NONE);
 459                 case EIO:
 460                 case ENXIO:
 461                 case ECONNRESET:
 462                         return (SVP_RA_ERROR);
 463                 default:
 464                         libvarpd_panic("unexpected errno: %d", errno);
 465                 }
 466         }
 467 
 468         sqp->sq_acttime = gethrtime();
 469         scp->sc_output.sco_offset += ret;
 470         if (scp->sc_output.sco_offset >= sizeof (svp_req_t) + sqp->sq_rsize) {
 471                 sqp->sq_state = SVP_QUERY_READING;
 472                 scp->sc_output.sco_query = NULL;
 473                 scp->sc_output.sco_offset = 0;
 474                 scp->sc_event.se_events |= POLLOUT;
 475         }
 476         return (SVP_RA_NONE);
 477 }
 478 
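/*
 * Validate a freshly read response header: the version must match the one we
 * negotiated, the operation must be an ACK we understand, the id must belong
 * to a query in SVP_QUERY_READING, and the size must be sane for the
 * operation. On success, record where the response body should be read into.
 */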
 479 static boolean_t
 480 svp_conn_pollin_validate(svp_conn_t *scp)
 481 {
 482         svp_query_t *sqp;
 483         uint32_t nsize;
 484         uint16_t nvers, nop;
 485         svp_req_t *resp = &scp->sc_input.sci_req;
 486 
 487         assert(MUTEX_HELD(&scp->sc_lock));
 488 
 489         nvers = ntohs(resp->svp_ver);
 490         nop = ntohs(resp->svp_op);
 491         nsize = ntohl(resp->svp_size);
 492 
 493         /*
 494          * A peer that's messing with post-connection version changes is
 495          * likely a broken peer.
 496          */
 497         if (nvers != scp->sc_version) {
 498                 (void) bunyan_warn(svp_bunyan, "version mismatch",
 499                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 500                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 501                     BUNYAN_T_INT32, "peer version", nvers,
 502                     BUNYAN_T_INT32, "our version", scp->sc_version,
 503                     BUNYAN_T_INT32, "operation", nop,
 504                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 505                     BUNYAN_T_END);
 506                 return (B_FALSE);
 507         }
 508 
 509         if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
 510             nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
 511                 (void) bunyan_warn(svp_bunyan, "unsupported operation",
 512                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 513                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 514                     BUNYAN_T_INT32, "version", nvers,
 515                     BUNYAN_T_INT32, "operation", nop,
 516                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 517                     BUNYAN_T_END);
 518                 return (B_FALSE);
 519         }
 520 
 521         sqp = svp_conn_query_find(scp, resp->svp_id);
 522         if (sqp == NULL) {
 523                 (void) bunyan_warn(svp_bunyan, "unknown response id",
 524                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 525                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 526                     BUNYAN_T_INT32, "version", nvers,
 527                     BUNYAN_T_INT32, "operation", nop,
 528                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 529                     BUNYAN_T_END);
 530                 return (B_FALSE);
 531         }
 532 
 533         if (sqp->sq_state != SVP_QUERY_READING) {
 534                 (void) bunyan_warn(svp_bunyan,
 535                     "got response for unexpected query",
 536                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 537                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 538                     BUNYAN_T_INT32, "version", nvers,
 539                     BUNYAN_T_INT32, "operation", nop,
 540                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 541                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 542                     BUNYAN_T_END);
 543                 return (B_FALSE);
 544         }
 545 
 546         if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
 547             (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
 548             (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
 549                 (void) bunyan_warn(svp_bunyan, "unexpected response size",
 550                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 551                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 552                     BUNYAN_T_INT32, "version", nvers,
 553                     BUNYAN_T_INT32, "operation", nop,
 554                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 555                     BUNYAN_T_INT32, "response_size", nsize,
 556                     BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
 557                     sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
 558                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 559                     BUNYAN_T_END);
 560                 return (B_FALSE);
 561         }
 562 
 563         /*
 564          * The valid size is anything less than or equal to what the user
 565          * requested, but at least sizeof (svp_log_ack_t) bytes.
 566          */
 567         if (nop == SVP_R_LOG_ACK) {
 568                 const char *msg = NULL;
 569                 if (nsize < sizeof (svp_log_ack_t))
 570                         msg = "response size too small";
 571                 else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
 572                         msg = "response size too large";
 573                 if (msg != NULL) {
 574                         (void) bunyan_warn(svp_bunyan, msg,
 575                             BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 576                             BUNYAN_T_INT32, "remote_port",
 577                             scp->sc_remote->sr_rport,
 578                             BUNYAN_T_INT32, "version", nvers,
 579                             BUNYAN_T_INT32, "operation", nop,
 580                             BUNYAN_T_INT32, "response_id", resp->svp_id,
 581                             BUNYAN_T_INT32, "response_size", nsize,
 582                             BUNYAN_T_INT32, "expected_size",
 583                             ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
 584                             BUNYAN_T_INT32, "query_state", sqp->sq_state,
 585                             BUNYAN_T_END);
 586                         return (B_FALSE);
 587                 }
 588         }
 589 
 590         sqp->sq_size = nsize;
 591         scp->sc_input.sci_query = sqp;
 592         if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
 593             nop == SVP_R_LOG_RM_ACK) {
 594                 sqp->sq_wdata = &sqp->sq_wdun;
 595                 sqp->sq_wsize = sizeof (svp_query_data_t);
 596         } else {
 597                 VERIFY(nop == SVP_R_LOG_ACK);
 598                 assert(sqp->sq_wdata != NULL);
 599                 assert(sqp->sq_wsize != 0);
 600         }
 601 
 602         return (B_TRUE);
 603 }
 604 
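/*
 * The socket is readable. Read and validate the response header, then read
 * the response body, check its CRC, and complete the matching query by
 * invoking its callback with sc_lock dropped.
 */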
 605 static svp_conn_act_t
 606 svp_conn_pollin(svp_conn_t *scp)
 607 {
 608         size_t off, total;
 609         ssize_t ret;
 610         svp_query_t *sqp;
 611         uint32_t crc;
 612         uint16_t nop;
 613 
 614         assert(MUTEX_HELD(&scp->sc_lock));
 615 
 616         /*
 617          * No query implies that we're reading in the header and that the offset
 618          * is associated with it.
 619          */
 620         off = scp->sc_input.sci_offset;
 621         sqp = scp->sc_input.sci_query;
 622         if (scp->sc_input.sci_query == NULL) {
 623                 svp_req_t *resp = &scp->sc_input.sci_req;
 624 
 625                 assert(off < sizeof (svp_req_t));
 626 
 627                 do {
 628                         ret = read(scp->sc_socket,
 629                             (void *)((uintptr_t)resp + off),
 630                             sizeof (svp_req_t) - off);
 631                 } while (ret == -1 && errno == EINTR);
 632                 if (ret == -1) {
 633                         switch (errno) {
 634                         case EAGAIN:
 635                                 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 636                                 return (SVP_RA_NONE);
 637                         case EIO:
 638                         case ECONNRESET:
 639                                 return (SVP_RA_ERROR);
 640                                 break;
 641                         default:
 642                                 libvarpd_panic("unexpected read errno: %d",
 643                                     errno);
 644                         }
 645                 } else if (ret == 0) {
 646                         /* Try to reconnect to the remote host */
 647                         return (SVP_RA_ERROR);
 648                 }
 649 
 650                 /* Didn't get all the data we need */
 651                 if (off + ret < sizeof (svp_req_t)) {
 652                         scp->sc_input.sci_offset += ret;
 653                         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 654                         return (SVP_RA_NONE);
 655                 }
 656 
 657                 if (svp_conn_pollin_validate(scp) != B_TRUE)
 658                         return (SVP_RA_ERROR);
 659         }
 660 
 661         sqp = scp->sc_input.sci_query;
 662         assert(sqp != NULL);
 663         sqp->sq_acttime = gethrtime();
 664         total = ntohl(scp->sc_input.sci_req.svp_size);
 665         do {
 666                 ret = read(scp->sc_socket,
 667                     (void *)((uintptr_t)sqp->sq_wdata + off),
 668                     total - off);
 669         } while (ret == -1 && errno == EINTR);
 670 
 671         if (ret == -1) {
 672                 switch (errno) {
 673                 case EAGAIN:
 674                         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 675                         return (SVP_RA_NONE);
 676                 case EIO:
 677                 case ECONNRESET:
 678                         return (SVP_RA_ERROR);
 679                         break;
 680                 default:
 681                         libvarpd_panic("unexpected read errno: %d", errno);
 682                 }
 683         } else if (ret == 0) {
 684                 /* Try to reconnect to the remote host */
 685                 return (SVP_RA_ERROR);
 686         }
 687 
 688         if (ret + off < total) {
 689                 scp->sc_input.sci_offset += ret;
 690                 return (SVP_RA_NONE);
 691         }
 692 
 693         nop = ntohs(scp->sc_input.sci_req.svp_op);
 694         crc = scp->sc_input.sci_req.svp_crc32;
 695         svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
 696         if (crc != scp->sc_input.sci_req.svp_crc32) {
 697                 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
 698                     BUNYAN_T_IP, "remote ip", &scp->sc_addr,
 699                     BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
 700                     BUNYAN_T_INT32, "version",
 701                     ntohs(scp->sc_input.sci_req.svp_ver),
 702                     BUNYAN_T_INT32, "operation", nop,
 703                     BUNYAN_T_INT32, "response id",
 704                     ntohl(scp->sc_input.sci_req.svp_id),
 705                     BUNYAN_T_INT32, "query state", sqp->sq_state,
 706                     BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
 707                     BUNYAN_T_UINT32, "calc_crc",
 708                     ntohl(scp->sc_input.sci_req.svp_crc32),
 709                     BUNYAN_T_END);
 710                 return (SVP_RA_ERROR);
 711         }
 712         scp->sc_input.sci_query = NULL;
 713         scp->sc_input.sci_offset = 0;
 714 
 715         if (nop == SVP_R_VL2_ACK) {
 716                 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
 717                 sqp->sq_status = ntohl(sl2a->sl2a_status);
 718         } else if (nop == SVP_R_VL3_ACK) {
 719                 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
 720                 sqp->sq_status = ntohl(sl3a->sl3a_status);
 721         } else if (nop == SVP_R_LOG_ACK) {
 722                 svp_log_ack_t *svla = sqp->sq_wdata;
 723                 sqp->sq_status = ntohl(svla->svla_status);
 724         } else if (nop == SVP_R_LOG_RM_ACK) {
 725                 svp_lrm_ack_t *svra = sqp->sq_wdata;
 726                 sqp->sq_status = ntohl(svra->svra_status);
 727         } else {
 728                 libvarpd_panic("unhandled nop: %d", nop);
 729         }
 730 
 731         list_remove(&scp->sc_queries, sqp);
 732         mutex_exit(&scp->sc_lock);
 733 
 734         /*
 735          * We have to release all of our resources associated with this entry
 736          * before we call the callback. After we call it, the memory will be
 737          * lost to time.
 738          */
 739         svp_query_release(sqp);
 740         sqp->sq_func(sqp, sqp->sq_arg);
 741         mutex_enter(&scp->sc_lock);
 742         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 743 
 744         return (SVP_RA_NONE);
 745 }
 746 
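/*
 * Tear down the socket and start over from SVP_CS_INITIAL, handing any
 * outstanding queries back to the remote for reassignment before we
 * reconnect.
 */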
 747 static svp_conn_act_t
 748 svp_conn_reset(svp_conn_t *scp)
 749 {
 750         svp_remote_t *srp = scp->sc_remote;
 751 
 752         assert(MUTEX_HELD(&srp->sr_lock));
 753         assert(MUTEX_HELD(&scp->sc_lock));
 754 
 755         VERIFY(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
 756             ENOENT);
 757         if (close(scp->sc_socket) != 0)
 758                 libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
 759                     errno);
 760         scp->sc_flags &= ~SVP_CF_TEARDOWN;
 761         scp->sc_socket = -1;
 762         scp->sc_cstate = SVP_CS_INITIAL;
 763         scp->sc_input.sci_query = NULL;
 764         scp->sc_output.sco_query = NULL;
 765 
 766         svp_remote_reassign(srp, scp);
 767 
 768         return (svp_conn_connect(scp));
 769 }
 770 
 771 /*
 772  * This is our general state transition function. We're called here when we
 773  * want to advance part of our state machine as well as to re-arm ourselves.
 774  * We can also end up here from the standard event loop as a result of having
 775  * a user event posted.
 776  */
 777 static void
 778 svp_conn_handler(port_event_t *pe, void *arg)
 779 {
 780         svp_conn_t *scp = arg;
 781         svp_remote_t *srp = scp->sc_remote;
 782         svp_conn_act_t ret = SVP_RA_NONE;
 783         svp_conn_state_t oldstate;
 784 
 785         mutex_enter(&scp->sc_lock);
 786 
 787         /*
 788          * Check if one of our event interrupts is set. An event interrupt,
 789          * such as needing to be reaped or torn down, is signalled by a
 790          * PORT_SOURCE_USER event that takes care of this. However, because
 791          * the event loop can still be running, we may get here before that
 792          * PORT_SOURCE_USER event has been delivered. In such a case, if the
 793          * PORT_SOURCE_USER event is still pending, we opt to do nothing here
 794          * and wait for it to come and tear us down. That also indicates to
 795          * us that we have nothing to worry about as far as general timing
 796          * and the like goes.
 797          */
 798         if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
 799             (scp->sc_flags & SVP_CF_USER) != 0 &&
 800             pe != NULL &&
 801             pe->portev_source != PORT_SOURCE_USER) {
 802                 mutex_exit(&scp->sc_lock);
 803                 return;
 804         }
 805 
 806         if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
 807                 scp->sc_flags &= ~SVP_CF_USER;
 808                 if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
 809                         mutex_exit(&scp->sc_lock);
 810                         return;
 811                 }
 812         }
 813 
 814         /* Check if this needs to be freed */
 815         if (scp->sc_flags & SVP_CF_REAP) {
 816                 mutex_exit(&scp->sc_lock);
 817                 svp_conn_destroy(scp);
 818                 return;
 819         }
 820 
 821         /* Check if this needs to be reset */
 822         if (scp->sc_flags & SVP_CF_TEARDOWN) {
 823                 /* Make sure any other users of this are disassociated */
 824                 ret = SVP_RA_ERROR;
 825                 goto out;
 826         }
 827 
 828         switch (scp->sc_cstate) {
 829         case SVP_CS_INITIAL:
 830         case SVP_CS_BACKOFF:
 831                 assert(pe == NULL);
 832                 ret = svp_conn_connect(scp);
 833                 break;
 834         case SVP_CS_CONNECTING:
 835                 assert(pe != NULL);
 836                 ret = svp_conn_poll_connect(pe, scp);
 837                 break;
 838         case SVP_CS_ACTIVE:
 839         case SVP_CS_WINDDOWN:
 840                 assert(pe != NULL);
 841                 oldstate = scp->sc_cstate;
 842                 if (pe->portev_events & POLLOUT)
 843                         ret = svp_conn_pollout(scp);
 844                 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
 845                         ret = svp_conn_pollin(scp);
 846 
 847                 if (oldstate == SVP_CS_WINDDOWN &&
 848                     (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
 849                         ret = SVP_RA_CLEANUP;
 850                 }
 851 
 852                 if (ret == SVP_RA_NONE) {
 853                         int err;
 854                         if ((err = svp_event_associate(&scp->sc_event,
 855                             scp->sc_socket)) != 0) {
 856                                 scp->sc_error = SVP_CE_ASSOCIATE;
 857                                 scp->sc_errno = err;
 858                                 scp->sc_cstate = SVP_CS_ERROR;
 859                                 ret = SVP_RA_DEGRADE;
 860                         }
 861                 }
 862                 break;
 863         default:
 864                 libvarpd_panic("svp_conn_handler encountered unexpected "
 865                     "state: %d", scp->sc_cstate);
 866         }
 867 out:
 868         mutex_exit(&scp->sc_lock);
 869 
 870         if (ret == SVP_RA_NONE)
 871                 return;
 872 
 873         mutex_enter(&srp->sr_lock);
 874         mutex_enter(&scp->sc_lock);
 875         if (ret == SVP_RA_ERROR)
 876                 ret = svp_conn_reset(scp);
 877 
 878         if (ret == SVP_RA_DEGRADE)
 879                 svp_conn_degrade(scp);
 880         if (ret == SVP_RA_RESTORE)
 881                 svp_conn_restore(scp);
 882 
 883         if (ret == SVP_RA_CLEANUP) {
 884                 svp_conn_remove(scp);
 885                 scp->sc_flags |= SVP_CF_REAP;
 886                 svp_conn_inject(scp);
 887         }
 888         mutex_exit(&scp->sc_lock);
 889         mutex_exit(&srp->sr_lock);
 890 }
 891 
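/*
 * Backoff timer callback: run the state machine, which will attempt the next
 * connect.
 */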
 892 static void
 893 svp_conn_backtimer(void *arg)
 894 {
 895         svp_conn_t *scp = arg;
 896 
 897         svp_conn_handler(NULL, scp);
 898 }
 899 
 900 /*
 901  * This fires every svp_conn_query_timeout seconds. Its purpose is to
 902  * determine whether we have failed to hear back on a request within
 903  * svp_conn_query_timeout seconds. If any query that has been started
 904  * (indicated by svp_query_t`sq_acttime != -1) has been outstanding for more
 905  * than svp_conn_query_timeout seconds, we tear this connection down and
 906  * reassign its outstanding queries.
 907  */
 908 static void
 909 svp_conn_querytimer(void *arg)
 910 {
 911         int ret;
 912         svp_query_t *sqp;
 913         svp_conn_t *scp = arg;
 914         hrtime_t now = gethrtime();
 915 
 916         mutex_enter(&scp->sc_lock);
 917 
 918         /*
 919          * If we're not in the active state, then we don't care about this as
 920          * we're already either going to die or we have no connections to worry
 921          * about.
 922          */
 923         if (scp->sc_cstate != SVP_CS_ACTIVE) {
 924                 mutex_exit(&scp->sc_lock);
 925                 return;
 926         }
 927 
 928         for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 929             sqp = list_next(&scp->sc_queries, sqp)) {
 930                 if (sqp->sq_acttime == -1)
 931                         continue;
 932                 if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
 933                         break;
 934         }
 935 
 936         /* Nothing timed out, we're good here */
 937         if (sqp == NULL) {
 938                 mutex_exit(&scp->sc_lock);
 939                 return;
 940         }
 941 
 942         (void) bunyan_warn(svp_bunyan, "query timed out on connection",
 943             BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 944             BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 945             BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
 946             BUNYAN_T_END);
 947 
 948         /*
 949          * Begin the teardown process for this connection. If the dissociate
 950          * fails, an event is already on its way to us, so we don't inject
 951          * one. See the big theory statement in libvarpd_svp.c for details.
 952          */
 953         scp->sc_flags |= SVP_CF_TEARDOWN;
 954 
 955         ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
 956         if (ret == 0)
 957                 svp_conn_inject(scp);
 958         else
 959                 VERIFY(ret == ENOENT);
 960 
 961         mutex_exit(&scp->sc_lock);
 962 }
 963 
 964 /*
 965  * This connection has fallen out of DNS; figure out what to do with it.
 966  */
 967 void
 968 svp_conn_fallout(svp_conn_t *scp)
 969 {
 970         svp_remote_t *srp = scp->sc_remote;
 971 
 972         assert(MUTEX_HELD(&srp->sr_lock));
 973 
 974         mutex_enter(&scp->sc_lock);
 975         switch (scp->sc_cstate) {
 976         case SVP_CS_ERROR:
 977                 /*
 978                  * Connection is already inactive, so it's safe to tear down.
 979                  * Fire it off through the state machine to tear down via the
 980                  * backoff timer.
 981                  */
 982                 svp_conn_remove(scp);
 983                 scp->sc_flags |= SVP_CF_REAP;
 984                 svp_conn_inject(scp);
 985                 break;
 986         case SVP_CS_INITIAL:
 987         case SVP_CS_BACKOFF:
 988         case SVP_CS_CONNECTING:
 989                 /*
 990                  * Here, we have something actively going on, so we'll let the
 991                  * event loop clean it up the next time we hit it. As this
 992                  * connection isn't established yet, there isn't much to
 993                  * really do, though we'll take this chance to go ahead and
 994                  * remove it from the remote.
 995                  */
 996                 svp_conn_remove(scp);
 997                 scp->sc_flags |= SVP_CF_REAP;
 998                 svp_conn_inject(scp);
 999                 break;
1000         case SVP_CS_ACTIVE:
1001         case SVP_CS_WINDDOWN:
1002                 /*
1003                  * If there are no outstanding queries, then we should simply
1004                  * clean this up now, the same way we would with the others.
1005                  * Otherwise, as we know the event loop is ongoing, we'll make
1006                  * sure that these entries get cleaned up once they're done.
1007                  */
1008                 scp->sc_cstate = SVP_CS_WINDDOWN;
1009                 if (list_is_empty(&scp->sc_queries)) {
1010                         svp_conn_remove(scp);
1011                         scp->sc_flags |= SVP_CF_REAP;
1012                         svp_conn_inject(scp);
1013                 }
1014                 break;
1015         default:
1016                 libvarpd_panic("svp_conn_fallout encountered "
1017                     "unknown state");
1018         }
1019         mutex_exit(&scp->sc_lock);
1020         mutex_exit(&srp->sr_lock);
1021 }
1022 
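/*
 * Allocate and initialize a connection to the given address, register it with
 * the remote, and arm its backoff and query timers.
 */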
1023 int
1024 svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
1025 {
1026         int ret;
1027         svp_conn_t *scp;
1028 
1029         assert(MUTEX_HELD(&srp->sr_lock));
1030         scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
1031         if (scp == NULL)
1032                 return (ENOMEM);
1033 
1034         if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
1035             NULL)) != 0) {
1036                 umem_free(scp, sizeof (svp_conn_t));
1037                 return (ret);
1038         }
1039 
1040         scp->sc_remote = srp;
1041         scp->sc_event.se_func = svp_conn_handler;
1042         scp->sc_event.se_arg = scp;
1043         scp->sc_btimer.st_func = svp_conn_backtimer;
1044         scp->sc_btimer.st_arg = scp;
1045         scp->sc_btimer.st_oneshot = B_TRUE;
1046         scp->sc_btimer.st_value = 1;
1047 
1048         scp->sc_qtimer.st_func = svp_conn_querytimer;
1049         scp->sc_qtimer.st_arg = scp;
1050         scp->sc_qtimer.st_oneshot = B_FALSE;
1051         scp->sc_qtimer.st_value = svp_conn_query_timeout;
1052 
1053         scp->sc_socket = -1;
1054 
1055         list_create(&scp->sc_queries, sizeof (svp_query_t),
1056             offsetof(svp_query_t, sq_lnode));
1057         scp->sc_gen = srp->sr_gen;
1058         bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
1059         scp->sc_cstate = SVP_CS_INITIAL;
1060         mutex_enter(&scp->sc_lock);
1061         svp_conn_add(scp);
1062         mutex_exit(&scp->sc_lock);
1063 
1064         /* Now that we're locked and loaded, add our timers */
1065         svp_timer_add(&scp->sc_qtimer);
1066         svp_timer_add(&scp->sc_btimer);
1067 
1068         return (0);
1069 }
1070 
1071 /*
1072  * At the time of calling, the entry has been removed from all lists. In
1073  * addition, the entry's state should be SVP_CS_ERROR; therefore, we know that
1074  * the fd should not be associated with the event loop. We'll double check that
1075  * just in case. We should also have already been removed from the remote's
1076  * list.
1077  */
1078 void
1079 svp_conn_destroy(svp_conn_t *scp)
1080 {
1081         int ret;
1082 
1083         mutex_enter(&scp->sc_lock);
1084         if (scp->sc_cstate != SVP_CS_ERROR)
1085                 libvarpd_panic("asked to tear down an active connection");
1086         if (scp->sc_flags & SVP_CF_ADDED)
1087                 libvarpd_panic("asked to remove a connection still in "
1088                     "the remote list\n");
1089         if (!list_is_empty(&scp->sc_queries))
1090                 libvarpd_panic("asked to remove a connection with non-empty "
1091                     "query list");
1092 
1093         if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
1094             ENOENT) {
1095                 libvarpd_panic("dissociate failed or was actually "
1096                     "associated: %d", ret);
1097         }
1098         mutex_exit(&scp->sc_lock);
1099 
1100         /* Verify our timers are killed */
1101         svp_timer_remove(&scp->sc_btimer);
1102         svp_timer_remove(&scp->sc_qtimer);
1103 
1104         if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
1105                 libvarpd_panic("failed to close svp_conn_t`sc_socket fd "
1106                     "%d: %d", scp->sc_socket, errno);
1107 
1108         list_destroy(&scp->sc_queries);
1109         umem_free(scp, sizeof (svp_conn_t));
1110 }
1111 
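/*
 * Queue a query on an active connection and make sure that we are polling for
 * POLLOUT so that it gets written out.
 */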
1112 void
1113 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1114 {
1115         assert(MUTEX_HELD(&scp->sc_lock));
1116         assert(scp->sc_cstate == SVP_CS_ACTIVE);
1117 
1118         sqp->sq_acttime = -1;
1119         list_insert_tail(&scp->sc_queries, sqp);
1120         if (!(scp->sc_event.se_events & POLLOUT)) {
1121                 scp->sc_event.se_events |= POLLOUT;
1122                 /*
1123                  * If this becomes frequent, we should give up on this set of
1124                  * connections instead of aborting.
1125                  */
1126                 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1127                         libvarpd_panic("svp_event_associate failed somehow");
1128         }
1129 }