1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2015 Joyent, Inc.
  14  */
  15 
  16 /*
  17  * Logic to manage an individual connection to a remote host.
  18  *
  19  * For more information, see the big theory statement in
  20  * lib/varpd/svp/common/libvarpd_svp.c.
  21  */
  22 
  23 #include <assert.h>
  24 #include <umem.h>
  25 #include <errno.h>
  26 #include <strings.h>
  27 #include <unistd.h>
  28 #include <stddef.h>
  29 #include <sys/uio.h>
  30 #include <sys/debug.h>
  31 
  32 #include <libvarpd_svp.h>
  33 
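     /*
      * Number of seconds to wait before an outstanding query is considered
      * timed out, and the schedule (in seconds) used to back off reconnection
      * attempts.
      */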
  34 int svp_conn_query_timeout = 30;
  35 static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
  36 static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
  37 
  38 typedef enum svp_conn_act {
  39         SVP_RA_NONE     = 0x00,
  40         SVP_RA_DEGRADE  = 0x01,
  41         SVP_RA_RESTORE  = 0x02,
  42         SVP_RA_ERROR    = 0x03,
  43         SVP_RA_CLEANUP  = 0x04
  44 } svp_conn_act_t;
  45 
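     /*
      * Post a user (PORT_SOURCE_USER) event for this connection so that the
      * event loop will call back into svp_conn_handler. The SVP_CF_USER flag
      * ensures that at most one such event is outstanding at a time.
      */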
  46 static void
  47 svp_conn_inject(svp_conn_t *scp)
  48 {
  49         int ret;
  50         assert(MUTEX_HELD(&scp->sc_lock));
  51 
  52         if (scp->sc_flags & SVP_CF_USER)
  53                 return;
  54         scp->sc_flags |= SVP_CF_USER;
  55         if ((ret = svp_event_inject(&scp->sc_event)) != 0)
  56                 libvarpd_panic("failed to inject event: %d\n", ret);
  57 }
  58 
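     /*
      * Mark this connection as degraded. If every connection to the remote is
      * now degraded, flag the remote itself as failed.
      */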
  59 static void
  60 svp_conn_degrade(svp_conn_t *scp)
  61 {
  62         svp_remote_t *srp = scp->sc_remote;
  63 
  64         assert(MUTEX_HELD(&srp->sr_lock));
  65         assert(MUTEX_HELD(&scp->sc_lock));
  66 
  67         if (scp->sc_flags & SVP_CF_DEGRADED)
  68                 return;
  69 
  70         scp->sc_flags |= SVP_CF_DEGRADED;
  71         srp->sr_ndconns++;
  72         if (srp->sr_ndconns == srp->sr_tconns)
  73                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
  74 }
  75 
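     /*
      * Clear this connection's degraded flag. If the remote had been marked
      * failed because every one of its connections was degraded, restore it.
      */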
  76 static void
  77 svp_conn_restore(svp_conn_t *scp)
  78 {
  79         svp_remote_t *srp = scp->sc_remote;
  80 
  81         assert(MUTEX_HELD(&srp->sr_lock));
  82         assert(MUTEX_HELD(&scp->sc_lock));
  83 
  84         if (!(scp->sc_flags & SVP_CF_DEGRADED))
  85                 return;
  86 
  87         scp->sc_flags &= ~SVP_CF_DEGRADED;
  88         if (srp->sr_ndconns == srp->sr_tconns)
  89                 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
  90         srp->sr_ndconns--;
  91 }
  92 
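     /*
      * Add this connection to its remote's connection list and account for it
      * in the total connection count.
      */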
  93 static void
  94 svp_conn_add(svp_conn_t *scp)
  95 {
  96         svp_remote_t *srp = scp->sc_remote;
  97 
  98         assert(MUTEX_HELD(&srp->sr_lock));
  99         assert(MUTEX_HELD(&scp->sc_lock));
 100 
 101         if (scp->sc_flags & SVP_CF_ADDED)
 102                 return;
 103 
 104         list_insert_tail(&srp->sr_conns, scp);
 105         scp->sc_flags |= SVP_CF_ADDED;
 106         srp->sr_tconns++;
 107 }
 108 
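     /*
      * Remove this connection from its remote's accounting. If every
      * connection that remains is degraded, mark the remote as failed.
      */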
 109 static void
 110 svp_conn_remove(svp_conn_t *scp)
 111 {
 112         svp_remote_t *srp = scp->sc_remote;
 113 
 114         assert(MUTEX_HELD(&srp->sr_lock));
 115         assert(MUTEX_HELD(&scp->sc_lock));
 116 
 117         if (!(scp->sc_flags & SVP_CF_ADDED))
 118                 return;
 119 
 120         scp->sc_flags &= ~SVP_CF_ADDED;
 121         if (scp->sc_flags & SVP_CF_DEGRADED)
 122                 srp->sr_ndconns--;
 123         srp->sr_tconns--;
 124         if (srp->sr_tconns == srp->sr_ndconns)
 125                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
 126 }
 127 
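     /*
      * Find the outstanding query on this connection whose request ID matches
      * the given ID. Returns NULL if there is no match.
      */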
 128 static svp_query_t *
 129 svp_conn_query_find(svp_conn_t *scp, uint32_t id)
 130 {
 131         svp_query_t *sqp;
 132 
 133         assert(MUTEX_HELD(&scp->sc_lock));
 134 
 135         for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 136             sqp = list_next(&scp->sc_queries, sqp)) {
 137                 if (sqp->sq_header.svp_id == id)
 138                         break;
 139         }
 140 
 141         return (sqp);
 142 }
 143 
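     /*
      * Close the socket and put this connection into backoff. The backoff
      * timer is armed from svp_conn_backoff_tbl, capped at the table's final
      * entry. Once we've run off the end of the table, we also ask to be
      * marked degraded.
      */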
 144 static svp_conn_act_t
 145 svp_conn_backoff(svp_conn_t *scp)
 146 {
 147         assert(MUTEX_HELD(&scp->sc_lock));
 148 
 149         if (close(scp->sc_socket) != 0)
 150                 libvarpd_panic("failed to close socket %d: %d\n",
 151                     scp->sc_socket, errno);
 152         scp->sc_socket = -1;
 153 
 154         scp->sc_cstate = SVP_CS_BACKOFF;
 155         scp->sc_nbackoff++;
 156         if (scp->sc_nbackoff >= svp_conn_nbackoff) {
 157                 scp->sc_btimer.st_value =
 158                     svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
 159         } else {
 160                 scp->sc_btimer.st_value =
 161                     svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
 162         }
 163         svp_timer_add(&scp->sc_btimer);
 164 
 165         if (scp->sc_nbackoff > svp_conn_nbackoff)
 166                 return (SVP_RA_DEGRADE);
 167         return (SVP_RA_NONE);
 168 }
 169 
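     /*
      * Kick off a non-blocking connect() to the remote. If the connect is
      * still in progress, we wait for the event port to tell us that it has
      * completed; if it fails outright, we either back off and retry or fall
      * into the error state.
      */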
 170 static svp_conn_act_t
 171 svp_conn_connect(svp_conn_t *scp)
 172 {
 173         int ret;
 174         struct sockaddr_in6 in6;
 175 
 176         assert(MUTEX_HELD(&scp->sc_lock));
 177         assert(scp->sc_cstate == SVP_CS_BACKOFF ||
 178             scp->sc_cstate == SVP_CS_INITIAL);
 179         assert(scp->sc_socket == -1);
 180         if (scp->sc_cstate == SVP_CS_INITIAL)
 181                 scp->sc_nbackoff = 0;
 182 
 183         scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
 184         if (scp->sc_socket == -1) {
 185                 scp->sc_error = SVP_CE_SOCKET;
 186                 scp->sc_errno = errno;
 187                 scp->sc_cstate = SVP_CS_ERROR;
 188                 return (SVP_RA_DEGRADE);
 189         }
 190 
 191         bzero(&in6, sizeof (struct sockaddr_in6));
 192         in6.sin6_family = AF_INET6;
 193         in6.sin6_port = htons(scp->sc_remote->sr_rport);
  194         bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
 195         ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
 196             sizeof (struct sockaddr_in6));
 197         if (ret != 0) {
 198                 boolean_t async = B_FALSE;
 199 
 200                 switch (errno) {
 201                 case EACCES:
 202                 case EADDRINUSE:
 203                 case EAFNOSUPPORT:
 204                 case EALREADY:
 205                 case EBADF:
 206                 case EISCONN:
 207                 case ELOOP:
 208                 case ENOENT:
 209                 case ENOSR:
 210                 case EWOULDBLOCK:
 211                         libvarpd_panic("unanticipated connect errno %d", errno);
 212                         break;
 213                 case EINPROGRESS:
 214                 case EINTR:
 215                         async = B_TRUE;
 216                 default:
 217                         break;
 218                 }
 219 
 220                 /*
 221                  * So, we will be connecting to this in the future, advance our
 222                  * state and make sure that we poll for the next round.
 223                  */
 224                 if (async == B_TRUE) {
 225                         scp->sc_cstate = SVP_CS_CONNECTING;
 226                         scp->sc_event.se_events = POLLOUT | POLLHUP;
 227                         ret = svp_event_associate(&scp->sc_event,
 228                             scp->sc_socket);
 229                         if (ret == 0)
 230                                 return (SVP_RA_NONE);
 231                         scp->sc_error = SVP_CE_ASSOCIATE;
 232                         scp->sc_errno = ret;
 233                         scp->sc_cstate = SVP_CS_ERROR;
 234                         return (SVP_RA_DEGRADE);
 235                 } else {
 236                         /*
 237                          * This call failed, which means that we obtained one of
 238                          * the following:
 239                          *
 240                          * EADDRNOTAVAIL
 241                          * ECONNREFUSED
 242                          * EIO
 243                          * ENETUNREACH
 244                          * EHOSTUNREACH
 245                          * ENXIO
 246                          * ETIMEDOUT
 247                          *
 248                          * Therefore we need to set ourselves into backoff and
 249                          * wait for that to clear up.
 250                          */
 251                         return (svp_conn_backoff(scp));
 252                 }
 253         }
 254 
 255         /*
 256          * We've connected. Successfully move ourselves to the bound
 257          * state and start polling.
 258          */
 259         scp->sc_cstate = SVP_CS_ACTIVE;
 260         scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
 261         ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 262         if (ret == 0)
 263                 return (SVP_RA_RESTORE);
 264         scp->sc_error = SVP_CE_ASSOCIATE;
 265         scp->sc_cstate = SVP_CS_ERROR;
 266 
 267         return (SVP_RA_DEGRADE);
 268 }
 269 
  270 /*
  271  * This should be the first call we get after a connect. If we have
  272  * successfully connected, we should see a writeable event. We may also see
  273  * an error or a hang up. In either of these cases, we transition to error
  274  * mode. If there is also a readable event, we ignore it at the moment and
  275  * just let a reassociation pick it up so we can simplify the set of state
  276  * transitions that we have.
  277  */
 278 static svp_conn_act_t
 279 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
 280 {
 281         int ret, err;
 282         socklen_t sl = sizeof (err);
 283         if (!(pe->portev_events & POLLOUT)) {
 284                 scp->sc_errno = 0;
 285                 scp->sc_error = SVP_CE_NOPOLLOUT;
 286                 scp->sc_cstate = SVP_CS_ERROR;
 287                 return (SVP_RA_DEGRADE);
 288         }
 289 
 290         ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl);
 291         if (ret != 0)
 292                 libvarpd_panic("unanticipated getsockopt error");
 293         if (err != 0) {
 294                 return (svp_conn_backoff(scp));
 295         }
 296 
 297         scp->sc_cstate = SVP_CS_ACTIVE;
 298         scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
 299         ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 300         if (ret == 0)
 301                 return (SVP_RA_RESTORE);
 302         scp->sc_error = SVP_CE_ASSOCIATE;
 303         scp->sc_errno = ret;
 304         scp->sc_cstate = SVP_CS_ERROR;
 305         return (SVP_RA_DEGRADE);
 306 }
 307 
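     /*
      * Write out as much of the current request as we can. If no request is
      * in flight, pick the next query in SVP_QUERY_INIT, checksum it, and
      * start writing its header and request data, tracking our offset across
      * partial writes.
      */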
 308 static svp_conn_act_t
 309 svp_conn_pollout(svp_conn_t *scp)
 310 {
 311         svp_query_t *sqp;
 312         svp_req_t *req;
 313         size_t off;
 314         struct iovec iov[2];
 315         int nvecs = 0;
 316         ssize_t ret;
 317 
 318         assert(MUTEX_HELD(&scp->sc_lock));
 319 
 320         /*
 321          * We need to find a query and start writing it out.
 322          */
 323         if (scp->sc_output.sco_query == NULL) {
 324                 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 325                     sqp = list_next(&scp->sc_queries, sqp)) {
 326                         if (sqp->sq_state != SVP_QUERY_INIT)
 327                                 continue;
 328                         break;
 329                 }
 330 
 331                 if (sqp == NULL) {
 332                         scp->sc_event.se_events &= ~POLLOUT;
 333                         return (SVP_RA_NONE);
 334                 }
 335 
 336                 scp->sc_output.sco_query = sqp;
 337                 scp->sc_output.sco_offset = 0;
 338                 sqp->sq_state = SVP_QUERY_WRITING;
 339                 svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
 340         }
 341 
 342         sqp = scp->sc_output.sco_query;
 343         req = &sqp->sq_header;
 344         off = scp->sc_output.sco_offset;
 345         if (off < sizeof (svp_req_t)) {
 346                 iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
 347                 iov[nvecs].iov_len = sizeof (svp_req_t) - off;
 348                 nvecs++;
 349                 off = 0;
 350         } else {
 351                 off -= sizeof (svp_req_t);
 352         }
 353 
 354         iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
 355         iov[nvecs].iov_len = sqp->sq_rsize - off;
 356         nvecs++;
 357 
 358         do {
 359                 ret = writev(scp->sc_socket, iov, nvecs);
  360         } while (ret == -1 && errno == EINTR);
 361         if (ret == -1) {
 362                 switch (errno) {
 363                 case EAGAIN:
 364                         scp->sc_event.se_events |= POLLOUT;
 365                         return (SVP_RA_NONE);
 366                 case EIO:
 367                 case ENXIO:
 368                 case ECONNRESET:
 369                         return (SVP_RA_ERROR);
 370                 default:
 371                         libvarpd_panic("unexpected errno: %d", errno);
 372                 }
 373         }
 374 
 375         sqp->sq_acttime = gethrtime();
 376         scp->sc_output.sco_offset += ret;
 377         if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
 378                 sqp->sq_state = SVP_QUERY_READING;
 379                 scp->sc_output.sco_query = NULL;
 380                 scp->sc_output.sco_offset = 0;
 381                 scp->sc_event.se_events |= POLLOUT;
 382         }
 383         return (SVP_RA_NONE);
 384 }
 385 
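     /*
      * We've read a complete response header. Sanity check its version,
      * operation, size, and ID against our outstanding queries, and point the
      * matching query at a buffer for the response payload. Returns B_FALSE
      * if anything is amiss, in which case the caller resets the connection.
      */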
 386 static boolean_t
 387 svp_conn_pollin_validate(svp_conn_t *scp)
 388 {
 389         svp_query_t *sqp;
 390         uint32_t nsize;
 391         uint16_t nvers, nop;
 392         svp_req_t *resp = &scp->sc_input.sci_req;
 393 
 394         assert(MUTEX_HELD(&scp->sc_lock));
 395 
 396         nvers = ntohs(resp->svp_ver);
 397         nop = ntohs(resp->svp_op);
 398         nsize = ntohl(resp->svp_size);
 399 
 400         if (nvers != SVP_CURRENT_VERSION) {
 401                 (void) bunyan_warn(svp_bunyan, "unsupported version",
 402                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 403                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 404                     BUNYAN_T_INT32, "version", nvers,
 405                     BUNYAN_T_INT32, "operation", nop,
 406                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 407                     BUNYAN_T_END);
 408                 return (B_FALSE);
 409         }
 410 
 411         if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
 412             nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
 413                 (void) bunyan_warn(svp_bunyan, "unsupported operation",
 414                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 415                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 416                     BUNYAN_T_INT32, "version", nvers,
 417                     BUNYAN_T_INT32, "operation", nop,
 418                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 419                     BUNYAN_T_END);
 420                 return (B_FALSE);
 421         }
 422 
 423         sqp = svp_conn_query_find(scp, resp->svp_id);
 424         if (sqp == NULL) {
 425                 (void) bunyan_warn(svp_bunyan, "unknown response id",
 426                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 427                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 428                     BUNYAN_T_INT32, "version", nvers,
 429                     BUNYAN_T_INT32, "operation", nop,
 430                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 431                     BUNYAN_T_END);
 432                 return (B_FALSE);
 433         }
 434 
 435         if (sqp->sq_state != SVP_QUERY_READING) {
 436                 (void) bunyan_warn(svp_bunyan,
  437                     "got response for query in unexpected state",
 438                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 439                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 440                     BUNYAN_T_INT32, "version", nvers,
 441                     BUNYAN_T_INT32, "operation", nop,
 442                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 443                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 444                     BUNYAN_T_END);
 445                 return (B_FALSE);
 446         }
 447 
 448         if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
 449             (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
 450             (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
 451                 (void) bunyan_warn(svp_bunyan, "response size too large",
 452                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 453                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 454                     BUNYAN_T_INT32, "version", nvers,
 455                     BUNYAN_T_INT32, "operation", nop,
 456                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 457                     BUNYAN_T_INT32, "response_size", nsize,
 458                     BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
 459                     sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
 460                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 461                     BUNYAN_T_END);
 462                 return (B_FALSE);
 463         }
 464 
 465         /*
  466          * The valid size is anything <= what the user requested, but at
 467          * least svp_log_ack_t bytes large.
 468          */
 469         if (nop == SVP_R_LOG_ACK) {
 470                 const char *msg = NULL;
 471                 if (nsize < sizeof (svp_log_ack_t))
 472                         msg = "response size too small";
 473                 else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
 474                         msg = "response size too large";
 475                 if (msg != NULL) {
 476                         (void) bunyan_warn(svp_bunyan, msg,
 477                             BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 478                             BUNYAN_T_INT32, "remote_port",
 479                             scp->sc_remote->sr_rport,
 480                             BUNYAN_T_INT32, "version", nvers,
 481                             BUNYAN_T_INT32, "operation", nop,
 482                             BUNYAN_T_INT32, "response_id", resp->svp_id,
 483                             BUNYAN_T_INT32, "response_size", nsize,
 484                             BUNYAN_T_INT32, "expected_size",
 485                             ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
 486                             BUNYAN_T_INT32, "query_state", sqp->sq_state,
 487                             BUNYAN_T_END);
 488                         return (B_FALSE);
 489                 }
 490         }
 491 
 492         sqp->sq_size = nsize;
 493         scp->sc_input.sci_query = sqp;
 494         if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
 495             nop == SVP_R_LOG_RM_ACK) {
 496                 sqp->sq_wdata = &sqp->sq_wdun;
 497                 sqp->sq_wsize = sizeof (svp_query_data_t);
 498         } else {
 499                 VERIFY(nop == SVP_R_LOG_ACK);
 500                 assert(sqp->sq_wdata != NULL);
 501                 assert(sqp->sq_wsize != 0);
 502         }
 503 
 504         return (B_TRUE);
 505 }
 506 
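     /*
      * Read from the socket: first the response header, then the payload for
      * the query it identifies. Once a full response has arrived and its CRC
      * checks out, dispatch it to the query's completion function.
      */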
 507 static svp_conn_act_t
 508 svp_conn_pollin(svp_conn_t *scp)
 509 {
 510         size_t off, total;
 511         ssize_t ret;
 512         svp_query_t *sqp;
 513         uint32_t crc;
 514         uint16_t nop;
 515 
 516         assert(MUTEX_HELD(&scp->sc_lock));
 517 
 518         /*
 519          * No query implies that we're reading in the header and that the offset
  520          * is associated with it.
 521          */
 522         off = scp->sc_input.sci_offset;
 523         sqp = scp->sc_input.sci_query;
 524         if (scp->sc_input.sci_query == NULL) {
 525                 svp_req_t *resp = &scp->sc_input.sci_req;
 526 
 527                 assert(off < sizeof (svp_req_t));
 528 
 529                 do {
 530                         ret = read(scp->sc_socket,
 531                             (void *)((uintptr_t)resp + off),
 532                             sizeof (svp_req_t) - off);
 533                 } while (ret == -1 && errno == EINTR);
 534                 if (ret == -1) {
 535                         switch (errno) {
 536                         case EAGAIN:
 537                                 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 538                                 return (SVP_RA_NONE);
 539                         case EIO:
 540                         case ECONNRESET:
 541                                 return (SVP_RA_ERROR);
 542                                 break;
 543                         default:
  544                                 libvarpd_panic("unexpected read errno: %d",
 545                                     errno);
 546                         }
 547                 } else if (ret == 0) {
 548                         /* Try to reconnect to the remote host */
 549                         return (SVP_RA_ERROR);
 550                 }
 551 
 552                 /* Didn't get all the data we need */
 553                 if (off + ret < sizeof (svp_req_t)) {
 554                         scp->sc_input.sci_offset += ret;
 555                         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 556                         return (SVP_RA_NONE);
 557                 }
 558 
 559                 if (svp_conn_pollin_validate(scp) != B_TRUE)
 560                         return (SVP_RA_ERROR);
 561         }
 562 
 563         sqp = scp->sc_input.sci_query;
 564         assert(sqp != NULL);
 565         sqp->sq_acttime = gethrtime();
 566         total = ntohl(scp->sc_input.sci_req.svp_size);
 567         do {
 568                 ret = read(scp->sc_socket,
 569                     (void *)((uintptr_t)sqp->sq_wdata + off),
 570                     total - off);
 571         } while (ret == -1 && errno == EINTR);
 572 
 573         if (ret == -1) {
 574                 switch (errno) {
 575                 case EAGAIN:
 576                         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 577                         return (SVP_RA_NONE);
 578                 case EIO:
 579                 case ECONNRESET:
 580                         return (SVP_RA_ERROR);
 581                         break;
 582                 default:
  583                         libvarpd_panic("unexpected read errno: %d", errno);
 584                 }
 585         } else if (ret == 0) {
 586                 /* Try to reconnect to the remote host */
 587                 return (SVP_RA_ERROR);
 588         }
 589 
 590         if (ret + off < total) {
 591                 scp->sc_input.sci_offset += ret;
 592                 return (SVP_RA_NONE);
 593         }
 594 
 595         nop = ntohs(scp->sc_input.sci_req.svp_op);
 596         crc = scp->sc_input.sci_req.svp_crc32;
 597         svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
 598         if (crc != scp->sc_input.sci_req.svp_crc32) {
 599                 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
  600                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
  601                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
  602                     BUNYAN_T_INT32, "version",
  603                     ntohs(scp->sc_input.sci_req.svp_ver),
  604                     BUNYAN_T_INT32, "operation", nop,
  605                     BUNYAN_T_INT32, "response_id",
  606                     ntohl(scp->sc_input.sci_req.svp_id),
  607                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 608                     BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
 609                     BUNYAN_T_UINT32, "calc_crc",
 610                     ntohl(scp->sc_input.sci_req.svp_crc32),
 611                     BUNYAN_T_END);
 612                 return (SVP_RA_ERROR);
 613         }
 614         scp->sc_input.sci_query = NULL;
 615         scp->sc_input.sci_offset = 0;
 616 
 617         if (nop == SVP_R_VL2_ACK) {
 618                 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
 619                 sqp->sq_status = ntohl(sl2a->sl2a_status);
 620         } else if (nop == SVP_R_VL3_ACK) {
 621                 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
 622                 sqp->sq_status = ntohl(sl3a->sl3a_status);
 623         } else if (nop == SVP_R_LOG_ACK) {
 624                 svp_log_ack_t *svla = sqp->sq_wdata;
 625                 sqp->sq_status = ntohl(svla->svla_status);
 626         } else if (nop == SVP_R_LOG_RM_ACK) {
 627                 svp_lrm_ack_t *svra = sqp->sq_wdata;
 628                 sqp->sq_status = ntohl(svra->svra_status);
 629         } else {
 630                 libvarpd_panic("unhandled nop: %d", nop);
 631         }
 632 
 633         list_remove(&scp->sc_queries, sqp);
 634         mutex_exit(&scp->sc_lock);
 635 
 636         /*
 637          * We have to release all of our resources associated with this entry
 638          * before we call the callback. After we call it, the memory will be
 639          * lost to time.
 640          */
 641         svp_query_release(sqp);
 642         sqp->sq_func(sqp, sqp->sq_arg);
 643         mutex_enter(&scp->sc_lock);
 644         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 645 
 646         return (SVP_RA_NONE);
 647 }
 648 
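     /*
      * Tear down the current socket and start over: dissociate it from the
      * event port, close it, let the remote reassign any outstanding work,
      * and then begin connecting again from scratch.
      */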
 649 static svp_conn_act_t
 650 svp_conn_reset(svp_conn_t *scp)
 651 {
 652         svp_remote_t *srp = scp->sc_remote;
 653 
 654         assert(MUTEX_HELD(&srp->sr_lock));
 655         assert(MUTEX_HELD(&scp->sc_lock));
 656 
  657         VERIFY(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
  658             ENOENT);
 659         if (close(scp->sc_socket) != 0)
 660                 libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
 661                     errno);
 662         scp->sc_flags &= ~SVP_CF_TEARDOWN;
 663         scp->sc_socket = -1;
 664         scp->sc_cstate = SVP_CS_INITIAL;
 665         scp->sc_input.sci_query = NULL;
 666         scp->sc_output.sco_query = NULL;
 667 
 668         svp_remote_reassign(srp, scp);
 669 
 670         return (svp_conn_connect(scp));
 671 }
 672 
  673 /*
  674  * This is our general state transition function. We're called here when we
  675  * want to advance part of our state machine as well as to re-arm ourselves.
  676  * We can also end up here from the standard event loop as a result of having
  677  * a user event posted.
  678  */
 679 static void
 680 svp_conn_handler(port_event_t *pe, void *arg)
 681 {
 682         svp_conn_t *scp = arg;
 683         svp_remote_t *srp = scp->sc_remote;
 684         svp_conn_act_t ret = SVP_RA_NONE;
 685         svp_conn_state_t oldstate;
 686 
 687         mutex_enter(&scp->sc_lock);
 688 
  689         /*
  690          * Check if one of our event interrupts is set. An event interrupt,
  691          * such as needing to be reaped or torn down, is signalled by a
  692          * PORT_SOURCE_USER event that takes care of it. However, because the
  693          * event loop can keep running regardless, we may get here before
  694          * that PORT_SOURCE_USER event has been delivered. In such a case,
  695          * if the PORT_SOURCE_USER event is still outstanding, we opt to do
  696          * nothing here and wait for it to come and tear us down. That also
  697          * indicates to us that we have nothing to worry about as far as
  698          * general timing and the like goes.
  699          */
 700         if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
 701             (scp->sc_flags & SVP_CF_USER) != 0 &&
 702             pe != NULL &&
 703             pe->portev_source != PORT_SOURCE_USER) {
 704                 mutex_exit(&scp->sc_lock);
 705                 return;
 706         }
 707 
 708         if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
 709                 scp->sc_flags &= ~SVP_CF_USER;
 710                 if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
 711                         mutex_exit(&scp->sc_lock);
 712                         return;
 713                 }
 714         }
 715 
 716         /* Check if this needs to be freed */
 717         if (scp->sc_flags & SVP_CF_REAP) {
 718                 mutex_exit(&scp->sc_lock);
 719                 svp_conn_destroy(scp);
 720                 return;
 721         }
 722 
 723         /* Check if this needs to be reset */
 724         if (scp->sc_flags & SVP_CF_TEARDOWN) {
 725                 /* Make sure any other users of this are disassociated */
 726                 ret = SVP_RA_ERROR;
 727                 goto out;
 728         }
 729 
 730         switch (scp->sc_cstate) {
 731         case SVP_CS_INITIAL:
 732         case SVP_CS_BACKOFF:
 733                 assert(pe == NULL);
 734                 ret = svp_conn_connect(scp);
 735                 break;
 736         case SVP_CS_CONNECTING:
 737                 assert(pe != NULL);
 738                 ret = svp_conn_poll_connect(pe, scp);
 739                 break;
 740         case SVP_CS_ACTIVE:
 741         case SVP_CS_WINDDOWN:
 742                 assert(pe != NULL);
 743                 oldstate = scp->sc_cstate;
 744                 if (pe->portev_events & POLLOUT)
 745                         ret = svp_conn_pollout(scp);
 746                 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
 747                         ret = svp_conn_pollin(scp);
 748 
 749                 if (oldstate == SVP_CS_WINDDOWN &&
 750                     (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
 751                         ret = SVP_RA_CLEANUP;
 752                 }
 753 
 754                 if (ret == SVP_RA_NONE) {
 755                         int err;
 756                         if ((err = svp_event_associate(&scp->sc_event,
 757                             scp->sc_socket)) != 0) {
 758                                 scp->sc_error = SVP_CE_ASSOCIATE;
 759                                 scp->sc_errno = err;
 760                                 scp->sc_cstate = SVP_CS_ERROR;
 761                                 ret = SVP_RA_DEGRADE;
 762                         }
 763                 }
 764                 break;
 765         default:
 766                 libvarpd_panic("svp_conn_handler encountered unexpected "
 767                     "state: %d", scp->sc_cstate);
 768         }
 769 out:
 770         mutex_exit(&scp->sc_lock);
 771 
 772         if (ret == SVP_RA_NONE)
 773                 return;
 774 
 775         mutex_enter(&srp->sr_lock);
 776         mutex_enter(&scp->sc_lock);
 777         if (ret == SVP_RA_ERROR)
 778                 ret = svp_conn_reset(scp);
 779 
 780         if (ret == SVP_RA_DEGRADE)
 781                 svp_conn_degrade(scp);
 782         if (ret == SVP_RA_RESTORE)
 783                 svp_conn_restore(scp);
 784 
 785         if (ret == SVP_RA_CLEANUP) {
 786                 svp_conn_remove(scp);
 787                 scp->sc_flags |= SVP_CF_REAP;
 788                 svp_conn_inject(scp);
 789         }
 790         mutex_exit(&scp->sc_lock);
 791         mutex_exit(&srp->sr_lock);
 792 }
 793 
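     /*
      * Timer callback used for the initial connect and for backoff retries:
      * just run the connection state machine without a port event.
      */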
 794 static void
 795 svp_conn_backtimer(void *arg)
 796 {
 797         svp_conn_t *scp = arg;
 798 
 799         svp_conn_handler(NULL, scp);
 800 }
 801 
  802 /*
  803  * This fires every svp_conn_query_timeout seconds. Its purpose is to
  804  * determine whether we have failed to hear back on a request within
  805  * svp_conn_query_timeout seconds. If any query that has been started
  806  * (indicated by svp_query_t`sq_acttime != -1) has been outstanding for more
  807  * than svp_conn_query_timeout seconds, we tear this connection down and
  808  * reassign its outstanding queries.
  809  */
 810 static void
 811 svp_conn_querytimer(void *arg)
 812 {
 813         int ret;
 814         svp_query_t *sqp;
 815         svp_conn_t *scp = arg;
 816         hrtime_t now = gethrtime();
 817 
 818         mutex_enter(&scp->sc_lock);
 819 
 820         /*
 821          * If we're not in the active state, then we don't care about this as
 822          * we're already either going to die or we have no connections to worry
 823          * about.
 824          */
 825         if (scp->sc_cstate != SVP_CS_ACTIVE) {
 826                 mutex_exit(&scp->sc_lock);
 827                 return;
 828         }
 829 
 830         for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 831             sqp = list_next(&scp->sc_queries, sqp)) {
 832                 if (sqp->sq_acttime == -1)
 833                         continue;
 834                 if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
 835                         break;
 836         }
 837 
 838         /* Nothing timed out, we're good here */
 839         if (sqp == NULL) {
 840                 mutex_exit(&scp->sc_lock);
 841                 return;
 842         }
 843 
 844         (void) bunyan_warn(svp_bunyan, "query timed out on connection",
 845             BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 846             BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 847             BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
 848             BUNYAN_T_END);
 849 
  850         /*
  851          * Begin the teardown process for this connection. If we lose the
  852          * dissociate, then we don't inject an event. See the big theory
  853          * statement in libvarpd_svp.c for more information.
  854          */
 855         scp->sc_flags |= SVP_CF_TEARDOWN;
 856 
 857         ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
 858         if (ret == 0)
 859                 svp_conn_inject(scp);
 860         else
 861                 VERIFY(ret == ENOENT);
 862 
 863         mutex_exit(&scp->sc_lock);
 864 }
 865 
 866 /*
  867  * This connection has fallen out of DNS; figure out what we need to do with it.
 868  */
 869 void
 870 svp_conn_fallout(svp_conn_t *scp)
 871 {
 872         svp_remote_t *srp = scp->sc_remote;
 873 
 874         assert(MUTEX_HELD(&srp->sr_lock));
 875 
 876         mutex_enter(&scp->sc_lock);
 877         switch (scp->sc_cstate) {
 878         case SVP_CS_ERROR:
  879                 /*
  880                  * Connection is already inactive, so it's safe to tear down.
  881                  * Fire it off through the state machine to tear it down via
  882                  * an injected user event.
  883                  */
 884                 svp_conn_remove(scp);
 885                 scp->sc_flags |= SVP_CF_REAP;
 886                 svp_conn_inject(scp);
 887                 break;
 888         case SVP_CS_INITIAL:
 889         case SVP_CS_BACKOFF:
 890         case SVP_CS_CONNECTING:
  891                 /*
  892                  * Here, we have something actively going on, so we'll let it
  893                  * be cleaned up the next time we hit the event loop, by the
  894                  * event loop itself. As it has no outstanding queries, there
  895                  * isn't much to really do, though we'll take this chance to
  896                  * go ahead and remove it from the remote.
  897                  */
 898                 svp_conn_remove(scp);
 899                 scp->sc_flags |= SVP_CF_REAP;
 900                 svp_conn_inject(scp);
 901                 break;
 902         case SVP_CS_ACTIVE:
 903         case SVP_CS_WINDDOWN:
 904                 /*
 905                  * If there are no outstanding queries, then we should simply
  906                  * clean this up now, the same way we would with the others.
  907                  * Otherwise, as we know the event loop is ongoing, we'll make
 908                  * sure that these entries get cleaned up once they're done.
 909                  */
 910                 scp->sc_cstate = SVP_CS_WINDDOWN;
 911                 if (list_is_empty(&scp->sc_queries)) {
 912                         svp_conn_remove(scp);
 913                         scp->sc_flags |= SVP_CF_REAP;
 914                         svp_conn_inject(scp);
 915                 }
 916                 break;
 917         default:
  918                 libvarpd_panic("svp_conn_fallout encountered "
  919                     "unknown state");
 920         }
 921         mutex_exit(&scp->sc_lock);
 922         mutex_exit(&srp->sr_lock);
 923 }
 924 
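     /*
      * Create a connection to the given address on behalf of a remote: set up
      * its lock, event handler, timers, and query list, add it to the remote,
      * and arm the timers so the state machine starts running.
      */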
 925 int
 926 svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
 927 {
 928         int ret;
 929         svp_conn_t *scp;
 930 
 931         assert(MUTEX_HELD(&srp->sr_lock));
 932         scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
 933         if (scp == NULL)
 934                 return (ENOMEM);
 935 
 936         if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
 937             NULL)) != 0) {
 938                 umem_free(scp, sizeof (svp_conn_t));
 939                 return (ret);
 940         }
 941 
 942         scp->sc_remote = srp;
 943         scp->sc_event.se_func = svp_conn_handler;
 944         scp->sc_event.se_arg = scp;
 945         scp->sc_btimer.st_func = svp_conn_backtimer;
 946         scp->sc_btimer.st_arg = scp;
 947         scp->sc_btimer.st_oneshot = B_TRUE;
 948         scp->sc_btimer.st_value = 1;
 949 
 950         scp->sc_qtimer.st_func = svp_conn_querytimer;
 951         scp->sc_qtimer.st_arg = scp;
 952         scp->sc_qtimer.st_oneshot = B_FALSE;
 953         scp->sc_qtimer.st_value = svp_conn_query_timeout;
 954 
 955         scp->sc_socket = -1;
 956 
 957         list_create(&scp->sc_queries, sizeof (svp_query_t),
 958             offsetof(svp_query_t, sq_lnode));
 959         scp->sc_gen = srp->sr_gen;
 960         bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
 961         scp->sc_cstate = SVP_CS_INITIAL;
 962         mutex_enter(&scp->sc_lock);
 963         svp_conn_add(scp);
 964         mutex_exit(&scp->sc_lock);
 965 
 966         /* Now that we're locked and loaded, add our timers */
 967         svp_timer_add(&scp->sc_qtimer);
 968         svp_timer_add(&scp->sc_btimer);
 969 
 970         return (0);
 971 }
 972 
 973 /*
 974  * At the time of calling, the entry has been removed from all lists. In
  975  * addition, the entry's state should be SVP_CS_ERROR; therefore, we know that
 976  * the fd should not be associated with the event loop. We'll double check that
 977  * just in case. We should also have already been removed from the remote's
 978  * list.
 979  */
 980 void
 981 svp_conn_destroy(svp_conn_t *scp)
 982 {
 983         int ret;
 984 
 985         mutex_enter(&scp->sc_lock);
 986         if (scp->sc_cstate != SVP_CS_ERROR)
 987                 libvarpd_panic("asked to tear down an active connection");
 988         if (scp->sc_flags & SVP_CF_ADDED)
 989                 libvarpd_panic("asked to remove a connection still in "
 990                     "the remote list\n");
 991         if (!list_is_empty(&scp->sc_queries))
 992                 libvarpd_panic("asked to remove a connection with non-empty "
 993                     "query list");
 994 
 995         if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
 996             ENOENT) {
 997                 libvarpd_panic("dissociate failed or was actually "
 998                     "associated: %d", ret);
 999         }
1000         mutex_exit(&scp->sc_lock);
1001 
1002         /* Verify our timers are killed */
1003         svp_timer_remove(&scp->sc_btimer);
1004         svp_timer_remove(&scp->sc_qtimer);
1005 
1006         if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
 1007                 libvarpd_panic("failed to close svp_conn_t`sc_socket fd "
1008                     "%d: %d", scp->sc_socket, errno);
1009 
1010         list_destroy(&scp->sc_queries);
1011         umem_free(scp, sizeof (svp_conn_t));
1012 }
1013 
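     /*
      * Queue a query on an active connection. If we weren't already waiting
      * for the socket to become writeable, start polling for POLLOUT so the
      * query gets written out.
      */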
1014 void
1015 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1016 {
1017         assert(MUTEX_HELD(&scp->sc_lock));
1018         assert(scp->sc_cstate == SVP_CS_ACTIVE);
1019 
1020         sqp->sq_acttime = -1;
1021         list_insert_tail(&scp->sc_queries, sqp);
1022         if (!(scp->sc_event.se_events & POLLOUT)) {
1023                 scp->sc_event.se_events |= POLLOUT;
1024                 /*
1025                  * If this becomes frequent, we should instead give up on this
1026                  * set of connections instead of aborting.
1027                  */
1028                 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1029                         libvarpd_panic("svp_event_associate failed somehow");
1030         }
1031 }