Print this page
Version bump SVP to 2
*** 41,50 ****
--- 41,52 ----
SVP_RA_RESTORE = 0x02,
SVP_RA_ERROR = 0x03,
SVP_RA_CLEANUP = 0x04
} svp_conn_act_t;
+ static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
+
static void
svp_conn_inject(svp_conn_t *scp)
{
int ret;
assert(MUTEX_HELD(&scp->sc_lock));
*** 165,174 ****
--- 167,243 ----
if (scp->sc_nbackoff > svp_conn_nbackoff)
return (SVP_RA_DEGRADE);
return (SVP_RA_NONE);
}
+ /*
+  * Think of this as an extension to the connect() call in svp_conn_connect().
+  * Send a header-only SVP_R_PING, read the reply back into the same buffer,
+  * and record the peer's advertised version in scp->sc_version.
+  *
+  * Returns:
+  *   SVP_CE_NONE          - a well-formed SVP_R_PONG arrived and sc_version
+  *                          has been set.
+  *   SVP_CE_SOCKET        - the write or read failed, moved fewer bytes than
+  *                          a full header, never completed within the retry
+  *                          budget, or the peer closed the connection. The
+  *                          caller backs off (closes the socket and tries
+  *                          again).
+  *   SVP_CE_VERSION_PONG  - a full header arrived but it is not a valid
+  *                          PONG, or it advertises version 0 or a version
+  *                          newer than SVP_CURRENT_VERSION (corrupt or
+  *                          too-advanced SVP server).
+  */
+ static svp_conn_error_t
+ svp_conn_version_set(svp_conn_t *scp)
+ {
+ 	svp_req_t ping;
+ 	ssize_t ret;
+ 	uint32_t save_crc;
+ 	uint16_t peer_version;
+ 	int ntries = 3;	/* One second between tries. 3secs should be enough. */
+
+ 	ping.svp_ver = htons(SVP_CURRENT_VERSION);
+ 	ping.svp_op = htons(SVP_R_PING);
+ 	ping.svp_size = 0;	/* Header-only... */
+ 	ping.svp_id = 0;
+ 	/* 0-length data... just use the req buffer for the pointer. */
+ 	svp_query_crc32(&ping, &ping, 0);
+
+ 	ret = write(scp->sc_socket, &ping, sizeof (ping));
+ 	if (ret < 0 || (size_t)ret != sizeof (ping)) {
+ 		/*
+ 		 * A failed (or short) write() call right after connect
+ 		 * probably indicates a larger connection failure. Restart the
+ 		 * connection from scratch. This is checked at runtime rather
+ 		 * than with assert() so it still holds in non-debug builds.
+ 		 */
+ 		return (SVP_CE_SOCKET);
+ 	}
+
+ 	do {
+ 		/*
+ 		 * Asynch read. We may loop here once or twice.
+ 		 * Wait a bit, but don't loop too many times...
+ 		 */
+ 		(void) sleep(1);
+ 		ret = read(scp->sc_socket, &ping, sizeof (ping));
+ 	} while (--ntries > 0 &&
+ 	    ret == -1 && (errno == EINTR || errno == EAGAIN));
+
+ 	if (ret < 0 || (size_t)ret != sizeof (ping)) {
+ 		/*
+ 		 * Either read() failed outright, the peer closed the
+ 		 * connection (read() returned 0), or only part of the header
+ 		 * arrived. In the latter two cases the buffer still holds
+ 		 * some or all of our own request, so validating it below
+ 		 * would misreport a transport problem as a version mismatch.
+ 		 * Restart the connection from scratch instead.
+ 		 */
+ 		return (SVP_CE_SOCKET);
+ 	}
+
+ 	/* Recompute the CRC over what we received and compare to the wire. */
+ 	save_crc = ping.svp_crc32;
+ 	svp_query_crc32(&ping, &ping, 0);
+ 	peer_version = ntohs(ping.svp_ver);
+ 	if (ping.svp_op != htons(SVP_R_PONG) ||
+ 	    ping.svp_size != 0 || ping.svp_id != 0 ||
+ 	    ping.svp_crc32 != save_crc ||
+ 	    peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
+ 		return (SVP_CE_VERSION_PONG);
+ 	}
+
+ 	/* This connection now has a version! */
+ 	scp->sc_version = peer_version;
+ 	return (SVP_CE_NONE);
+ }
+
+
static svp_conn_act_t
svp_conn_connect(svp_conn_t *scp)
{
int ret;
struct sockaddr_in6 in6;
*** 178,187 ****
--- 247,259 ----
scp->sc_cstate == SVP_CS_INITIAL);
assert(scp->sc_socket == -1);
if (scp->sc_cstate == SVP_CS_INITIAL)
scp->sc_nbackoff = 0;
+ /* New connect means we need to know the version. */
+ scp->sc_version = 0;
+
scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (scp->sc_socket == -1) {
scp->sc_error = SVP_CE_SOCKET;
scp->sc_errno = errno;
scp->sc_cstate = SVP_CS_ERROR;
*** 250,301 ****
*/
return (svp_conn_backoff(scp));
}
}
! /*
! * We've connected. Successfully move ourselves to the bound
! * state and start polling.
! */
! scp->sc_cstate = SVP_CS_ACTIVE;
! scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
! ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
! if (ret == 0)
! return (SVP_RA_RESTORE);
! scp->sc_error = SVP_CE_ASSOCIATE;
! scp->sc_cstate = SVP_CS_ERROR;
!
! return (SVP_RA_DEGRADE);
}
/*
! * This should be the first call we get after a connect. If we have successfully
! * connected, we should see a writeable event. We may also see an error or a
! * hang up. In either of these cases, we transition to error mode. If there is
! * also a readable event, we ignore it at the moment and just let a
! * reassociation pick it up so we can simplify the set of state transitions that
! * we have.
*/
static svp_conn_act_t
svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
{
! int ret, err;
socklen_t sl = sizeof (err);
if (!(pe->portev_events & POLLOUT)) {
scp->sc_errno = 0;
scp->sc_error = SVP_CE_NOPOLLOUT;
scp->sc_cstate = SVP_CS_ERROR;
return (SVP_RA_DEGRADE);
}
! ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl);
if (ret != 0)
libvarpd_panic("unanticipated getsockopt error");
if (err != 0) {
return (svp_conn_backoff(scp));
}
scp->sc_cstate = SVP_CS_ACTIVE;
scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
if (ret == 0)
return (SVP_RA_RESTORE);
--- 322,394 ----
*/
return (svp_conn_backoff(scp));
}
}
! return (svp_conn_poll_connect(NULL, scp));
}
/*
! * This should be the first call we get after a successful synchronous
! * connect, or a completed (failed or successful) asynchronous connect. A
! * non-NULL port-event indicates asynchronous completion, a NULL port-event
! * indicates a successful synchronous connect.
! *
! * If we have successfully connected, we should see a writeable event. In the
! * asynchronous case, we may also see an error or a hang up. For either hang
! * up or error, we transition to error mode. If there is also a readable event
! * (i.e. incoming data), we ignore it at the moment and just let a
! * reassociation pick it up so we can simplify the set of state transitions
! * that we have.
*/
static svp_conn_act_t
svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
{
! int ret;
! svp_conn_error_t version_error;
!
! if (pe != NULL) {
! int err;
socklen_t sl = sizeof (err);
+
+ /*
+ * These bits only matter if we're notified of an
+ * asynchronous connection completion.
+ */
if (!(pe->portev_events & POLLOUT)) {
scp->sc_errno = 0;
scp->sc_error = SVP_CE_NOPOLLOUT;
scp->sc_cstate = SVP_CS_ERROR;
return (SVP_RA_DEGRADE);
}
! ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
! &sl);
if (ret != 0)
libvarpd_panic("unanticipated getsockopt error");
if (err != 0) {
return (svp_conn_backoff(scp));
}
+ }
+ /* Use a single SVP_R_PING to determine the version. */
+ version_error = svp_conn_version_set(scp);
+ switch (version_error) {
+ case SVP_CE_SOCKET:
+ /* Use this to signal read/write errors... */
+ return (svp_conn_backoff(scp));
+ case SVP_CE_NONE:
+ assert(scp->sc_version > 0 &&
+ scp->sc_version <= SVP_CURRENT_VERSION);
+ break;
+ default:
+ scp->sc_error = version_error;
+ scp->sc_cstate = SVP_CS_ERROR;
+ scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */
+ return (SVP_RA_DEGRADE);
+ }
+
scp->sc_cstate = SVP_CS_ACTIVE;
scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
if (ret == 0)
return (SVP_RA_RESTORE);
*** 355,365 ****
iov[nvecs].iov_len = sqp->sq_rsize - off;
nvecs++;
do {
ret = writev(scp->sc_socket, iov, nvecs);
! } while (ret == -1 && errno == EAGAIN);
if (ret == -1) {
switch (errno) {
case EAGAIN:
scp->sc_event.se_events |= POLLOUT;
return (SVP_RA_NONE);
--- 448,458 ----
iov[nvecs].iov_len = sqp->sq_rsize - off;
nvecs++;
do {
ret = writev(scp->sc_socket, iov, nvecs);
! } while (ret == -1 && errno == EINTR);
if (ret == -1) {
switch (errno) {
case EAGAIN:
scp->sc_event.se_events |= POLLOUT;
return (SVP_RA_NONE);
*** 395,409 ****
nvers = ntohs(resp->svp_ver);
nop = ntohs(resp->svp_op);
nsize = ntohl(resp->svp_size);
! if (nvers != SVP_CURRENT_VERSION) {
! (void) bunyan_warn(svp_bunyan, "unsupported version",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
! BUNYAN_T_INT32, "version", nvers,
BUNYAN_T_INT32, "operation", nop,
BUNYAN_T_INT32, "response_id", resp->svp_id,
BUNYAN_T_END);
return (B_FALSE);
}
--- 488,507 ----
nvers = ntohs(resp->svp_ver);
nop = ntohs(resp->svp_op);
nsize = ntohl(resp->svp_size);
! /*
! * A peer that's messing with post-connection version changes is
! * likely a broken peer.
! */
! if (nvers != scp->sc_version) {
! (void) bunyan_warn(svp_bunyan, "version mismatch",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
! BUNYAN_T_INT32, "peer version", nvers,
! BUNYAN_T_INT32, "our version", scp->sc_version,
BUNYAN_T_INT32, "operation", nop,
BUNYAN_T_INT32, "response_id", resp->svp_id,
BUNYAN_T_END);
return (B_FALSE);
}