Print this page
Try versioning as a new state

@@ -38,11 +38,12 @@
 typedef enum svp_conn_act {
         SVP_RA_NONE     = 0x00,
         SVP_RA_DEGRADE  = 0x01,
         SVP_RA_RESTORE  = 0x02,
         SVP_RA_ERROR    = 0x03,
-        SVP_RA_CLEANUP  = 0x04
+        SVP_RA_CLEANUP  = 0x04,
+        SVP_RA_FIND_VERSION = 0x05
 } svp_conn_act_t;
 
 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
 
 static void

@@ -90,11 +91,79 @@
         if (srp->sr_ndconns == srp->sr_tconns)
                 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
         srp->sr_ndconns--;
 }
 
+/* PONG handler: while VERSIONING, adopt the peer's version and go ACTIVE. */
+static svp_conn_act_t
+svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp)
+{
+        uint16_t peer_vers = ntohs(scp->sc_input.sci_req.svp_ver);
+
+        if (scp->sc_cstate != SVP_CS_VERSIONING)
+                return (SVP_RA_NONE);
+
+        assert(scp->sc_version == 0);
+        if (peer_vers == 0 || peer_vers > SVP_CURRENT_VERSION)
+                return (SVP_RA_ERROR);
+        scp->sc_version = peer_vers;
+        scp->sc_cstate = SVP_CS_ACTIVE;
+        return (SVP_RA_NONE);
+}
+
 static void
+svp_conn_ping_cb(svp_query_t *sqp, void *arg)
+{
+        /* arg is not a pointer; it carries the size of sqp to free. */
+        const size_t nbytes = (size_t)arg;
+        assert(nbytes == sizeof (svp_query_t));
+        umem_free(sqp, nbytes);
+}
+
+/* Queue a PING to learn the peer's version; CONNECTING -> VERSIONING. */
+static svp_conn_act_t
+svp_conn_ping_version(svp_conn_t *scp)
+{
+        svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT);
+        int ret;
+
+        assert(MUTEX_HELD(&scp->sc_remote->sr_lock));
+        assert(MUTEX_HELD(&scp->sc_lock));
+        assert(scp->sc_cstate == SVP_CS_CONNECTING);
+
+        if (sqp == NULL)
+                return (SVP_RA_ERROR);
+
+        /* Only set things that need to be non-0/non-NULL. */
+        sqp->sq_state = SVP_QUERY_INIT;
+        sqp->sq_func = svp_conn_ping_cb;
+        sqp->sq_arg = (void *)sizeof (svp_query_t);
+        sqp->sq_header.svp_op = htons(SVP_R_PING);
+        sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION);
+        sqp->sq_header.svp_id = svp_id_alloc();
+        if (sqp->sq_header.svp_id == -1) {
+                umem_free(sqp, sizeof (svp_query_t));
+                return (SVP_RA_ERROR);
+        }
+
+        scp->sc_cstate = SVP_CS_VERSIONING;
+        /* Set POLLOUT up front so the PING queues directly; associate below. */
+        scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT;
+        svp_conn_queue(scp, sqp);
+        ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
+        if (ret == 0)
+                return (SVP_RA_RESTORE);
+        scp->sc_error = SVP_CE_ASSOCIATE;
+        scp->sc_errno = ret;
+        scp->sc_cstate = SVP_CS_ERROR;
+        /* sqp is on sc_queries via svp_conn_queue(); unlink before freeing. */
+        list_remove(&scp->sc_queries, sqp);
+        umem_free(sqp, sizeof (svp_query_t));
+        return (SVP_RA_DEGRADE);
+}
+
+static void
 svp_conn_add(svp_conn_t *scp)
 {
         svp_remote_t *srp = scp->sc_remote;
 
         assert(MUTEX_HELD(&srp->sr_lock));

@@ -167,77 +236,10 @@
         if (scp->sc_nbackoff > svp_conn_nbackoff)
                 return (SVP_RA_DEGRADE);
         return (SVP_RA_NONE);
 }
 
-/*
- * Think of this as an extension to the connect() call in svp_conn_connect().
- * Send a message, receive it, and set the version here.  If the response is
- * too slow or the socket throws an error, indicate a socket error, which
- * will cause the caller to backoff (i.e. close the socket and try again).
- *
- * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
- * own error type.
- */
-static svp_conn_error_t
-svp_conn_version_set(svp_conn_t *scp)
-{
-        svp_req_t ping;
-        ssize_t ret;
-        uint32_t save_crc;
-        uint16_t peer_version;
-        int ntries = 3; /* One second between tries. 3secs should be enough. */
-
-        ping.svp_ver = htons(SVP_CURRENT_VERSION);
-        ping.svp_op = htons(SVP_R_PING);
-        ping.svp_size = 0;      /* Header-only... */
-        ping.svp_id = 0;
-        /* 0-length data... just use the req buffer for the pointer. */
-        svp_query_crc32(&ping, &ping, 0);
-
-        ret = write(scp->sc_socket, &ping, sizeof (ping));
-        if (ret == -1) {
-                /*
-                 * A failed write() call right after connect probably
-                 * indicates a larger connection failure.  Restart the
-                 * connection from scratch.
-                 */
-                return (SVP_CE_SOCKET);
-        }
-        assert(ret == sizeof (ping));
-        do {
-                /*
-                 * Asynch read.  We may loop here once or twice.
-                 * Wait a bit, but don't loop too many times...
-                 */
-                (void) sleep(1);
-                ret = read(scp->sc_socket, &ping, sizeof (ping));
-        } while (--ntries > 0 &&
-            ret == -1 && (errno == EINTR || errno == EAGAIN));
-        if (ret == -1) {
-                /*
-                 * This is actually a failed read() call.  Restart the
-                 * connection from scratch.
-                 */
-                return (SVP_CE_SOCKET);
-        }
-
-        save_crc = ping.svp_crc32;
-        svp_query_crc32(&ping, &ping, 0);
-        peer_version = htons(ping.svp_ver);
-        if (ping.svp_op != htons(SVP_R_PONG) ||
-            ping.svp_size != 0 || ping.svp_id != 0 ||
-            ping.svp_crc32 != save_crc ||
-            peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
-                return (SVP_CE_VERSION_PONG);
-        }
-
-        /* This connection now has a version! */
-        scp->sc_version = peer_version;
-        return (SVP_CE_NONE);
-}
-
 static svp_conn_act_t
 svp_conn_connect(svp_conn_t *scp)
 {
         int ret;
         struct sockaddr_in6 in6;

@@ -322,10 +324,11 @@
                          */
                         return (svp_conn_backoff(scp));
                 }
         }
 
+        /* Immediately successful connection, move to SVP_CS_VERSIONING. */
         return (svp_conn_poll_connect(NULL, scp));
 }
 
 /*
  * This should be the first call we get after a successful synchronous

@@ -368,36 +371,11 @@
                 if (err != 0) {
                         return (svp_conn_backoff(scp));
                 }
         }
 
-        /* Use a single SVP_R_PING to determine the version. */
-        version_error = svp_conn_version_set(scp);
-        switch (version_error) {
-        case SVP_CE_SOCKET:
-                /* Use this to signal read/write errors... */
-                return (svp_conn_backoff(scp));
-        case SVP_CE_NONE:
-                assert(scp->sc_version > 0 &&
-                    scp->sc_version <= SVP_CURRENT_VERSION);
-                break;
-        default:
-                scp->sc_error = version_error;
-                scp->sc_cstate = SVP_CS_ERROR;
-                scp->sc_errno = EPROTONOSUPPORT;        /* Protocol error... */
-                return (SVP_RA_DEGRADE);
-        }
-
-        scp->sc_cstate = SVP_CS_ACTIVE;
-        scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
-        ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
-        if (ret == 0)
-                return (SVP_RA_RESTORE);
-        scp->sc_error = SVP_CE_ASSOCIATE;
-        scp->sc_errno = ret;
-        scp->sc_cstate = SVP_CS_ERROR;
-        return (SVP_RA_DEGRADE);
+        return (SVP_RA_FIND_VERSION);
 }
 
 static svp_conn_act_t
 svp_conn_pollout(svp_conn_t *scp)
 {

@@ -478,11 +456,11 @@
 
 static boolean_t
 svp_conn_pollin_validate(svp_conn_t *scp)
 {
         svp_query_t *sqp;
-        uint32_t nsize;
+        uint32_t nsize, expected_size = 0;
         uint16_t nvers, nop;
         svp_req_t *resp = &scp->sc_input.sci_req;
 
         assert(MUTEX_HELD(&scp->sc_lock));
 

@@ -492,11 +470,11 @@
 
         /*
          * A peer that's messing with post-connection version changes is
          * likely a broken peer.
          */
-        if (nvers != scp->sc_version) {
+        if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) {
                 (void) bunyan_warn(svp_bunyan, "version mismatch",
                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
                     BUNYAN_T_INT32, "peer version", nvers,
                     BUNYAN_T_INT32, "our version", scp->sc_version,

@@ -504,12 +482,28 @@
                     BUNYAN_T_INT32, "response_id", resp->svp_id,
                     BUNYAN_T_END);
                 return (B_FALSE);
         }
 
-        if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
-            nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
+        switch (nop) {
+        case SVP_R_VL2_ACK:
+                expected_size = sizeof (svp_vl2_ack_t);
+                break;
+        case SVP_R_VL3_ACK:
+                expected_size = sizeof (svp_vl3_ack_t);
+                break;
+        case SVP_R_LOG_RM_ACK:
+                expected_size = sizeof (svp_lrm_ack_t);
+                break;
+        case SVP_R_ROUTE_ACK:
+                expected_size = sizeof (svp_route_ack_t);
+                break;
+        case SVP_R_LOG_ACK:
+        case SVP_R_PONG:
+                /* No expected size (LOG_ACK) or size is 0 (PONG). */
+                break;
+        default:
                 (void) bunyan_warn(svp_bunyan, "unsupported operation",
                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
                     BUNYAN_T_INT32, "version", nvers,
                     BUNYAN_T_INT32, "operation", nop,

@@ -541,13 +535,11 @@
                     BUNYAN_T_INT32, "query_state", sqp->sq_state,
                     BUNYAN_T_END);
                 return (B_FALSE);
         }
 
-        if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
-            (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
-            (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
+        if (nop != SVP_R_LOG_ACK && nsize != expected_size) {
                 (void) bunyan_warn(svp_bunyan, "response size too large",
                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
                     BUNYAN_T_INT32, "version", nvers,
                     BUNYAN_T_INT32, "operation", nop,

@@ -588,11 +580,12 @@
         }
 
         sqp->sq_size = nsize;
         scp->sc_input.sci_query = sqp;
         if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
-            nop == SVP_R_LOG_RM_ACK) {
+            nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK ||
+            nop == SVP_R_PONG) {
                 sqp->sq_wdata = &sqp->sq_wdun;
                 sqp->sq_wsize = sizeof (svp_query_data_t);
         } else {
                 VERIFY(nop == SVP_R_LOG_ACK);
                 assert(sqp->sq_wdata != NULL);

@@ -678,11 +671,11 @@
                         return (SVP_RA_ERROR);
                         break;
                 default:
                         libvarpd_panic("unexpeted read errno: %d", errno);
                 }
-        } else if (ret == 0) {
+        } else if (ret == 0 && total - off > 0) {
                 /* Try to reconnect to the remote host */
                 return (SVP_RA_ERROR);
         }
 
         if (ret + off < total) {

@@ -722,10 +715,24 @@
                 svp_log_ack_t *svla = sqp->sq_wdata;
                 sqp->sq_status = ntohl(svla->svla_status);
         } else if (nop == SVP_R_LOG_RM_ACK) {
                 svp_lrm_ack_t *svra = sqp->sq_wdata;
                 sqp->sq_status = ntohl(svra->svra_status);
+        } else if (nop == SVP_R_ROUTE_ACK) {
+                svp_route_ack_t *sra = sqp->sq_wdata;
+                sqp->sq_status = ntohl(sra->sra_status);
+        } else if (nop == SVP_R_PONG) {
+                /*
+                 * Handle the PONG versioning-capture here, as we need
+                 * the version number, the sc_lock held, and the ability
+                 * to error out.
+                 */
+                svp_conn_act_t cbret;
+
+                cbret = svp_conn_pong_handler(scp, sqp);
+                if (cbret != SVP_RA_NONE)
+                        return (cbret);
         } else {
                 libvarpd_panic("unhandled nop: %d", nop);
         }
 
         list_remove(&scp->sc_queries, sqp);

@@ -833,10 +840,11 @@
                 break;
         case SVP_CS_CONNECTING:
                 assert(pe != NULL);
                 ret = svp_conn_poll_connect(pe, scp);
                 break;
+        case SVP_CS_VERSIONING:
         case SVP_CS_ACTIVE:
         case SVP_CS_WINDDOWN:
                 assert(pe != NULL);
                 oldstate = scp->sc_cstate;
                 if (pe->portev_events & POLLOUT)

@@ -870,10 +878,13 @@
         if (ret == SVP_RA_NONE)
                 return;
 
         mutex_enter(&srp->sr_lock);
         mutex_enter(&scp->sc_lock);
+        if (ret == SVP_RA_FIND_VERSION)
+                ret = svp_conn_ping_version(scp);
+
         if (ret == SVP_RA_ERROR)
                 ret = svp_conn_reset(scp);
 
         if (ret == SVP_RA_DEGRADE)
                 svp_conn_degrade(scp);

@@ -1111,11 +1122,12 @@
 
 void
 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
 {
         assert(MUTEX_HELD(&scp->sc_lock));
-        assert(scp->sc_cstate == SVP_CS_ACTIVE);
+        assert(scp->sc_cstate == SVP_CS_ACTIVE ||
+            scp->sc_cstate == SVP_CS_VERSIONING);
 
         sqp->sq_acttime = -1;
         list_insert_tail(&scp->sc_queries, sqp);
         if (!(scp->sc_event.se_events & POLLOUT)) {
                 scp->sc_event.se_events |= POLLOUT;