Print this page
Try versioning as a new state

*** 38,48 **** typedef enum svp_conn_act { SVP_RA_NONE = 0x00, SVP_RA_DEGRADE = 0x01, SVP_RA_RESTORE = 0x02, SVP_RA_ERROR = 0x03, ! SVP_RA_CLEANUP = 0x04 } svp_conn_act_t; static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *); static void --- 38,49 ---- typedef enum svp_conn_act { SVP_RA_NONE = 0x00, SVP_RA_DEGRADE = 0x01, SVP_RA_RESTORE = 0x02, SVP_RA_ERROR = 0x03, ! SVP_RA_CLEANUP = 0x04, ! SVP_RA_FIND_VERSION = 0x05 } svp_conn_act_t; static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *); static void
*** 90,100 **** --- 91,169 ---- if (srp->sr_ndconns == srp->sr_tconns) svp_remote_restore(srp, SVP_RD_REMOTE_FAIL); srp->sr_ndconns--; } + static svp_conn_act_t + svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp) + { + uint16_t remote_version = ntohs(scp->sc_input.sci_req.svp_ver); + + if (scp->sc_cstate == SVP_CS_VERSIONING) { + /* Transition VERSIONING -> ACTIVE. */ + assert(scp->sc_version == 0); + if (remote_version == 0 || remote_version > SVP_CURRENT_VERSION) + return (SVP_RA_ERROR); + scp->sc_version = remote_version; + scp->sc_cstate = SVP_CS_ACTIVE; + } + + return (SVP_RA_NONE); + } + static void + svp_conn_ping_cb(svp_query_t *sqp, void *arg) + { + size_t len = (size_t)arg; + + assert(len == sizeof (svp_query_t)); + umem_free(sqp, len); + } + + static svp_conn_act_t + svp_conn_ping_version(svp_conn_t *scp) + { + svp_remote_t *srp = scp->sc_remote; + svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT); + int ret; + + assert(MUTEX_HELD(&srp->sr_lock)); + assert(MUTEX_HELD(&scp->sc_lock)); + assert(scp->sc_cstate == SVP_CS_CONNECTING); + + if (sqp == NULL) + return (SVP_RA_ERROR); + + /* Only set things that need to be non-0/non-NULL. */ + sqp->sq_state = SVP_QUERY_INIT; + sqp->sq_func = svp_conn_ping_cb; + sqp->sq_arg = (void *)sizeof (svp_query_t); + sqp->sq_header.svp_op = htons(SVP_R_PING); + sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION); + sqp->sq_header.svp_id = svp_id_alloc(); + if (sqp->sq_header.svp_id == -1) { + umem_free(sqp, sizeof (svp_query_t)); + return (SVP_RA_ERROR); + } + + scp->sc_cstate = SVP_CS_VERSIONING; + /* Set the event flags now... */ + scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT; + /* ...so I can just queue it up directly... */ + svp_conn_queue(scp, sqp); + /* ... and then associate the event port myself. */ + ret = svp_event_associate(&scp->sc_event, scp->sc_socket); + if (ret == 0) + return (SVP_RA_RESTORE); + scp->sc_error = SVP_CE_ASSOCIATE; + scp->sc_errno = ret; + scp->sc_cstate = SVP_CS_ERROR; + umem_free(sqp, sizeof (svp_query_t)); + return (SVP_RA_DEGRADE); + } + + static void svp_conn_add(svp_conn_t *scp) { svp_remote_t *srp = scp->sc_remote; assert(MUTEX_HELD(&srp->sr_lock));
*** 167,243 **** if (scp->sc_nbackoff > svp_conn_nbackoff) return (SVP_RA_DEGRADE); return (SVP_RA_NONE); } - /* - * Think of this as an extension to the connect() call in svp_conn_connect(). - * Send a message, receive it, and set the version here. If the response is - * too slow or the socket throws an error, indicate a socket error, which - * will cause the caller to backoff (i.e. close the socket and try again). - * - * Version mismatch (corrupt SVP server or too-advanced SVP server) is its - * own error type. - */ - static svp_conn_error_t - svp_conn_version_set(svp_conn_t *scp) - { - svp_req_t ping; - ssize_t ret; - uint32_t save_crc; - uint16_t peer_version; - int ntries = 3; /* One second between tries. 3secs should be enough. */ - - ping.svp_ver = htons(SVP_CURRENT_VERSION); - ping.svp_op = htons(SVP_R_PING); - ping.svp_size = 0; /* Header-only... */ - ping.svp_id = 0; - /* 0-length data... just use the req buffer for the pointer. */ - svp_query_crc32(&ping, &ping, 0); - - ret = write(scp->sc_socket, &ping, sizeof (ping)); - if (ret == -1) { - /* - * A failed write() call right after connect probably - * indicates a larger connection failure. Restart the - * connection from scratch. - */ - return (SVP_CE_SOCKET); - } - assert(ret == sizeof (ping)); - do { - /* - * Asynch read. We may loop here once or twice. - * Wait a bit, but don't loop too many times... - */ - (void) sleep(1); - ret = read(scp->sc_socket, &ping, sizeof (ping)); - } while (--ntries > 0 && - ret == -1 && (errno == EINTR || errno == EAGAIN)); - if (ret == -1) { - /* - * This is actually a failed read() call. Restart the - * connection from scratch. - */ - return (SVP_CE_SOCKET); - } - - save_crc = ping.svp_crc32; - svp_query_crc32(&ping, &ping, 0); - peer_version = htons(ping.svp_ver); - if (ping.svp_op != htons(SVP_R_PONG) || - ping.svp_size != 0 || ping.svp_id != 0 || - ping.svp_crc32 != save_crc || - peer_version == 0 || peer_version > SVP_CURRENT_VERSION) { - return (SVP_CE_VERSION_PONG); - } - - /* This connection now has a version! */ - scp->sc_version = peer_version; - return (SVP_CE_NONE); - } - static svp_conn_act_t svp_conn_connect(svp_conn_t *scp) { int ret; struct sockaddr_in6 in6; --- 236,245 ----
*** 322,331 **** --- 324,334 ---- */ return (svp_conn_backoff(scp)); } } + /* Immediately successful connection, move to SVP_CS_VERSIONING. */ return (svp_conn_poll_connect(NULL, scp)); } /* * This should be the first call we get after a successful synchronous
*** 368,403 **** if (err != 0) { return (svp_conn_backoff(scp)); } } ! /* Use a single SVP_R_PING to determine the version. */ ! version_error = svp_conn_version_set(scp); ! switch (version_error) { ! case SVP_CE_SOCKET: ! /* Use this to signal read/write errors... */ ! return (svp_conn_backoff(scp)); ! case SVP_CE_NONE: ! assert(scp->sc_version > 0 && ! scp->sc_version <= SVP_CURRENT_VERSION); ! break; ! default: ! scp->sc_error = version_error; ! scp->sc_cstate = SVP_CS_ERROR; ! scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */ ! return (SVP_RA_DEGRADE); ! } ! ! scp->sc_cstate = SVP_CS_ACTIVE; ! scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP; ! ret = svp_event_associate(&scp->sc_event, scp->sc_socket); ! if (ret == 0) ! return (SVP_RA_RESTORE); ! scp->sc_error = SVP_CE_ASSOCIATE; ! scp->sc_errno = ret; ! scp->sc_cstate = SVP_CS_ERROR; ! return (SVP_RA_DEGRADE); } static svp_conn_act_t svp_conn_pollout(svp_conn_t *scp) { --- 371,381 ---- if (err != 0) { return (svp_conn_backoff(scp)); } } ! return (SVP_RA_FIND_VERSION); } static svp_conn_act_t svp_conn_pollout(svp_conn_t *scp) {
*** 478,488 **** static boolean_t svp_conn_pollin_validate(svp_conn_t *scp) { svp_query_t *sqp; ! uint32_t nsize; uint16_t nvers, nop; svp_req_t *resp = &scp->sc_input.sci_req; assert(MUTEX_HELD(&scp->sc_lock)); --- 456,466 ---- static boolean_t svp_conn_pollin_validate(svp_conn_t *scp) { svp_query_t *sqp; ! uint32_t nsize, expected_size = 0; uint16_t nvers, nop; svp_req_t *resp = &scp->sc_input.sci_req; assert(MUTEX_HELD(&scp->sc_lock));
*** 492,502 **** /* * A peer that's messing with post-connection version changes is * likely a broken peer. */ ! if (nvers != scp->sc_version) { (void) bunyan_warn(svp_bunyan, "version mismatch", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, BUNYAN_T_INT32, "peer version", nvers, BUNYAN_T_INT32, "our version", scp->sc_version, --- 470,480 ---- /* * A peer that's messing with post-connection version changes is * likely a broken peer. */ ! if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) { (void) bunyan_warn(svp_bunyan, "version mismatch", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, BUNYAN_T_INT32, "peer version", nvers, BUNYAN_T_INT32, "our version", scp->sc_version,
*** 504,515 **** BUNYAN_T_INT32, "response_id", resp->svp_id, BUNYAN_T_END); return (B_FALSE); } ! if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK && ! nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) { (void) bunyan_warn(svp_bunyan, "unsupported operation", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, BUNYAN_T_INT32, "version", nvers, BUNYAN_T_INT32, "operation", nop, --- 482,509 ---- BUNYAN_T_INT32, "response_id", resp->svp_id, BUNYAN_T_END); return (B_FALSE); } ! switch (nop) { ! case SVP_R_VL2_ACK: ! expected_size = sizeof (svp_vl2_ack_t); ! break; ! case SVP_R_VL3_ACK: ! expected_size = sizeof (svp_vl3_ack_t); ! break; ! case SVP_R_LOG_RM_ACK: ! expected_size = sizeof (svp_lrm_ack_t); ! break; ! case SVP_R_ROUTE_ACK: ! expected_size = sizeof (svp_route_ack_t); ! break; ! case SVP_R_LOG_ACK: ! case SVP_R_PONG: ! /* No expected size (LOG_ACK) or size is 0 (PONG). */ ! break; ! default: (void) bunyan_warn(svp_bunyan, "unsupported operation", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, BUNYAN_T_INT32, "version", nvers, BUNYAN_T_INT32, "operation", nop,
*** 541,553 **** BUNYAN_T_INT32, "query_state", sqp->sq_state, BUNYAN_T_END); return (B_FALSE); } ! if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) || ! (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) || ! (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) { (void) bunyan_warn(svp_bunyan, "response size too large", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, BUNYAN_T_INT32, "version", nvers, BUNYAN_T_INT32, "operation", nop, --- 535,545 ---- BUNYAN_T_INT32, "query_state", sqp->sq_state, BUNYAN_T_END); return (B_FALSE); } ! if (nop != SVP_R_LOG_RM_ACK && nsize != expected_size) { (void) bunyan_warn(svp_bunyan, "response size too large", BUNYAN_T_IP, "remote_ip", &scp->sc_addr, BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport, BUNYAN_T_INT32, "version", nvers, BUNYAN_T_INT32, "operation", nop,
*** 588,598 **** } sqp->sq_size = nsize; scp->sc_input.sci_query = sqp; if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK || ! nop == SVP_R_LOG_RM_ACK) { sqp->sq_wdata = &sqp->sq_wdun; sqp->sq_wsize = sizeof (svp_query_data_t); } else { VERIFY(nop == SVP_R_LOG_ACK); assert(sqp->sq_wdata != NULL); --- 580,591 ---- } sqp->sq_size = nsize; scp->sc_input.sci_query = sqp; if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK || ! nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK || ! nop == SVP_R_PONG) { sqp->sq_wdata = &sqp->sq_wdun; sqp->sq_wsize = sizeof (svp_query_data_t); } else { VERIFY(nop == SVP_R_LOG_ACK); assert(sqp->sq_wdata != NULL);
*** 678,688 **** return (SVP_RA_ERROR); break; default: libvarpd_panic("unexpeted read errno: %d", errno); } ! } else if (ret == 0) { /* Try to reconnect to the remote host */ return (SVP_RA_ERROR); } if (ret + off < total) { --- 671,681 ---- return (SVP_RA_ERROR); break; default: libvarpd_panic("unexpeted read errno: %d", errno); } ! } else if (ret == 0 && total - off > 0) { /* Try to reconnect to the remote host */ return (SVP_RA_ERROR); } if (ret + off < total) {
*** 722,731 **** --- 715,738 ---- svp_log_ack_t *svla = sqp->sq_wdata; sqp->sq_status = ntohl(svla->svla_status); } else if (nop == SVP_R_LOG_RM_ACK) { svp_lrm_ack_t *svra = sqp->sq_wdata; sqp->sq_status = ntohl(svra->svra_status); + } else if (nop == SVP_R_ROUTE_ACK) { + svp_route_ack_t *sra = sqp->sq_wdata; + sqp->sq_status = ntohl(sra->sra_status); + } else if (nop == SVP_R_PONG) { + /* + * Handle the PONG versioning-capture here, as we need + * the version number, the scp_lock held, and the ability + * to error out. + */ + svp_conn_act_t cbret; + + cbret = svp_conn_pong_handler(scp, sqp); + if (cbret != SVP_RA_NONE) + return (cbret); } else { libvarpd_panic("unhandled nop: %d", nop); } list_remove(&scp->sc_queries, sqp);
*** 833,842 **** --- 840,850 ---- break; case SVP_CS_CONNECTING: assert(pe != NULL); ret = svp_conn_poll_connect(pe, scp); break; + case SVP_CS_VERSIONING: case SVP_CS_ACTIVE: case SVP_CS_WINDDOWN: assert(pe != NULL); oldstate = scp->sc_cstate; if (pe->portev_events & POLLOUT)
*** 870,879 **** --- 878,890 ---- if (ret == SVP_RA_NONE) return; mutex_enter(&srp->sr_lock); mutex_enter(&scp->sc_lock); + if (ret == SVP_RA_FIND_VERSION) + ret = svp_conn_ping_version(scp); + if (ret == SVP_RA_ERROR) ret = svp_conn_reset(scp); if (ret == SVP_RA_DEGRADE) svp_conn_degrade(scp);
*** 1111,1121 **** void svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp) { assert(MUTEX_HELD(&scp->sc_lock)); ! assert(scp->sc_cstate == SVP_CS_ACTIVE); sqp->sq_acttime = -1; list_insert_tail(&scp->sc_queries, sqp); if (!(scp->sc_event.se_events & POLLOUT)) { scp->sc_event.se_events |= POLLOUT; --- 1122,1133 ---- void svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp) { assert(MUTEX_HELD(&scp->sc_lock)); ! assert(scp->sc_cstate == SVP_CS_ACTIVE || ! scp->sc_cstate == SVP_CS_VERSIONING); sqp->sq_acttime = -1; list_insert_tail(&scp->sc_queries, sqp); if (!(scp->sc_event.se_events & POLLOUT)) { scp->sc_event.se_events |= POLLOUT;