Print this page
Version bump SVP to 2

*** 41,50 **** --- 41,52 ---- SVP_RA_RESTORE = 0x02, SVP_RA_ERROR = 0x03, SVP_RA_CLEANUP = 0x04 } svp_conn_act_t; + static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *); + static void svp_conn_inject(svp_conn_t *scp) { int ret; assert(MUTEX_HELD(&scp->sc_lock));
*** 165,174 **** --- 167,243 ---- if (scp->sc_nbackoff > svp_conn_nbackoff) return (SVP_RA_DEGRADE); return (SVP_RA_NONE); } + /* + * Think of this as an extension to the connect() call in svp_conn_connect(). + * Send a message, receive it, and set the version here. If the response is + * too slow or the socket throws an error, indicate a socket error, which + * will cause the caller to backoff (i.e. close the socket and try again). + * + * Version mismatch (corrupt SVP server or too-advanced SVP server) is its + * own error type. + */ + static svp_conn_error_t + svp_conn_version_set(svp_conn_t *scp) + { + svp_req_t ping; + ssize_t ret; + uint32_t save_crc; + uint16_t peer_version; + int ntries = 3; /* One second between tries. 3secs should be enough. */ + + ping.svp_ver = htons(SVP_CURRENT_VERSION); + ping.svp_op = htons(SVP_R_PING); + ping.svp_size = 0; /* Header-only... */ + ping.svp_id = 0; + /* 0-length data... just use the req buffer for the pointer. */ + svp_query_crc32(&ping, &ping, 0); + + ret = write(scp->sc_socket, &ping, sizeof (ping)); + if (ret == -1) { + /* + * A failed write() call right after connect probably + * indicates a larger connection failure. Restart the + * connection from scratch. + */ + return (SVP_CE_SOCKET); + } + assert(ret == sizeof (ping)); + do { + /* + * Asynch read. We may loop here once or twice. + * Wait a bit, but don't loop too many times... + */ + (void) sleep(1); + ret = read(scp->sc_socket, &ping, sizeof (ping)); + } while (--ntries > 0 && + ret == -1 && (errno == EINTR || errno == EAGAIN)); + if (ret == -1) { + /* + * This is actually a failed read() call. Restart the + * connection from scratch. + */ + return (SVP_CE_SOCKET); + } + + save_crc = ping.svp_crc32; + svp_query_crc32(&ping, &ping, 0); + peer_version = htons(ping.svp_ver); + if (ping.svp_op != htons(SVP_R_PONG) || + ping.svp_size != 0 || ping.svp_id != 0 || + ping.svp_crc32 != save_crc || + peer_version == 0 || peer_version > SVP_CURRENT_VERSION) { + return (SVP_CE_VERSION_PONG); + } + + /* This connection now has a version! */ + scp->sc_version = peer_version; + return (SVP_CE_NONE); + } + static svp_conn_act_t svp_conn_connect(svp_conn_t *scp) { int ret; struct sockaddr_in6 in6;
*** 178,187 **** --- 247,259 ---- scp->sc_cstate == SVP_CS_INITIAL); assert(scp->sc_socket == -1); if (scp->sc_cstate == SVP_CS_INITIAL) scp->sc_nbackoff = 0; + /* New connect means we need to know the version. */ + scp->sc_version = 0; + scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0); if (scp->sc_socket == -1) { scp->sc_error = SVP_CE_SOCKET; scp->sc_errno = errno; scp->sc_cstate = SVP_CS_ERROR;
*** 250,301 **** */ return (svp_conn_backoff(scp)); } } ! /* ! * We've connected. Successfully move ourselves to the bound ! * state and start polling. ! */ ! scp->sc_cstate = SVP_CS_ACTIVE; ! scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP; ! ret = svp_event_associate(&scp->sc_event, scp->sc_socket); ! if (ret == 0) ! return (SVP_RA_RESTORE); ! scp->sc_error = SVP_CE_ASSOCIATE; ! scp->sc_cstate = SVP_CS_ERROR; ! ! return (SVP_RA_DEGRADE); } /* ! * This should be the first call we get after a connect. If we have successfully ! * connected, we should see a writeable event. We may also see an error or a ! * hang up. In either of these cases, we transition to error mode. If there is ! * also a readable event, we ignore it at the moment and just let a ! * reassociation pick it up so we can simplify the set of state transitions that ! * we have. */ static svp_conn_act_t svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp) { ! int ret, err; socklen_t sl = sizeof (err); if (!(pe->portev_events & POLLOUT)) { scp->sc_errno = 0; scp->sc_error = SVP_CE_NOPOLLOUT; scp->sc_cstate = SVP_CS_ERROR; return (SVP_RA_DEGRADE); } ! ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl); if (ret != 0) libvarpd_panic("unanticipated getsockopt error"); if (err != 0) { return (svp_conn_backoff(scp)); } scp->sc_cstate = SVP_CS_ACTIVE; scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP; ret = svp_event_associate(&scp->sc_event, scp->sc_socket); if (ret == 0) return (SVP_RA_RESTORE); --- 322,394 ---- */ return (svp_conn_backoff(scp)); } } ! return (svp_conn_poll_connect(NULL, scp)); } /* ! * This should be the first call we get after a successful synchronous ! * connect, or a completed (failed or successful) asynchronous connect. A ! * non-NULL port-event indicates asynchronous completion, a NULL port-event ! * indicates a successful synchronous connect. ! * ! * If we have successfully connected, we should see a writeable event. In the ! * asynchronous case, we may also see an error or a hang up. For either hang ! * up or error, we transition to error mode. If there is also a readable event ! * (i.e. incoming data), we ignore it at the moment and just let a ! * reassociation pick it up so we can simplify the set of state transitions ! * that we have. */ static svp_conn_act_t svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp) { ! int ret; ! svp_conn_error_t version_error; ! ! if (pe != NULL) { ! int err; socklen_t sl = sizeof (err); + + /* + * These bits only matter if we're notified of an + * asynchronous connection completion. + */ if (!(pe->portev_events & POLLOUT)) { scp->sc_errno = 0; scp->sc_error = SVP_CE_NOPOLLOUT; scp->sc_cstate = SVP_CS_ERROR; return (SVP_RA_DEGRADE); } ! ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, ! &sl); if (ret != 0) libvarpd_panic("unanticipated getsockopt error"); if (err != 0) { return (svp_conn_backoff(scp)); } + } + /* Use a single SVP_R_PING to determine the version. */ + version_error = svp_conn_version_set(scp); + switch (version_error) { + case SVP_CE_SOCKET: + /* Use this to signal read/write errors... */ + return (svp_conn_backoff(scp)); + case SVP_CE_NONE: + assert(scp->sc_version > 0 && + scp->sc_version <= SVP_CURRENT_VERSION); + break; + default: + scp->sc_error = version_error; + scp->sc_cstate = SVP_CS_ERROR; + scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */ + return (SVP_RA_DEGRADE); + } + scp->sc_cstate = SVP_CS_ACTIVE; scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP; ret = svp_event_associate(&scp->sc_event, scp->sc_socket); if (ret == 0) return (SVP_RA_RESTORE);
*** 355,365 **** iov[nvecs].iov_len = sqp->sq_rsize - off; nvecs++; do { ret = writev(scp->sc_socket, iov, nvecs); ! } while (ret == -1 && errno == EAGAIN); if (ret == -1) { switch (errno) { case EAGAIN: scp->sc_event.se_events |= POLLOUT; return (SVP_RA_NONE); --- 448,458 ---- iov[nvecs].iov_len = sqp->sq_rsize - off; nvecs++; do { ret = writev(scp->sc_socket, iov, nvecs); ! } while (ret == -1 && errno == EINTR); if (ret == -1) { switch (errno) { case EAGAIN: scp->sc_event.se_events |= POLLOUT; return (SVP_RA_NONE);
*** 395,409 **** nvers = ntohs(resp->svp_ver); nop = ntohs(resp->svp_op); nsize = ntohl(resp->svp_size); ! if (nvers != SVP_CURRENT_VERSION) { ! (void) bunyan_warn(svp_bunyan, "unsupported version", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, ! BUNYAN_T_INT32, "version", nvers, BUNYAN_T_INT32, "operation", nop, BUNYAN_T_INT32, "response_id", resp->svp_id, BUNYAN_T_END); return (B_FALSE); } --- 488,507 ---- nvers = ntohs(resp->svp_ver); nop = ntohs(resp->svp_op); nsize = ntohl(resp->svp_size); ! /* ! * A peer that's messing with post-connection version changes is ! * likely a broken peer. ! */ ! if (nvers != scp->sc_version) { ! (void) bunyan_warn(svp_bunyan, "version mismatch", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, ! BUNYAN_T_INT32, "peer version", nvers, ! BUNYAN_T_INT32, "our version", scp->sc_version, BUNYAN_T_INT32, "operation", nop, BUNYAN_T_INT32, "response_id", resp->svp_id, BUNYAN_T_END); return (B_FALSE); }