Print this page
Try versioning as a new state


  23 #include <assert.h>
  24 #include <umem.h>
  25 #include <errno.h>
  26 #include <strings.h>
  27 #include <unistd.h>
  28 #include <stddef.h>
  29 #include <sys/uio.h>
  30 #include <sys/debug.h>
  31 
  32 #include <libvarpd_svp.h>
  33 
  34 int svp_conn_query_timeout = 30;
  35 static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
  36 static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
  37 
  38 typedef enum svp_conn_act {
  39         SVP_RA_NONE     = 0x00,
  40         SVP_RA_DEGRADE  = 0x01,
  41         SVP_RA_RESTORE  = 0x02,
  42         SVP_RA_ERROR    = 0x03,
  43         SVP_RA_CLEANUP  = 0x04

  44 } svp_conn_act_t;
  45 
  46 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
  47 
  48 static void
  49 svp_conn_inject(svp_conn_t *scp)
  50 {
  51         int ret;
  52         assert(MUTEX_HELD(&scp->sc_lock));
  53 
  54         if (scp->sc_flags & SVP_CF_USER)
  55                 return;
  56         scp->sc_flags |= SVP_CF_USER;
  57         if ((ret = svp_event_inject(&scp->sc_event)) != 0)
  58                 libvarpd_panic("failed to inject event: %d\n", ret);
  59 }
  60 
  61 static void
  62 svp_conn_degrade(svp_conn_t *scp)
  63 {


  75                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
  76 }
  77 
  78 static void
  79 svp_conn_restore(svp_conn_t *scp)
  80 {
  81         svp_remote_t *srp = scp->sc_remote;
  82 
  83         assert(MUTEX_HELD(&srp->sr_lock));
  84         assert(MUTEX_HELD(&scp->sc_lock));
  85 
  86         if (!(scp->sc_flags & SVP_CF_DEGRADED))
  87                 return;
  88 
  89         scp->sc_flags &= ~SVP_CF_DEGRADED;
  90         if (srp->sr_ndconns == srp->sr_tconns)
  91                 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
  92         srp->sr_ndconns--;
  93 }
  94 

















  95 static void



















































  96 svp_conn_add(svp_conn_t *scp)
  97 {
  98         svp_remote_t *srp = scp->sc_remote;
  99 
 100         assert(MUTEX_HELD(&srp->sr_lock));
 101         assert(MUTEX_HELD(&scp->sc_lock));
 102 
 103         if (scp->sc_flags & SVP_CF_ADDED)
 104                 return;
 105 
 106         list_insert_tail(&srp->sr_conns, scp);
 107         scp->sc_flags |= SVP_CF_ADDED;
 108         srp->sr_tconns++;
 109 }
 110 
 111 static void
 112 svp_conn_remove(svp_conn_t *scp)
 113 {
 114         svp_remote_t *srp = scp->sc_remote;
 115 


 152                 libvarpd_panic("failed to close socket %d: %d\n",
 153                     scp->sc_socket, errno);
 154         scp->sc_socket = -1;
 155 
 156         scp->sc_cstate = SVP_CS_BACKOFF;
 157         scp->sc_nbackoff++;
 158         if (scp->sc_nbackoff >= svp_conn_nbackoff) {
 159                 scp->sc_btimer.st_value =
 160                     svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
 161         } else {
 162                 scp->sc_btimer.st_value =
 163                     svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
 164         }
 165         svp_timer_add(&scp->sc_btimer);
 166 
 167         if (scp->sc_nbackoff > svp_conn_nbackoff)
 168                 return (SVP_RA_DEGRADE);
 169         return (SVP_RA_NONE);
 170 }
 171 
 172 /*
 173  * Think of this as an extension to the connect() call in svp_conn_connect().
 174  * Send a message, receive it, and set the version here.  If the response is
 175  * too slow or the socket throws an error, indicate a socket error, which
 176  * will cause the caller to backoff (i.e. close the socket and try again).
 177  *
 178  * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
 179  * own error type.
 180  */
 181 static svp_conn_error_t
 182 svp_conn_version_set(svp_conn_t *scp)
 183 {
 184         svp_req_t ping;
 185         ssize_t ret;
 186         uint32_t save_crc;
 187         uint16_t peer_version;
 188         int ntries = 3; /* One second between tries. 3secs should be enough. */
 189 
 190         ping.svp_ver = htons(SVP_CURRENT_VERSION);
 191         ping.svp_op = htons(SVP_R_PING);
 192         ping.svp_size = 0;      /* Header-only... */
 193         ping.svp_id = 0;
 194         /* 0-length data... just use the req buffer for the pointer. */
 195         svp_query_crc32(&ping, &ping, 0);
 196 
 197         ret = write(scp->sc_socket, &ping, sizeof (ping));
 198         if (ret == -1) {
 199                 /*
 200                  * A failed write() call right after connect probably
 201                  * indicates a larger connection failure.  Restart the
 202                  * connection from scratch.
 203                  */
 204                 return (SVP_CE_SOCKET);
 205         }
 206         assert(ret == sizeof (ping));
 207         do {
 208                 /*
 209                  * Asynch read.  We may loop here once or twice.
 210                  * Wait a bit, but don't loop too many times...
 211                  */
 212                 (void) sleep(1);
 213                 ret = read(scp->sc_socket, &ping, sizeof (ping));
 214         } while (--ntries > 0 &&
 215             ret == -1 && (errno == EINTR || errno == EAGAIN));
 216         if (ret == -1) {
 217                 /*
 218                  * This is actually a failed read() call.  Restart the
 219                  * connection from scratch.
 220                  */
 221                 return (SVP_CE_SOCKET);
 222         }
 223 
 224         save_crc = ping.svp_crc32;
 225         svp_query_crc32(&ping, &ping, 0);
 226         peer_version = htons(ping.svp_ver);
 227         if (ping.svp_op != htons(SVP_R_PONG) ||
 228             ping.svp_size != 0 || ping.svp_id != 0 ||
 229             ping.svp_crc32 != save_crc ||
 230             peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
 231                 return (SVP_CE_VERSION_PONG);
 232         }
 233 
 234         /* This connection now has a version! */
 235         scp->sc_version = peer_version;
 236         return (SVP_CE_NONE);
 237 }
 238 
 239 static svp_conn_act_t
 240 svp_conn_connect(svp_conn_t *scp)
 241 {
 242         int ret;
 243         struct sockaddr_in6 in6;
 244 
 245         assert(MUTEX_HELD(&scp->sc_lock));
 246         assert(scp->sc_cstate == SVP_CS_BACKOFF ||
 247             scp->sc_cstate == SVP_CS_INITIAL);
 248         assert(scp->sc_socket == -1);
 249         if (scp->sc_cstate == SVP_CS_INITIAL)
 250                 scp->sc_nbackoff = 0;
 251 
 252         /* New connect means we need to know the version. */
 253         scp->sc_version = 0;
 254 
 255         scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
 256         if (scp->sc_socket == -1) {
 257                 scp->sc_error = SVP_CE_SOCKET;
 258                 scp->sc_errno = errno;


 307                 } else {
 308                         /*
 309                          * This call failed, which means that we obtained one of
 310                          * the following:
 311                          *
 312                          * EADDRNOTAVAIL
 313                          * ECONNREFUSED
 314                          * EIO
 315                          * ENETUNREACH
 316                          * EHOSTUNREACH
 317                          * ENXIO
 318                          * ETIMEDOUT
 319                          *
 320                          * Therefore we need to set ourselves into backoff and
 321                          * wait for that to clear up.
 322                          */
 323                         return (svp_conn_backoff(scp));
 324                 }
 325         }
 326 

 327         return (svp_conn_poll_connect(NULL, scp));
 328 }
 329 
 330 /*
 331  * This should be the first call we get after a successful synchronous
 332  * connect, or a completed (failed or successful) asynchronous connect.  A
 333  * non-NULL port-event indicates asynchronous completion, a NULL port-event
 334  * indicates a successful synchronous connect.
 335  * 
 336  * If we have successfully connected, we should see a writeable event.  In the
 337  * asynchronous case, we may also see an error or a hang up. For either hang
 338  * up or error, we transition to error mode. If there is also a readable event
 339  * (i.e. incoming data), we ignore it at the moment and just let a
 340  * reassociation pick it up so we can simplify the set of state transitions
 341  * that we have.
 342  */
 343 static svp_conn_act_t
 344 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
 345 {
 346         int ret;


 353                 /*
 354                  * These bits only matter if we're notified of an
 355                  * asynchronous connection completion.
 356                  */
 357                 if (!(pe->portev_events & POLLOUT)) {
 358                         scp->sc_errno = 0;
 359                         scp->sc_error = SVP_CE_NOPOLLOUT;
 360                         scp->sc_cstate = SVP_CS_ERROR;
 361                         return (SVP_RA_DEGRADE);
 362                 }
 363 
 364                 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
 365                     &sl);
 366                 if (ret != 0)
 367                         libvarpd_panic("unanticipated getsockopt error");
 368                 if (err != 0) {
 369                         return (svp_conn_backoff(scp));
 370                 }
 371         }
 372 
 373         /* Use a single SVP_R_PING to determine the version. */
 374         version_error = svp_conn_version_set(scp);
 375         switch (version_error) {
 376         case SVP_CE_SOCKET:
 377                 /* Use this to signal read/write errors... */
 378                 return (svp_conn_backoff(scp));
 379         case SVP_CE_NONE:
 380                 assert(scp->sc_version > 0 &&
 381                     scp->sc_version <= SVP_CURRENT_VERSION);
 382                 break;
 383         default:
 384                 scp->sc_error = version_error;
 385                 scp->sc_cstate = SVP_CS_ERROR;
 386                 scp->sc_errno = EPROTONOSUPPORT;     /* Protocol error... */
 387                 return (SVP_RA_DEGRADE);
 388         }
 389 
 390         scp->sc_cstate = SVP_CS_ACTIVE;
 391         scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
 392         ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 393         if (ret == 0)
 394                 return (SVP_RA_RESTORE);
 395         scp->sc_error = SVP_CE_ASSOCIATE;
 396         scp->sc_errno = ret;
 397         scp->sc_cstate = SVP_CS_ERROR;
 398         return (SVP_RA_DEGRADE);
 399 }
 400 
 401 static svp_conn_act_t
 402 svp_conn_pollout(svp_conn_t *scp)
 403 {
 404         svp_query_t *sqp;
 405         svp_req_t *req;
 406         size_t off;
 407         struct iovec iov[2];
 408         int nvecs = 0;
 409         ssize_t ret;
 410 
 411         assert(MUTEX_HELD(&scp->sc_lock));
 412 
 413         /*
 414          * We need to find a query and start writing it out.
 415          */
 416         if (scp->sc_output.sco_query == NULL) {
 417                 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 418                     sqp = list_next(&scp->sc_queries, sqp)) {


 463                 default:
 464                         libvarpd_panic("unexpected errno: %d", errno);
 465                 }
 466         }
 467 
 468         sqp->sq_acttime = gethrtime();
 469         scp->sc_output.sco_offset += ret;
 470         if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
 471                 sqp->sq_state = SVP_QUERY_READING;
 472                 scp->sc_output.sco_query = NULL;
 473                 scp->sc_output.sco_offset = 0;
 474                 scp->sc_event.se_events |= POLLOUT;
 475         }
 476         return (SVP_RA_NONE);
 477 }
 478 
 479 static boolean_t
 480 svp_conn_pollin_validate(svp_conn_t *scp)
 481 {
 482         svp_query_t *sqp;
 483         uint32_t nsize;
 484         uint16_t nvers, nop;
 485         svp_req_t *resp = &scp->sc_input.sci_req;
 486 
 487         assert(MUTEX_HELD(&scp->sc_lock));
 488 
 489         nvers = ntohs(resp->svp_ver);
 490         nop = ntohs(resp->svp_op);
 491         nsize = ntohl(resp->svp_size);
 492 
 493         /*
 494          * A peer that's messing with post-connection version changes is
 495          * likely a broken peer.
 496          */
 497         if (nvers != scp->sc_version) {
 498                 (void) bunyan_warn(svp_bunyan, "version mismatch",
 499                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 500                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 501                     BUNYAN_T_INT32, "peer version", nvers,
 502                     BUNYAN_T_INT32, "our version", scp->sc_version,
 503                     BUNYAN_T_INT32, "operation", nop,
 504                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 505                     BUNYAN_T_END);
 506                 return (B_FALSE);
 507         }
 508 
 509         if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
 510             nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
















 511                 (void) bunyan_warn(svp_bunyan, "unsupported operation",
 512                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 513                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 514                     BUNYAN_T_INT32, "version", nvers,
 515                     BUNYAN_T_INT32, "operation", nop,
 516                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 517                     BUNYAN_T_END);
 518                 return (B_FALSE);
 519         }
 520 
 521         sqp = svp_conn_query_find(scp, resp->svp_id);
 522         if (sqp == NULL) {
 523                 (void) bunyan_warn(svp_bunyan, "unknown response id",
 524                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 525                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 526                     BUNYAN_T_INT32, "version", nvers,
 527                     BUNYAN_T_INT32, "operation", nop,
 528                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 529                     BUNYAN_T_END);
 530                 return (B_FALSE);
 531         }
 532 
 533         if (sqp->sq_state != SVP_QUERY_READING) {
 534                 (void) bunyan_warn(svp_bunyan,
 535                     "got response for unexpecting query",
 536                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 537                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 538                     BUNYAN_T_INT32, "version", nvers,
 539                     BUNYAN_T_INT32, "operation", nop,
 540                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 541                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 542                     BUNYAN_T_END);
 543                 return (B_FALSE);
 544         }
 545 
 546         if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
 547             (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
 548             (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
 549                 (void) bunyan_warn(svp_bunyan, "response size too large",
 550                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 551                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 552                     BUNYAN_T_INT32, "version", nvers,
 553                     BUNYAN_T_INT32, "operation", nop,
 554                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 555                     BUNYAN_T_INT32, "response_size", nsize,
 556                     BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
 557                     sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
 558                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 559                     BUNYAN_T_END);
 560                 return (B_FALSE);
 561         }
 562 
 563         /*
 564          * The valid size is anything <= to what the user requested, but at
 565          * least svp_log_ack_t bytes large.
 566          */
 567         if (nop == SVP_R_LOG_ACK) {
 568                 const char *msg = NULL;


 573                 if (msg != NULL) {
 574                         (void) bunyan_warn(svp_bunyan, msg,
 575                             BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 576                             BUNYAN_T_INT32, "remote_port",
 577                             scp->sc_remote->sr_rport,
 578                             BUNYAN_T_INT32, "version", nvers,
 579                             BUNYAN_T_INT32, "operation", nop,
 580                             BUNYAN_T_INT32, "response_id", resp->svp_id,
 581                             BUNYAN_T_INT32, "response_size", nsize,
 582                             BUNYAN_T_INT32, "expected_size",
 583                             ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
 584                             BUNYAN_T_INT32, "query_state", sqp->sq_state,
 585                             BUNYAN_T_END);
 586                         return (B_FALSE);
 587                 }
 588         }
 589 
 590         sqp->sq_size = nsize;
 591         scp->sc_input.sci_query = sqp;
 592         if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
 593             nop == SVP_R_LOG_RM_ACK) {

 594                 sqp->sq_wdata = &sqp->sq_wdun;
 595                 sqp->sq_wsize = sizeof (svp_query_data_t);
 596         } else {
 597                 VERIFY(nop == SVP_R_LOG_ACK);
 598                 assert(sqp->sq_wdata != NULL);
 599                 assert(sqp->sq_wsize != 0);
 600         }
 601 
 602         return (B_TRUE);
 603 }
 604 
 605 static svp_conn_act_t
 606 svp_conn_pollin(svp_conn_t *scp)
 607 {
 608         size_t off, total;
 609         ssize_t ret;
 610         svp_query_t *sqp;
 611         uint32_t crc;
 612         uint16_t nop;
 613 


 663         sqp->sq_acttime = gethrtime();
 664         total = ntohl(scp->sc_input.sci_req.svp_size);
 665         do {
 666                 ret = read(scp->sc_socket,
 667                     (void *)((uintptr_t)sqp->sq_wdata + off),
 668                     total - off);
 669         } while (ret == -1 && errno == EINTR);
 670 
 671         if (ret == -1) {
 672                 switch (errno) {
 673                 case EAGAIN:
 674                         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 675                         return (SVP_RA_NONE);
 676                 case EIO:
 677                 case ECONNRESET:
 678                         return (SVP_RA_ERROR);
 679                         break;
 680                 default:
 681                         libvarpd_panic("unexpeted read errno: %d", errno);
 682                 }
 683         } else if (ret == 0) {
 684                 /* Try to reconnect to the remote host */
 685                 return (SVP_RA_ERROR);
 686         }
 687 
 688         if (ret + off < total) {
 689                 scp->sc_input.sci_offset += ret;
 690                 return (SVP_RA_NONE);
 691         }
 692 
 693         nop = ntohs(scp->sc_input.sci_req.svp_op);
 694         crc = scp->sc_input.sci_req.svp_crc32;
 695         svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
 696         if (crc != scp->sc_input.sci_req.svp_crc32) {
 697                 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
 698                     BUNYAN_T_IP, "remote ip", &scp->sc_addr,
 699                     BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
 700                     BUNYAN_T_INT32, "version",
 701                     ntohs(scp->sc_input.sci_req.svp_ver),
 702                     BUNYAN_T_INT32, "operation", nop,
 703                     BUNYAN_T_INT32, "response id",


 707                     BUNYAN_T_UINT32, "calc_crc",
 708                     ntohl(scp->sc_input.sci_req.svp_crc32),
 709                     BUNYAN_T_END);
 710                 return (SVP_RA_ERROR);
 711         }
 712         scp->sc_input.sci_query = NULL;
 713         scp->sc_input.sci_offset = 0;
 714 
 715         if (nop == SVP_R_VL2_ACK) {
 716                 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
 717                 sqp->sq_status = ntohl(sl2a->sl2a_status);
 718         } else if (nop == SVP_R_VL3_ACK) {
 719                 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
 720                 sqp->sq_status = ntohl(sl3a->sl3a_status);
 721         } else if (nop == SVP_R_LOG_ACK) {
 722                 svp_log_ack_t *svla = sqp->sq_wdata;
 723                 sqp->sq_status = ntohl(svla->svla_status);
 724         } else if (nop == SVP_R_LOG_RM_ACK) {
 725                 svp_lrm_ack_t *svra = sqp->sq_wdata;
 726                 sqp->sq_status = ntohl(svra->svra_status);














 727         } else {
 728                 libvarpd_panic("unhandled nop: %d", nop);
 729         }
 730 
 731         list_remove(&scp->sc_queries, sqp);
 732         mutex_exit(&scp->sc_lock);
 733 
 734         /*
 735          * We have to release all of our resources associated with this entry
 736          * before we call the callback. After we call it, the memory will be
 737          * lost to time.
 738          */
 739         svp_query_release(sqp);
 740         sqp->sq_func(sqp, sqp->sq_arg);
 741         mutex_enter(&scp->sc_lock);
 742         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 743 
 744         return (SVP_RA_NONE);
 745 }
 746 


 818                 return;
 819         }
 820 
 821         /* Check if this needs to be reset */
 822         if (scp->sc_flags & SVP_CF_TEARDOWN) {
 823                 /* Make sure any other users of this are disassociated */
 824                 ret = SVP_RA_ERROR;
 825                 goto out;
 826         }
 827 
 828         switch (scp->sc_cstate) {
 829         case SVP_CS_INITIAL:
 830         case SVP_CS_BACKOFF:
 831                 assert(pe == NULL);
 832                 ret = svp_conn_connect(scp);
 833                 break;
 834         case SVP_CS_CONNECTING:
 835                 assert(pe != NULL);
 836                 ret = svp_conn_poll_connect(pe, scp);
 837                 break;

 838         case SVP_CS_ACTIVE:
 839         case SVP_CS_WINDDOWN:
 840                 assert(pe != NULL);
 841                 oldstate = scp->sc_cstate;
 842                 if (pe->portev_events & POLLOUT)
 843                         ret = svp_conn_pollout(scp);
 844                 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
 845                         ret = svp_conn_pollin(scp);
 846 
 847                 if (oldstate == SVP_CS_WINDDOWN &&
 848                     (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
 849                         ret = SVP_RA_CLEANUP;
 850                 }
 851 
 852                 if (ret == SVP_RA_NONE) {
 853                         int err;
 854                         if ((err = svp_event_associate(&scp->sc_event,
 855                             scp->sc_socket)) != 0) {
 856                                 scp->sc_error = SVP_CE_ASSOCIATE;
 857                                 scp->sc_errno = err;
 858                                 scp->sc_cstate = SVP_CS_ERROR;
 859                                 ret = SVP_RA_DEGRADE;
 860                         }
 861                 }
 862                 break;
 863         default:
 864                 libvarpd_panic("svp_conn_handler encountered unexpected "
 865                     "state: %d", scp->sc_cstate);
 866         }
 867 out:
 868         mutex_exit(&scp->sc_lock);
 869 
 870         if (ret == SVP_RA_NONE)
 871                 return;
 872 
 873         mutex_enter(&srp->sr_lock);
 874         mutex_enter(&scp->sc_lock);



 875         if (ret == SVP_RA_ERROR)
 876                 ret = svp_conn_reset(scp);
 877 
 878         if (ret == SVP_RA_DEGRADE)
 879                 svp_conn_degrade(scp);
 880         if (ret == SVP_RA_RESTORE)
 881                 svp_conn_restore(scp);
 882 
 883         if (ret == SVP_RA_CLEANUP) {
 884                 svp_conn_remove(scp);
 885                 scp->sc_flags |= SVP_CF_REAP;
 886                 svp_conn_inject(scp);
 887         }
 888         mutex_exit(&scp->sc_lock);
 889         mutex_exit(&srp->sr_lock);
 890 }
 891 
 892 static void
 893 svp_conn_backtimer(void *arg)
 894 {


1096                     "associated: %d", ret);
1097         }
1098         mutex_exit(&scp->sc_lock);
1099 
1100         /* Verify our timers are killed */
1101         svp_timer_remove(&scp->sc_btimer);
1102         svp_timer_remove(&scp->sc_qtimer);
1103 
1104         if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
1105                 libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
1106                     "%d: %d", scp->sc_socket, errno);
1107 
1108         list_destroy(&scp->sc_queries);
1109         umem_free(scp, sizeof (svp_conn_t));
1110 }
1111 
1112 void
1113 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1114 {
1115         assert(MUTEX_HELD(&scp->sc_lock));
1116         assert(scp->sc_cstate == SVP_CS_ACTIVE);

1117 
1118         sqp->sq_acttime = -1;
1119         list_insert_tail(&scp->sc_queries, sqp);
1120         if (!(scp->sc_event.se_events & POLLOUT)) {
1121                 scp->sc_event.se_events |= POLLOUT;
1122                 /*
1123                  * If this becomes frequent, we should instead give up on this
1124                  * set of connections instead of aborting.
1125                  */
1126                 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1127                         libvarpd_panic("svp_event_associate failed somehow");
1128         }
1129 }


  23 #include <assert.h>
  24 #include <umem.h>
  25 #include <errno.h>
  26 #include <strings.h>
  27 #include <unistd.h>
  28 #include <stddef.h>
  29 #include <sys/uio.h>
  30 #include <sys/debug.h>
  31 
  32 #include <libvarpd_svp.h>
  33 
  34 int svp_conn_query_timeout = 30;
  35 static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
  36 static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
  37 
  38 typedef enum svp_conn_act {
  39         SVP_RA_NONE     = 0x00,
  40         SVP_RA_DEGRADE  = 0x01,
  41         SVP_RA_RESTORE  = 0x02,
  42         SVP_RA_ERROR    = 0x03,
  43         SVP_RA_CLEANUP  = 0x04,
  44         SVP_RA_FIND_VERSION = 0x05
  45 } svp_conn_act_t;
  46 
  47 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
  48 
  49 static void
  50 svp_conn_inject(svp_conn_t *scp)
  51 {
  52         int ret;
  53         assert(MUTEX_HELD(&scp->sc_lock));
  54 
  55         if (scp->sc_flags & SVP_CF_USER)
  56                 return;
  57         scp->sc_flags |= SVP_CF_USER;
  58         if ((ret = svp_event_inject(&scp->sc_event)) != 0)
  59                 libvarpd_panic("failed to inject event: %d\n", ret);
  60 }
  61 
  62 static void
  63 svp_conn_degrade(svp_conn_t *scp)
  64 {


  76                 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
  77 }
  78 
  79 static void
  80 svp_conn_restore(svp_conn_t *scp)
  81 {
  82         svp_remote_t *srp = scp->sc_remote;
  83 
  84         assert(MUTEX_HELD(&srp->sr_lock));
  85         assert(MUTEX_HELD(&scp->sc_lock));
  86 
  87         if (!(scp->sc_flags & SVP_CF_DEGRADED))
  88                 return;
  89 
  90         scp->sc_flags &= ~SVP_CF_DEGRADED;
  91         if (srp->sr_ndconns == srp->sr_tconns)
  92                 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
  93         srp->sr_ndconns--;
  94 }
  95 
  96 static svp_conn_act_t
  97 svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp)
  98 {
  99         uint16_t remote_version = ntohs(scp->sc_input.sci_req.svp_ver);
 100 
 101         if (scp->sc_cstate == SVP_CS_VERSIONING) {
 102                 /* Transition VERSIONING -> ACTIVE. */
 103                 assert(scp->sc_version == 0);
 104                 if (remote_version == 0 || remote_version > SVP_CURRENT_VERSION)
 105                         return (SVP_RA_ERROR);
 106                 scp->sc_version = remote_version;
 107                 scp->sc_cstate = SVP_CS_ACTIVE;
 108         }
 109 
 110         return (SVP_RA_NONE);
 111 }
 112 
 113 static void
 114 svp_conn_ping_cb(svp_query_t *sqp, void *arg)
 115 {
 116         size_t len = (size_t)arg;
 117 
 118         assert(len == sizeof (svp_query_t));
 119         umem_free(sqp, len);
 120 }
 121 
 122 static svp_conn_act_t
 123 svp_conn_ping_version(svp_conn_t *scp)
 124 {
 125         svp_remote_t *srp = scp->sc_remote;
 126         svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT);
 127         int ret;
 128 
 129         assert(MUTEX_HELD(&srp->sr_lock));
 130         assert(MUTEX_HELD(&scp->sc_lock));
 131         assert(scp->sc_cstate == SVP_CS_CONNECTING);
 132 
 133         if (sqp == NULL)
 134                 return (SVP_RA_ERROR);
 135 
 136         /* Only set things that need to be non-0/non-NULL. */
 137         sqp->sq_state = SVP_QUERY_INIT;
 138         sqp->sq_func = svp_conn_ping_cb;
 139         sqp->sq_arg = (void *)sizeof (svp_query_t);
 140         sqp->sq_header.svp_op = htons(SVP_R_PING);
 141         sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION);
 142         sqp->sq_header.svp_id = svp_id_alloc();
 143         if (sqp->sq_header.svp_id == -1) {
 144                 umem_free(sqp, sizeof (svp_query_t));
 145                 return (SVP_RA_ERROR);
 146         }
 147 
 148         scp->sc_cstate = SVP_CS_VERSIONING;
 149         /* Set the event flags now... */
 150         scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT;
 151         /* ...so I can just queue it up directly... */
 152         svp_conn_queue(scp, sqp);
 153         /* ... and then associate the event port myself. */
 154         ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 155         if (ret == 0)
 156                 return (SVP_RA_RESTORE);
 157         scp->sc_error = SVP_CE_ASSOCIATE;
 158         scp->sc_errno = ret;
 159         scp->sc_cstate = SVP_CS_ERROR;
 160         umem_free(sqp, sizeof (svp_query_t));
 161         return (SVP_RA_DEGRADE);
 162 }
 163 
 164 static void
 165 svp_conn_add(svp_conn_t *scp)
 166 {
 167         svp_remote_t *srp = scp->sc_remote;
 168 
 169         assert(MUTEX_HELD(&srp->sr_lock));
 170         assert(MUTEX_HELD(&scp->sc_lock));
 171 
 172         if (scp->sc_flags & SVP_CF_ADDED)
 173                 return;
 174 
 175         list_insert_tail(&srp->sr_conns, scp);
 176         scp->sc_flags |= SVP_CF_ADDED;
 177         srp->sr_tconns++;
 178 }
 179 
 180 static void
 181 svp_conn_remove(svp_conn_t *scp)
 182 {
 183         svp_remote_t *srp = scp->sc_remote;
 184 


 221                 libvarpd_panic("failed to close socket %d: %d\n",
 222                     scp->sc_socket, errno);
 223         scp->sc_socket = -1;
 224 
 225         scp->sc_cstate = SVP_CS_BACKOFF;
 226         scp->sc_nbackoff++;
 227         if (scp->sc_nbackoff >= svp_conn_nbackoff) {
 228                 scp->sc_btimer.st_value =
 229                     svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
 230         } else {
 231                 scp->sc_btimer.st_value =
 232                     svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
 233         }
 234         svp_timer_add(&scp->sc_btimer);
 235 
 236         if (scp->sc_nbackoff > svp_conn_nbackoff)
 237                 return (SVP_RA_DEGRADE);
 238         return (SVP_RA_NONE);
 239 }
 240 



































































 241 static svp_conn_act_t
 242 svp_conn_connect(svp_conn_t *scp)
 243 {
 244         int ret;
 245         struct sockaddr_in6 in6;
 246 
 247         assert(MUTEX_HELD(&scp->sc_lock));
 248         assert(scp->sc_cstate == SVP_CS_BACKOFF ||
 249             scp->sc_cstate == SVP_CS_INITIAL);
 250         assert(scp->sc_socket == -1);
 251         if (scp->sc_cstate == SVP_CS_INITIAL)
 252                 scp->sc_nbackoff = 0;
 253 
 254         /* New connect means we need to know the version. */
 255         scp->sc_version = 0;
 256 
 257         scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
 258         if (scp->sc_socket == -1) {
 259                 scp->sc_error = SVP_CE_SOCKET;
 260                 scp->sc_errno = errno;


 309                 } else {
 310                         /*
 311                          * This call failed, which means that we obtained one of
 312                          * the following:
 313                          *
 314                          * EADDRNOTAVAIL
 315                          * ECONNREFUSED
 316                          * EIO
 317                          * ENETUNREACH
 318                          * EHOSTUNREACH
 319                          * ENXIO
 320                          * ETIMEDOUT
 321                          *
 322                          * Therefore we need to set ourselves into backoff and
 323                          * wait for that to clear up.
 324                          */
 325                         return (svp_conn_backoff(scp));
 326                 }
 327         }
 328 
 329         /* Immediately successful connection, move to SVP_CS_VERSIONING. */
 330         return (svp_conn_poll_connect(NULL, scp));
 331 }
 332 
  333 /*
  334  * This should be the first call we get after a successful synchronous
  335  * connect, or a completed (failed or successful) asynchronous connect.  A
  336  * non-NULL port event indicates asynchronous completion; a NULL port event
  337  * indicates a successful synchronous connect.
  338  *
  339  * If we have successfully connected, we should see a writable event.  In the
  340  * asynchronous case, an event without POLLOUT (e.g. only an error or hang-up)
  341  * moves us to error mode, while a writable socket with a pending SO_ERROR
  342  * sends us into backoff.  If there is also a readable event (i.e. incoming
  343  * data), we ignore it for now and let a reassociation pick it up, which
  344  * keeps the set of state transitions simple.
  345  */
 346 static svp_conn_act_t
 347 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
 348 {
 349         int ret;


 356                 /*
 357                  * These bits only matter if we're notified of an
 358                  * asynchronous connection completion.
 359                  */
 360                 if (!(pe->portev_events & POLLOUT)) {
 361                         scp->sc_errno = 0;
 362                         scp->sc_error = SVP_CE_NOPOLLOUT;
 363                         scp->sc_cstate = SVP_CS_ERROR;
 364                         return (SVP_RA_DEGRADE);
 365                 }
 366 
 367                 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
 368                     &sl);
 369                 if (ret != 0)
 370                         libvarpd_panic("unanticipated getsockopt error");
 371                 if (err != 0) {
 372                         return (svp_conn_backoff(scp));
 373                 }
 374         }
 375 
 376         return (SVP_RA_FIND_VERSION);

























 377 }
 378 
 379 static svp_conn_act_t
 380 svp_conn_pollout(svp_conn_t *scp)
 381 {
 382         svp_query_t *sqp;
 383         svp_req_t *req;
 384         size_t off;
 385         struct iovec iov[2];
 386         int nvecs = 0;
 387         ssize_t ret;
 388 
 389         assert(MUTEX_HELD(&scp->sc_lock));
 390 
 391         /*
 392          * We need to find a query and start writing it out.
 393          */
 394         if (scp->sc_output.sco_query == NULL) {
 395                 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
 396                     sqp = list_next(&scp->sc_queries, sqp)) {


 441                 default:
 442                         libvarpd_panic("unexpected errno: %d", errno);
 443                 }
 444         }
 445 
 446         sqp->sq_acttime = gethrtime();
 447         scp->sc_output.sco_offset += ret;
 448         if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
 449                 sqp->sq_state = SVP_QUERY_READING;
 450                 scp->sc_output.sco_query = NULL;
 451                 scp->sc_output.sco_offset = 0;
 452                 scp->sc_event.se_events |= POLLOUT;
 453         }
 454         return (SVP_RA_NONE);
 455 }
 456 
 457 static boolean_t
 458 svp_conn_pollin_validate(svp_conn_t *scp)
 459 {
 460         svp_query_t *sqp;
 461         uint32_t nsize, expected_size = 0;
 462         uint16_t nvers, nop;
 463         svp_req_t *resp = &scp->sc_input.sci_req;
 464 
 465         assert(MUTEX_HELD(&scp->sc_lock));
 466 
 467         nvers = ntohs(resp->svp_ver);
 468         nop = ntohs(resp->svp_op);
 469         nsize = ntohl(resp->svp_size);
 470 
 471         /*
 472          * A peer that's messing with post-connection version changes is
 473          * likely a broken peer.
 474          */
 475         if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) {
 476                 (void) bunyan_warn(svp_bunyan, "version mismatch",
 477                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 478                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 479                     BUNYAN_T_INT32, "peer version", nvers,
 480                     BUNYAN_T_INT32, "our version", scp->sc_version,
 481                     BUNYAN_T_INT32, "operation", nop,
 482                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 483                     BUNYAN_T_END);
 484                 return (B_FALSE);
 485         }
 486 
 487         switch (nop) {
 488         case SVP_R_VL2_ACK:
 489                 expected_size = sizeof (svp_vl2_ack_t);
 490                 break;
 491         case SVP_R_VL3_ACK:
 492                 expected_size = sizeof (svp_vl3_ack_t);
 493                 break;
 494         case SVP_R_LOG_RM_ACK:
 495                 expected_size = sizeof (svp_lrm_ack_t);
 496                 break;
 497         case SVP_R_ROUTE_ACK:
 498                 expected_size = sizeof (svp_route_ack_t);
 499                 break;
 500         case SVP_R_LOG_ACK:
 501         case SVP_R_PONG:
 502                 /* No expected size (LOG_ACK) or size is 0 (PONG). */
 503                 break;
 504         default:
 505                 (void) bunyan_warn(svp_bunyan, "unsupported operation",
 506                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 507                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 508                     BUNYAN_T_INT32, "version", nvers,
 509                     BUNYAN_T_INT32, "operation", nop,
 510                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 511                     BUNYAN_T_END);
 512                 return (B_FALSE);
 513         }
 514 
 515         sqp = svp_conn_query_find(scp, resp->svp_id);
 516         if (sqp == NULL) {
 517                 (void) bunyan_warn(svp_bunyan, "unknown response id",
 518                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 519                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 520                     BUNYAN_T_INT32, "version", nvers,
 521                     BUNYAN_T_INT32, "operation", nop,
 522                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 523                     BUNYAN_T_END);
 524                 return (B_FALSE);
 525         }
 526 
 527         if (sqp->sq_state != SVP_QUERY_READING) {
 528                 (void) bunyan_warn(svp_bunyan,
 529                     "got response for unexpecting query",
 530                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 531                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 532                     BUNYAN_T_INT32, "version", nvers,
 533                     BUNYAN_T_INT32, "operation", nop,
 534                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 535                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 536                     BUNYAN_T_END);
 537                 return (B_FALSE);
 538         }
 539 
 540         if (nop != SVP_R_LOG_RM_ACK && nsize != expected_size) {


 541                 (void) bunyan_warn(svp_bunyan, "response size too large",
 542                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 543                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 544                     BUNYAN_T_INT32, "version", nvers,
 545                     BUNYAN_T_INT32, "operation", nop,
 546                     BUNYAN_T_INT32, "response_id", resp->svp_id,
 547                     BUNYAN_T_INT32, "response_size", nsize,
 548                     BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
 549                     sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
 550                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
 551                     BUNYAN_T_END);
 552                 return (B_FALSE);
 553         }
 554 
 555         /*
 556          * The valid size is anything <= to what the user requested, but at
 557          * least svp_log_ack_t bytes large.
 558          */
 559         if (nop == SVP_R_LOG_ACK) {
 560                 const char *msg = NULL;


 565                 if (msg != NULL) {
 566                         (void) bunyan_warn(svp_bunyan, msg,
 567                             BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 568                             BUNYAN_T_INT32, "remote_port",
 569                             scp->sc_remote->sr_rport,
 570                             BUNYAN_T_INT32, "version", nvers,
 571                             BUNYAN_T_INT32, "operation", nop,
 572                             BUNYAN_T_INT32, "response_id", resp->svp_id,
 573                             BUNYAN_T_INT32, "response_size", nsize,
 574                             BUNYAN_T_INT32, "expected_size",
 575                             ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
 576                             BUNYAN_T_INT32, "query_state", sqp->sq_state,
 577                             BUNYAN_T_END);
 578                         return (B_FALSE);
 579                 }
 580         }
 581 
 582         sqp->sq_size = nsize;
 583         scp->sc_input.sci_query = sqp;
 584         if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
 585             nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK ||
 586             nop == SVP_R_PONG) {
 587                 sqp->sq_wdata = &sqp->sq_wdun;
 588                 sqp->sq_wsize = sizeof (svp_query_data_t);
 589         } else {
 590                 VERIFY(nop == SVP_R_LOG_ACK);
 591                 assert(sqp->sq_wdata != NULL);
 592                 assert(sqp->sq_wsize != 0);
 593         }
 594 
 595         return (B_TRUE);
 596 }
 597 
 598 static svp_conn_act_t
 599 svp_conn_pollin(svp_conn_t *scp)
 600 {
 601         size_t off, total;
 602         ssize_t ret;
 603         svp_query_t *sqp;
 604         uint32_t crc;
 605         uint16_t nop;
 606 


 656         sqp->sq_acttime = gethrtime();
 657         total = ntohl(scp->sc_input.sci_req.svp_size);
 658         do {
 659                 ret = read(scp->sc_socket,
 660                     (void *)((uintptr_t)sqp->sq_wdata + off),
 661                     total - off);
 662         } while (ret == -1 && errno == EINTR);
 663 
 664         if (ret == -1) {
 665                 switch (errno) {
 666                 case EAGAIN:
 667                         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 668                         return (SVP_RA_NONE);
 669                 case EIO:
 670                 case ECONNRESET:
 671                         return (SVP_RA_ERROR);
 672                         break;
 673                 default:
 674                         libvarpd_panic("unexpeted read errno: %d", errno);
 675                 }
 676         } else if (ret == 0 && total - off > 0) {
 677                 /* Try to reconnect to the remote host */
 678                 return (SVP_RA_ERROR);
 679         }
 680 
 681         if (ret + off < total) {
 682                 scp->sc_input.sci_offset += ret;
 683                 return (SVP_RA_NONE);
 684         }
 685 
 686         nop = ntohs(scp->sc_input.sci_req.svp_op);
 687         crc = scp->sc_input.sci_req.svp_crc32;
 688         svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
 689         if (crc != scp->sc_input.sci_req.svp_crc32) {
 690                 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
 691                     BUNYAN_T_IP, "remote ip", &scp->sc_addr,
 692                     BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
 693                     BUNYAN_T_INT32, "version",
 694                     ntohs(scp->sc_input.sci_req.svp_ver),
 695                     BUNYAN_T_INT32, "operation", nop,
 696                     BUNYAN_T_INT32, "response id",


 700                     BUNYAN_T_UINT32, "calc_crc",
 701                     ntohl(scp->sc_input.sci_req.svp_crc32),
 702                     BUNYAN_T_END);
 703                 return (SVP_RA_ERROR);
 704         }
 705         scp->sc_input.sci_query = NULL;
 706         scp->sc_input.sci_offset = 0;
 707 
 708         if (nop == SVP_R_VL2_ACK) {
 709                 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
 710                 sqp->sq_status = ntohl(sl2a->sl2a_status);
 711         } else if (nop == SVP_R_VL3_ACK) {
 712                 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
 713                 sqp->sq_status = ntohl(sl3a->sl3a_status);
 714         } else if (nop == SVP_R_LOG_ACK) {
 715                 svp_log_ack_t *svla = sqp->sq_wdata;
 716                 sqp->sq_status = ntohl(svla->svla_status);
 717         } else if (nop == SVP_R_LOG_RM_ACK) {
 718                 svp_lrm_ack_t *svra = sqp->sq_wdata;
 719                 sqp->sq_status = ntohl(svra->svra_status);
 720         } else if (nop == SVP_R_ROUTE_ACK) {
 721                 svp_route_ack_t *sra = sqp->sq_wdata;
 722                 sqp->sq_status = ntohl(sra->sra_status);
 723         } else if (nop == SVP_R_PONG) {
 724                 /*
 725                  * Handle the PONG versioning-capture here, as we need
 726                  * the version number, the scp_lock held, and the ability
 727                  * to error out.
 728                  */
 729                 svp_conn_act_t cbret;
 730 
 731                 cbret = svp_conn_pong_handler(scp, sqp);
 732                 if (cbret != SVP_RA_NONE)
 733                         return (cbret);
 734         } else {
 735                 libvarpd_panic("unhandled nop: %d", nop);
 736         }
 737 
 738         list_remove(&scp->sc_queries, sqp);
 739         mutex_exit(&scp->sc_lock);
 740 
 741         /*
 742          * We have to release all of our resources associated with this entry
 743          * before we call the callback. After we call it, the memory will be
 744          * lost to time.
 745          */
 746         svp_query_release(sqp);
 747         sqp->sq_func(sqp, sqp->sq_arg);
 748         mutex_enter(&scp->sc_lock);
 749         scp->sc_event.se_events |= POLLIN | POLLRDNORM;
 750 
 751         return (SVP_RA_NONE);
 752 }
 753 


 825                 return;
 826         }
 827 
 828         /* Check if this needs to be reset */
 829         if (scp->sc_flags & SVP_CF_TEARDOWN) {
 830                 /* Make sure any other users of this are disassociated */
 831                 ret = SVP_RA_ERROR;
 832                 goto out;
 833         }
 834 
 835         switch (scp->sc_cstate) {
 836         case SVP_CS_INITIAL:
 837         case SVP_CS_BACKOFF:
 838                 assert(pe == NULL);
 839                 ret = svp_conn_connect(scp);
 840                 break;
 841         case SVP_CS_CONNECTING:
 842                 assert(pe != NULL);
 843                 ret = svp_conn_poll_connect(pe, scp);
 844                 break;
 845         case SVP_CS_VERSIONING:
 846         case SVP_CS_ACTIVE:
 847         case SVP_CS_WINDDOWN:
 848                 assert(pe != NULL);
 849                 oldstate = scp->sc_cstate;
 850                 if (pe->portev_events & POLLOUT)
 851                         ret = svp_conn_pollout(scp);
 852                 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
 853                         ret = svp_conn_pollin(scp);
 854 
 855                 if (oldstate == SVP_CS_WINDDOWN &&
 856                     (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
 857                         ret = SVP_RA_CLEANUP;
 858                 }
 859 
 860                 if (ret == SVP_RA_NONE) {
 861                         int err;
 862                         if ((err = svp_event_associate(&scp->sc_event,
 863                             scp->sc_socket)) != 0) {
 864                                 scp->sc_error = SVP_CE_ASSOCIATE;
 865                                 scp->sc_errno = err;
 866                                 scp->sc_cstate = SVP_CS_ERROR;
 867                                 ret = SVP_RA_DEGRADE;
 868                         }
 869                 }
 870                 break;
 871         default:
 872                 libvarpd_panic("svp_conn_handler encountered unexpected "
 873                     "state: %d", scp->sc_cstate);
 874         }
 875 out:
 876         mutex_exit(&scp->sc_lock);
 877 
 878         if (ret == SVP_RA_NONE)
 879                 return;
 880 
 881         mutex_enter(&srp->sr_lock);
 882         mutex_enter(&scp->sc_lock);
 883         if (ret == SVP_RA_FIND_VERSION)
 884                 ret = svp_conn_ping_version(scp);
 885 
 886         if (ret == SVP_RA_ERROR)
 887                 ret = svp_conn_reset(scp);
 888 
 889         if (ret == SVP_RA_DEGRADE)
 890                 svp_conn_degrade(scp);
 891         if (ret == SVP_RA_RESTORE)
 892                 svp_conn_restore(scp);
 893 
 894         if (ret == SVP_RA_CLEANUP) {
 895                 svp_conn_remove(scp);
 896                 scp->sc_flags |= SVP_CF_REAP;
 897                 svp_conn_inject(scp);
 898         }
 899         mutex_exit(&scp->sc_lock);
 900         mutex_exit(&srp->sr_lock);
 901 }
 902 
 903 static void
 904 svp_conn_backtimer(void *arg)
 905 {


1107                     "associated: %d", ret);
1108         }
1109         mutex_exit(&scp->sc_lock);
1110 
1111         /* Verify our timers are killed */
1112         svp_timer_remove(&scp->sc_btimer);
1113         svp_timer_remove(&scp->sc_qtimer);
1114 
1115         if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
1116                 libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
1117                     "%d: %d", scp->sc_socket, errno);
1118 
1119         list_destroy(&scp->sc_queries);
1120         umem_free(scp, sizeof (svp_conn_t));
1121 }
1122 
1123 void
1124 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1125 {
1126         assert(MUTEX_HELD(&scp->sc_lock));
1127         assert(scp->sc_cstate == SVP_CS_ACTIVE ||
1128             scp->sc_cstate == SVP_CS_VERSIONING);
1129 
1130         sqp->sq_acttime = -1;
1131         list_insert_tail(&scp->sc_queries, sqp);
1132         if (!(scp->sc_event.se_events & POLLOUT)) {
1133                 scp->sc_event.se_events |= POLLOUT;
1134                 /*
1135                  * If this becomes frequent, we should instead give up on this
1136                  * set of connections instead of aborting.
1137                  */
1138                 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1139                         libvarpd_panic("svp_event_associate failed somehow");
1140         }
1141 }