Print this page
Version bump SVP to 2

Split Close
Expand all
Collapse all
          --- old/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c
          +++ new/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c
↓ open down ↓ 35 lines elided ↑ open up ↑
  36   36  static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
  37   37  
  38   38  typedef enum svp_conn_act {     /* result codes returned by the connection handlers */
  39   39          SVP_RA_NONE     = 0x00, /* e.g. backoff timer armed, or writev() hit EAGAIN */
  40   40          SVP_RA_DEGRADE  = 0x01, /* e.g. socket() failed or backoff count exceeded */
  41   41          SVP_RA_RESTORE  = 0x02, /* e.g. socket associated and connection now active */
  42   42          SVP_RA_ERROR    = 0x03, /* e.g. writev() failed with EIO, ENXIO or ECONNRESET */
  43   43          SVP_RA_CLEANUP  = 0x04  /* NOTE(review): presumably requests teardown; verify */
  44   44  } svp_conn_act_t;
  45   45  
       46 +static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
       47 +
  46   48  static void
  47   49  svp_conn_inject(svp_conn_t *scp)
  48   50  {
  49   51          int ret;
  50   52          assert(MUTEX_HELD(&scp->sc_lock));
  51   53  
  52   54          if (scp->sc_flags & SVP_CF_USER)
  53   55                  return;
  54   56          scp->sc_flags |= SVP_CF_USER;
  55   57          if ((ret = svp_event_inject(&scp->sc_event)) != 0)
↓ open down ↓ 104 lines elided ↑ open up ↑
 160  162                  scp->sc_btimer.st_value =
 161  163                      svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
 162  164          }
 163  165          svp_timer_add(&scp->sc_btimer);
 164  166  
 165  167          if (scp->sc_nbackoff > svp_conn_nbackoff)
 166  168                  return (SVP_RA_DEGRADE);
 167  169          return (SVP_RA_NONE);
 168  170  }
 169  171  
      172 +/*
      173 + * Think of this as an extension to the connect() call in svp_conn_connect().
      174 + * Send an SVP_R_PING, read the reply, and record the peer's version in
      175 + * scp->sc_version.  A failed, short, or too-slow transfer returns
      176 + * SVP_CE_SOCKET so the caller backs off (closes the socket and tries again).
      177 + *
      178 + * A malformed reply or an unsupported version (0, or newer than ours)
      179 + * returns SVP_CE_VERSION_PONG.  Success returns SVP_CE_NONE.
      180 + */
      181 +static svp_conn_error_t
      182 +svp_conn_version_set(svp_conn_t *scp)
      183 +{
      184 +        svp_req_t ping;
      185 +        ssize_t ret;
      186 +        uint32_t save_crc;
      187 +        uint16_t peer_version;
      188 +        int ntries = 3; /* At most 3 reads, one second apart. */
      189 +
      190 +        ping.svp_ver = htons(SVP_CURRENT_VERSION);
      191 +        ping.svp_op = htons(SVP_R_PING);
      192 +        ping.svp_size = 0;      /* Header-only... */
      193 +        ping.svp_id = 0;
      194 +        /* 0-length data... just use the req buffer for the pointer. */
      195 +        svp_query_crc32(&ping, &ping, 0);
      196 +
      197 +        ret = write(scp->sc_socket, &ping, sizeof (ping));
      198 +        if (ret != (ssize_t)sizeof (ping)) {
      199 +                /*
      200 +                 * A failed or short write() right after connect probably
      201 +                 * indicates a larger connection failure.  Restart the
      202 +                 * connection from scratch.
      203 +                 */
      204 +                return (SVP_CE_SOCKET);
      205 +        }
      206 +        /* The whole header went out; now wait for the reply. */
      207 +        do {
      208 +                /*
      209 +                 * The socket is non-blocking, so sleep a second before each
      210 +                 * read() and retry EINTR/EAGAIN a bounded number of times.
      211 +                 */
      212 +                (void) sleep(1);
      213 +                ret = read(scp->sc_socket, &ping, sizeof (ping));
      214 +        } while (--ntries > 0 &&
      215 +            ret == -1 && (errno == EINTR || errno == EAGAIN));
      216 +        if (ret != (ssize_t)sizeof (ping)) {
      217 +                /*
      218 +                 * Failed, short, or EOF read(): there is no complete reply
      219 +                 * to validate.  Restart the connection from scratch.
      220 +                 */
      221 +                return (SVP_CE_SOCKET);
      222 +        }
      223 +
      224 +        save_crc = ping.svp_crc32;      /* peer's CRC; compared below */
      225 +        svp_query_crc32(&ping, &ping, 0);
      226 +        peer_version = ntohs(ping.svp_ver);     /* network to host order */
      227 +        if (ping.svp_op != htons(SVP_R_PONG) ||
      228 +            ping.svp_size != 0 || ping.svp_id != 0 ||
      229 +            ping.svp_crc32 != save_crc ||
      230 +            peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
      231 +                return (SVP_CE_VERSION_PONG);
      232 +        }
      233 +
      234 +        /* This connection now has a version! */
      235 +        scp->sc_version = peer_version;
      236 +        return (SVP_CE_NONE);
      237 +}
      238 +
 170  239  static svp_conn_act_t
 171  240  svp_conn_connect(svp_conn_t *scp)
 172  241  {
 173  242          int ret;
 174  243          struct sockaddr_in6 in6;
 175  244  
 176  245          assert(MUTEX_HELD(&scp->sc_lock));
 177  246          assert(scp->sc_cstate == SVP_CS_BACKOFF ||
 178  247              scp->sc_cstate == SVP_CS_INITIAL);
 179  248          assert(scp->sc_socket == -1);
 180  249          if (scp->sc_cstate == SVP_CS_INITIAL)
 181  250                  scp->sc_nbackoff = 0;
 182  251  
      252 +        /* New connect means we need to know the version. */
      253 +        scp->sc_version = 0;
      254 +
 183  255          scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
 184  256          if (scp->sc_socket == -1) {
 185  257                  scp->sc_error = SVP_CE_SOCKET;
 186  258                  scp->sc_errno = errno;
 187  259                  scp->sc_cstate = SVP_CS_ERROR;
 188  260                  return (SVP_RA_DEGRADE);
 189  261          }
 190  262  
 191  263          bzero(&in6, sizeof (struct sockaddr_in6));
 192  264          in6.sin6_family = AF_INET6;
↓ open down ↓ 52 lines elided ↑ open up ↑
 245  317                           * ENXIO
 246  318                           * ETIMEDOUT
 247  319                           *
 248  320                           * Therefore we need to set ourselves into backoff and
 249  321                           * wait for that to clear up.
 250  322                           */
 251  323                          return (svp_conn_backoff(scp));
 252  324                  }
 253  325          }
 254  326  
 255      -        /*
 256      -         * We've connected. Successfully move ourselves to the bound
 257      -         * state and start polling.
 258      -         */
 259      -        scp->sc_cstate = SVP_CS_ACTIVE;
 260      -        scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
 261      -        ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 262      -        if (ret == 0)
 263      -                return (SVP_RA_RESTORE);
 264      -        scp->sc_error = SVP_CE_ASSOCIATE;
 265      -        scp->sc_cstate = SVP_CS_ERROR;
 266      -
 267      -        return (SVP_RA_DEGRADE);
      327 +        return (svp_conn_poll_connect(NULL, scp));
 268  328  }
 269  329  
 270  330  /*
 271      - * This should be the first call we get after a connect. If we have successfully
 272      - * connected, we should see a writeable event. We may also see an error or a
 273      - * hang up. In either of these cases, we transition to error mode. If there is
 274      - * also a readable event, we ignore it at the moment and just let a
 275      - * reassociation pick it up so we can simplify the set of state transitions that
 276      - * we have.
      331 + * This should be the first call we get after a successful synchronous
      332 + * connect, or a completed (failed or successful) asynchronous connect.  A
      333 + * non-NULL port-event indicates asynchronous completion, a NULL port-event
      334 + * indicates a successful synchronous connect.
      335 + *
      336 + * If we have successfully connected, we should see a writeable event.  In the
      337 + * asynchronous case, we may also see an error or a hang up. For either hang
      338 + * up or error, we transition to error mode. If there is also a readable event
      339 + * (i.e. incoming data), we ignore it at the moment and just let a
      340 + * reassociation pick it up so we can simplify the set of state transitions
      341 + * that we have.
 277  342   */
 278  343  static svp_conn_act_t
 279  344  svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
 280  345  {
 281      -        int ret, err;
 282      -        socklen_t sl = sizeof (err);
 283      -        if (!(pe->portev_events & POLLOUT)) {
 284      -                scp->sc_errno = 0;
 285      -                scp->sc_error = SVP_CE_NOPOLLOUT;
 286      -                scp->sc_cstate = SVP_CS_ERROR;
 287      -                return (SVP_RA_DEGRADE);
      346 +        int ret;
      347 +        svp_conn_error_t version_error;
      348 +
      349 +        if (pe != NULL) {
      350 +                int err;
      351 +                socklen_t sl = sizeof (err);
      352 +
      353 +                /*
      354 +                 * These bits only matter if we're notified of an
      355 +                 * asynchronous connection completion.
      356 +                 */
      357 +                if (!(pe->portev_events & POLLOUT)) {
      358 +                        scp->sc_errno = 0;
      359 +                        scp->sc_error = SVP_CE_NOPOLLOUT;
      360 +                        scp->sc_cstate = SVP_CS_ERROR;
      361 +                        return (SVP_RA_DEGRADE);
      362 +                }
      363 +
      364 +                ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
      365 +                    &sl);
      366 +                if (ret != 0)
      367 +                        libvarpd_panic("unanticipated getsockopt error");
      368 +                if (err != 0) {
      369 +                        return (svp_conn_backoff(scp));
      370 +                }
 288  371          }
 289  372  
 290      -        ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl);
 291      -        if (ret != 0)
 292      -                libvarpd_panic("unanticipated getsockopt error");
 293      -        if (err != 0) {
      373 +        /* Use a single SVP_R_PING to determine the version. */
      374 +        version_error = svp_conn_version_set(scp);
      375 +        switch (version_error) {
      376 +        case SVP_CE_SOCKET:
      377 +                /* Use this to signal read/write errors... */
 294  378                  return (svp_conn_backoff(scp));
      379 +        case SVP_CE_NONE:
      380 +                assert(scp->sc_version > 0 &&
      381 +                    scp->sc_version <= SVP_CURRENT_VERSION);
      382 +                break;
      383 +        default:
      384 +                scp->sc_error = version_error;
      385 +                scp->sc_cstate = SVP_CS_ERROR;
      386 +                scp->sc_errno = EPROTONOSUPPORT;        /* Protocol error... */
      387 +                return (SVP_RA_DEGRADE);
 295  388          }
 296  389  
 297  390          scp->sc_cstate = SVP_CS_ACTIVE;
 298  391          scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
 299  392          ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
 300  393          if (ret == 0)
 301  394                  return (SVP_RA_RESTORE);
 302  395          scp->sc_error = SVP_CE_ASSOCIATE;
 303  396          scp->sc_errno = ret;
 304  397          scp->sc_cstate = SVP_CS_ERROR;
↓ open down ↓ 45 lines elided ↑ open up ↑
 350  443          } else {
 351  444                  off -= sizeof (svp_req_t);
 352  445          }
 353  446  
 354  447          iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
 355  448          iov[nvecs].iov_len = sqp->sq_rsize - off;
 356  449          nvecs++;
 357  450  
 358  451          do {
 359  452                  ret = writev(scp->sc_socket, iov, nvecs);
 360      -        } while (ret == -1 && errno == EAGAIN);
      453 +        } while (ret == -1 && errno == EINTR);
 361  454          if (ret == -1) {
 362  455                  switch (errno) {
 363  456                  case EAGAIN:
 364  457                          scp->sc_event.se_events |= POLLOUT;
 365  458                          return (SVP_RA_NONE);
 366  459                  case EIO:
 367  460                  case ENXIO:
 368  461                  case ECONNRESET:
 369  462                          return (SVP_RA_ERROR);
 370  463                  default:
↓ open down ↓ 19 lines elided ↑ open up ↑
 390  483          uint32_t nsize;
 391  484          uint16_t nvers, nop;
 392  485          svp_req_t *resp = &scp->sc_input.sci_req;
 393  486  
 394  487          assert(MUTEX_HELD(&scp->sc_lock));
 395  488  
 396  489          nvers = ntohs(resp->svp_ver);
 397  490          nop = ntohs(resp->svp_op);
 398  491          nsize = ntohl(resp->svp_size);
 399  492  
 400      -        if (nvers != SVP_CURRENT_VERSION) {
 401      -                (void) bunyan_warn(svp_bunyan, "unsupported version",
      493 +        /*
      494 +         * A peer that's messing with post-connection version changes is
      495 +         * likely a broken peer.
      496 +         */
      497 +        if (nvers != scp->sc_version) {
      498 +                (void) bunyan_warn(svp_bunyan, "version mismatch",
 402  499                      BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
 403  500                      BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
 404      -                    BUNYAN_T_INT32, "version", nvers,
      501 +                    BUNYAN_T_INT32, "peer version", nvers,
      502 +                    BUNYAN_T_INT32, "our version", scp->sc_version,
 405  503                      BUNYAN_T_INT32, "operation", nop,
 406  504                      BUNYAN_T_INT32, "response_id", resp->svp_id,
 407  505                      BUNYAN_T_END);
 408  506                  return (B_FALSE);
 409  507          }
 410  508  
 411  509          if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
 412  510              nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
 413  511                  (void) bunyan_warn(svp_bunyan, "unsupported operation",
 414  512                      BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
↓ open down ↓ 617 lines elided ↑ open up ↑
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX