Print this page
Version bump SVP to 2

@@ -41,10 +41,12 @@
         SVP_RA_RESTORE  = 0x02,
         SVP_RA_ERROR    = 0x03,
         SVP_RA_CLEANUP  = 0x04
 } svp_conn_act_t;
 
+static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
+
 static void
 svp_conn_inject(svp_conn_t *scp)
 {
         int ret;
         assert(MUTEX_HELD(&scp->sc_lock));

@@ -165,10 +167,77 @@
         if (scp->sc_nbackoff > svp_conn_nbackoff)
                 return (SVP_RA_DEGRADE);
         return (SVP_RA_NONE);
 }
 
+/*
+ * Think of this as an extension to the connect() call in svp_conn_connect().
+ * Send a message, receive it, and set the version here.  If the response is
+ * too slow or the socket throws an error, indicate a socket error, which
+ * will cause the caller to backoff (i.e. close the socket and try again).
+ *
+ * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
+ * own error type.
+ */
+static svp_conn_error_t
+svp_conn_version_set(svp_conn_t *scp)
+{
+        svp_req_t ping;
+        ssize_t ret;
+        uint32_t save_crc;
+        uint16_t peer_version;
+        int ntries = 3; /* One second between tries. 3secs should be enough. */
+
+        ping.svp_ver = htons(SVP_CURRENT_VERSION);
+        ping.svp_op = htons(SVP_R_PING);
+        ping.svp_size = 0;      /* Header-only... */
+        ping.svp_id = 0;
+        /* 0-length data... just use the req buffer for the pointer. */
+        svp_query_crc32(&ping, &ping, 0);
+
+        ret = write(scp->sc_socket, &ping, sizeof (ping));
+        if (ret == -1) {
+                /*
+                 * A failed write() call right after connect probably
+                 * indicates a larger connection failure.  Restart the
+                 * connection from scratch.
+                 */
+                return (SVP_CE_SOCKET);
+        }
+        assert(ret == sizeof (ping));
+        do {
+                /*
+                 * Asynch read.  We may loop here once or twice.
+                 * Wait a bit, but don't loop too many times...
+                 */
+                (void) sleep(1);
+                ret = read(scp->sc_socket, &ping, sizeof (ping));
+        } while (--ntries > 0 &&
+            ret == -1 && (errno == EINTR || errno == EAGAIN));
+        if (ret == -1) {
+                /*
+                 * This is actually a failed read() call.  Restart the
+                 * connection from scratch.
+                 */
+                return (SVP_CE_SOCKET);
+        }
+
+        save_crc = ping.svp_crc32;
+        svp_query_crc32(&ping, &ping, 0);
+        peer_version = htons(ping.svp_ver);
+        if (ping.svp_op != htons(SVP_R_PONG) ||
+            ping.svp_size != 0 || ping.svp_id != 0 ||
+            ping.svp_crc32 != save_crc ||
+            peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
+                return (SVP_CE_VERSION_PONG);
+        }
+
+        /* This connection now has a version! */
+        scp->sc_version = peer_version;
+        return (SVP_CE_NONE);
+}
+
 static svp_conn_act_t
 svp_conn_connect(svp_conn_t *scp)
 {
         int ret;
         struct sockaddr_in6 in6;

@@ -178,10 +247,13 @@
             scp->sc_cstate == SVP_CS_INITIAL);
         assert(scp->sc_socket == -1);
         if (scp->sc_cstate == SVP_CS_INITIAL)
                 scp->sc_nbackoff = 0;
 
+        /* New connect means we need to know the version. */
+        scp->sc_version = 0;
+
         scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
         if (scp->sc_socket == -1) {
                 scp->sc_error = SVP_CE_SOCKET;
                 scp->sc_errno = errno;
                 scp->sc_cstate = SVP_CS_ERROR;

@@ -250,52 +322,73 @@
                          */
                         return (svp_conn_backoff(scp));
                 }
         }
 
-        /*
-         * We've connected. Successfully move ourselves to the bound
-         * state and start polling.
-         */
-        scp->sc_cstate = SVP_CS_ACTIVE;
-        scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
-        ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
-        if (ret == 0)
-                return (SVP_RA_RESTORE);
-        scp->sc_error = SVP_CE_ASSOCIATE;
-        scp->sc_cstate = SVP_CS_ERROR;
-
-        return (SVP_RA_DEGRADE);
+        return (svp_conn_poll_connect(NULL, scp));
 }
 
 /*
- * This should be the first call we get after a connect. If we have successfully
- * connected, we should see a writeable event. We may also see an error or a
- * hang up. In either of these cases, we transition to error mode. If there is
- * also a readable event, we ignore it at the moment and just let a
- * reassociation pick it up so we can simplify the set of state transitions that
- * we have.
+ * This should be the first call we get after a successful synchronous
+ * connect, or a completed (failed or successful) asynchronous connect.  A
+ * non-NULL port-event indicates asynchronous completion, a NULL port-event
+ * indicates a successful synchronous connect.
+ * 
+ * If we have successfully connected, we should see a writeable event.  In the
+ * asynchronous case, we may also see an error or a hang up. For either hang
+ * up or error, we transition to error mode. If there is also a readable event
+ * (i.e. incoming data), we ignore it at the moment and just let a
+ * reassociation pick it up so we can simplify the set of state transitions
+ * that we have.
  */
 static svp_conn_act_t
 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
 {
-        int ret, err;
+        int ret;
+        svp_conn_error_t version_error;
+
+        if (pe != NULL) {
+                int err;
         socklen_t sl = sizeof (err);
+
+                /*
+                 * These bits only matter if we're notified of an
+                 * asynchronous connection completion.
+                 */
         if (!(pe->portev_events & POLLOUT)) {
                 scp->sc_errno = 0;
                 scp->sc_error = SVP_CE_NOPOLLOUT;
                 scp->sc_cstate = SVP_CS_ERROR;
                 return (SVP_RA_DEGRADE);
         }
 
-        ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl);
+                ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
+                    &sl);
         if (ret != 0)
                 libvarpd_panic("unanticipated getsockopt error");
         if (err != 0) {
                 return (svp_conn_backoff(scp));
         }
+        }
 
+        /* Use a single SVP_R_PING to determine the version. */
+        version_error = svp_conn_version_set(scp);
+        switch (version_error) {
+        case SVP_CE_SOCKET:
+                /* Use this to signal read/write errors... */
+                return (svp_conn_backoff(scp));
+        case SVP_CE_NONE:
+                assert(scp->sc_version > 0 &&
+                    scp->sc_version <= SVP_CURRENT_VERSION);
+                break;
+        default:
+                scp->sc_error = version_error;
+                scp->sc_cstate = SVP_CS_ERROR;
+                scp->sc_errno = EPROTONOSUPPORT;        /* Protocol error... */
+                return (SVP_RA_DEGRADE);
+        }
+
         scp->sc_cstate = SVP_CS_ACTIVE;
         scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
         ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
         if (ret == 0)
                 return (SVP_RA_RESTORE);

@@ -355,11 +448,11 @@
         iov[nvecs].iov_len = sqp->sq_rsize - off;
         nvecs++;
 
         do {
                 ret = writev(scp->sc_socket, iov, nvecs);
-        } while (ret == -1 && errno == EAGAIN);
+        } while (ret == -1 && errno == EINTR);
         if (ret == -1) {
                 switch (errno) {
                 case EAGAIN:
                         scp->sc_event.se_events |= POLLOUT;
                         return (SVP_RA_NONE);

@@ -395,15 +488,20 @@
 
         nvers = ntohs(resp->svp_ver);
         nop = ntohs(resp->svp_op);
         nsize = ntohl(resp->svp_size);
 
-        if (nvers != SVP_CURRENT_VERSION) {
-                (void) bunyan_warn(svp_bunyan, "unsupported version",
+        /*
+         * A peer that's messing with post-connection version changes is
+         * likely a broken peer.
+         */
+        if (nvers != scp->sc_version) {
+                (void) bunyan_warn(svp_bunyan, "version mismatch",
                     BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
                     BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
-                    BUNYAN_T_INT32, "version", nvers,
+                    BUNYAN_T_INT32, "peer version", nvers,
+                    BUNYAN_T_INT32, "our version", scp->sc_version,
                     BUNYAN_T_INT32, "operation", nop,
                     BUNYAN_T_INT32, "response_id", resp->svp_id,
                     BUNYAN_T_END);
                 return (B_FALSE);
         }