Print this page
Try versioning as a new state
*** 38,48 ****
typedef enum svp_conn_act {
SVP_RA_NONE = 0x00,
SVP_RA_DEGRADE = 0x01,
SVP_RA_RESTORE = 0x02,
SVP_RA_ERROR = 0x03,
! SVP_RA_CLEANUP = 0x04
} svp_conn_act_t;
static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
static void
--- 38,49 ----
typedef enum svp_conn_act {
SVP_RA_NONE = 0x00,
SVP_RA_DEGRADE = 0x01,
SVP_RA_RESTORE = 0x02,
SVP_RA_ERROR = 0x03,
! SVP_RA_CLEANUP = 0x04,
! SVP_RA_FIND_VERSION = 0x05 /* connect succeeded; caller starts the PING version probe */
} svp_conn_act_t;
static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
static void
*** 90,100 ****
--- 91,169 ----
if (srp->sr_ndconns == srp->sr_tconns)
svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
srp->sr_ndconns--;
}
+ /*
+  * Process an SVP_R_PONG reply using the header currently held in
+  * scp->sc_input.sci_req.
+  *
+  * While the connection is in SVP_CS_VERSIONING, the version advertised by
+  * the peer is recorded in sc_version and the connection moves to
+  * SVP_CS_ACTIVE. In any other state the PONG changes nothing.
+  *
+  * Returns SVP_RA_ERROR if the advertised version is 0 or newer than
+  * SVP_CURRENT_VERSION, and SVP_RA_NONE otherwise. The sqp argument is not
+  * examined here.
+  */
+ static svp_conn_act_t
+ svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp)
+ {
+ 	uint16_t peer_vers = ntohs(scp->sc_input.sci_req.svp_ver);
+
+ 	/* Nothing to negotiate unless we are still versioning. */
+ 	if (scp->sc_cstate != SVP_CS_VERSIONING)
+ 		return (SVP_RA_NONE);
+
+ 	/* No version may have been recorded before this point. */
+ 	assert(scp->sc_version == 0);
+ 	if (peer_vers < 1 || peer_vers > SVP_CURRENT_VERSION)
+ 		return (SVP_RA_ERROR);
+
+ 	/* Transition VERSIONING -> ACTIVE. */
+ 	scp->sc_cstate = SVP_CS_ACTIVE;
+ 	scp->sc_version = peer_vers;
+ 	return (SVP_RA_NONE);
+ }
+
+ /*
+  * Query-completion callback that releases a umem-allocated svp_query_t.
+  * The allocation size is passed through the opaque callback argument and
+  * must equal sizeof (svp_query_t).
+  */
static void
+ svp_conn_ping_cb(svp_query_t *sqp, void *arg)
+ {
+ 	const size_t alloc_size = (size_t)arg;
+
+ 	assert(alloc_size == sizeof (svp_query_t));
+ 	umem_free(sqp, alloc_size);
+ }
+
+ /*
+  * Start version negotiation on a connection: build a zero-filled query
+  * whose header carries SVP_R_PING and SVP_CURRENT_VERSION, move the
+  * connection to SVP_CS_VERSIONING, queue the query, and associate the
+  * socket with the event port.
+  *
+  * The caller must hold both the remote's sr_lock and the connection's
+  * sc_lock, and the connection must be in SVP_CS_CONNECTING on entry.
+  *
+  * Returns:
+  *   SVP_RA_RESTORE  the PING is queued and the event association succeeded.
+  *   SVP_RA_ERROR    the query or its request id could not be allocated.
+  *   SVP_RA_DEGRADE  the event association failed; the connection is left in
+  *                   SVP_CS_ERROR with sc_error/sc_errno describing why.
+  *
+  * On success the query is released later by svp_conn_ping_cb(), which
+  * receives the allocation size through sq_arg.
+  */
+ static svp_conn_act_t
+ svp_conn_ping_version(svp_conn_t *scp)
+ {
+ 	svp_remote_t *srp = scp->sc_remote;
+ 	svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT);
+ 	int ret;
+
+ 	assert(MUTEX_HELD(&srp->sr_lock));
+ 	assert(MUTEX_HELD(&scp->sc_lock));
+ 	assert(scp->sc_cstate == SVP_CS_CONNECTING);
+
+ 	if (sqp == NULL)
+ 		return (SVP_RA_ERROR);
+
+ 	/* Only set things that need to be non-0/non-NULL. */
+ 	sqp->sq_state = SVP_QUERY_INIT;
+ 	sqp->sq_func = svp_conn_ping_cb;
+ 	sqp->sq_arg = (void *)sizeof (svp_query_t);
+ 	sqp->sq_header.svp_op = htons(SVP_R_PING);
+ 	sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION);
+ 	sqp->sq_header.svp_id = svp_id_alloc();
+ 	if (sqp->sq_header.svp_id == -1) {
+ 		umem_free(sqp, sizeof (svp_query_t));
+ 		return (SVP_RA_ERROR);
+ 	}
+
+ 	scp->sc_cstate = SVP_CS_VERSIONING;
+ 	/*
+ 	 * Set the event flags (including POLLOUT) before queueing, so that
+ 	 * svp_conn_queue() sees POLLOUT already set and only links the query.
+ 	 */
+ 	scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT;
+ 	svp_conn_queue(scp, sqp);
+ 	/* The event-port association is then done here, exactly once. */
+ 	ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
+ 	if (ret == 0)
+ 		return (SVP_RA_RESTORE);
+
+ 	scp->sc_error = SVP_CE_ASSOCIATE;
+ 	scp->sc_errno = ret;
+ 	scp->sc_cstate = SVP_CS_ERROR;
+ 	/*
+ 	 * svp_conn_queue() linked the query onto sc_queries. Unlink it before
+ 	 * freeing, otherwise the list keeps a pointer to freed memory.
+ 	 */
+ 	list_remove(&scp->sc_queries, sqp);
+ 	/*
+ 	 * NOTE(review): the id obtained from svp_id_alloc() is not handed
+ 	 * back on this path -- confirm whether it needs to be released.
+ 	 */
+ 	umem_free(sqp, sizeof (svp_query_t));
+ 	return (SVP_RA_DEGRADE);
+ }
+
+ static void
svp_conn_add(svp_conn_t *scp)
{
svp_remote_t *srp = scp->sc_remote;
assert(MUTEX_HELD(&srp->sr_lock));
*** 167,243 ****
if (scp->sc_nbackoff > svp_conn_nbackoff)
return (SVP_RA_DEGRADE);
return (SVP_RA_NONE);
}
- /*
- * Think of this as an extension to the connect() call in svp_conn_connect().
- * Send a message, receive it, and set the version here. If the response is
- * too slow or the socket throws an error, indicate a socket error, which
- * will cause the caller to backoff (i.e. close the socket and try again).
- *
- * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
- * own error type.
- */
- static svp_conn_error_t
- svp_conn_version_set(svp_conn_t *scp)
- {
- svp_req_t ping;
- ssize_t ret;
- uint32_t save_crc;
- uint16_t peer_version;
- int ntries = 3; /* One second between tries. 3secs should be enough. */
-
- ping.svp_ver = htons(SVP_CURRENT_VERSION);
- ping.svp_op = htons(SVP_R_PING);
- ping.svp_size = 0; /* Header-only... */
- ping.svp_id = 0;
- /* 0-length data... just use the req buffer for the pointer. */
- svp_query_crc32(&ping, &ping, 0);
-
- ret = write(scp->sc_socket, &ping, sizeof (ping));
- if (ret == -1) {
- /*
- * A failed write() call right after connect probably
- * indicates a larger connection failure. Restart the
- * connection from scratch.
- */
- return (SVP_CE_SOCKET);
- }
- assert(ret == sizeof (ping));
- do {
- /*
- * Asynch read. We may loop here once or twice.
- * Wait a bit, but don't loop too many times...
- */
- (void) sleep(1);
- ret = read(scp->sc_socket, &ping, sizeof (ping));
- } while (--ntries > 0 &&
- ret == -1 && (errno == EINTR || errno == EAGAIN));
- if (ret == -1) {
- /*
- * This is actually a failed read() call. Restart the
- * connection from scratch.
- */
- return (SVP_CE_SOCKET);
- }
-
- save_crc = ping.svp_crc32;
- svp_query_crc32(&ping, &ping, 0);
- peer_version = htons(ping.svp_ver);
- if (ping.svp_op != htons(SVP_R_PONG) ||
- ping.svp_size != 0 || ping.svp_id != 0 ||
- ping.svp_crc32 != save_crc ||
- peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
- return (SVP_CE_VERSION_PONG);
- }
-
- /* This connection now has a version! */
- scp->sc_version = peer_version;
- return (SVP_CE_NONE);
- }
-
static svp_conn_act_t
svp_conn_connect(svp_conn_t *scp)
{
int ret;
struct sockaddr_in6 in6;
--- 236,245 ----
*** 322,331 ****
--- 324,334 ----
*/
return (svp_conn_backoff(scp));
}
}
+ /* Immediately successful connection, move to SVP_CS_VERSIONING. */
return (svp_conn_poll_connect(NULL, scp));
}
/*
* This should be the first call we get after a successful synchronous
*** 368,403 ****
if (err != 0) {
return (svp_conn_backoff(scp));
}
}
! /* Use a single SVP_R_PING to determine the version. */
! version_error = svp_conn_version_set(scp);
! switch (version_error) {
! case SVP_CE_SOCKET:
! /* Use this to signal read/write errors... */
! return (svp_conn_backoff(scp));
! case SVP_CE_NONE:
! assert(scp->sc_version > 0 &&
! scp->sc_version <= SVP_CURRENT_VERSION);
! break;
! default:
! scp->sc_error = version_error;
! scp->sc_cstate = SVP_CS_ERROR;
! scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */
! return (SVP_RA_DEGRADE);
! }
!
! scp->sc_cstate = SVP_CS_ACTIVE;
! scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
! ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
! if (ret == 0)
! return (SVP_RA_RESTORE);
! scp->sc_error = SVP_CE_ASSOCIATE;
! scp->sc_errno = ret;
! scp->sc_cstate = SVP_CS_ERROR;
! return (SVP_RA_DEGRADE);
}
static svp_conn_act_t
svp_conn_pollout(svp_conn_t *scp)
{
--- 371,381 ----
if (err != 0) {
return (svp_conn_backoff(scp));
}
}
! return (SVP_RA_FIND_VERSION);
}
static svp_conn_act_t
svp_conn_pollout(svp_conn_t *scp)
{
*** 478,488 ****
static boolean_t
svp_conn_pollin_validate(svp_conn_t *scp)
{
svp_query_t *sqp;
! uint32_t nsize;
uint16_t nvers, nop;
svp_req_t *resp = &scp->sc_input.sci_req;
assert(MUTEX_HELD(&scp->sc_lock));
--- 456,466 ----
static boolean_t
svp_conn_pollin_validate(svp_conn_t *scp)
{
svp_query_t *sqp;
! uint32_t nsize, expected_size = 0;
uint16_t nvers, nop;
svp_req_t *resp = &scp->sc_input.sci_req;
assert(MUTEX_HELD(&scp->sc_lock));
*** 492,502 ****
/*
* A peer that's messing with post-connection version changes is
* likely a broken peer.
*/
! if (nvers != scp->sc_version) {
(void) bunyan_warn(svp_bunyan, "version mismatch",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
BUNYAN_T_INT32, "peer version", nvers,
BUNYAN_T_INT32, "our version", scp->sc_version,
--- 470,480 ----
/*
* A peer that's messing with post-connection version changes is
* likely a broken peer.
*/
! if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) {
(void) bunyan_warn(svp_bunyan, "version mismatch",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
BUNYAN_T_INT32, "peer version", nvers,
BUNYAN_T_INT32, "our version", scp->sc_version,
*** 504,515 ****
BUNYAN_T_INT32, "response_id", resp->svp_id,
BUNYAN_T_END);
return (B_FALSE);
}
! if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
! nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
(void) bunyan_warn(svp_bunyan, "unsupported operation",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
BUNYAN_T_INT32, "version", nvers,
BUNYAN_T_INT32, "operation", nop,
--- 482,509 ----
BUNYAN_T_INT32, "response_id", resp->svp_id,
BUNYAN_T_END);
return (B_FALSE);
}
! switch (nop) {
! case SVP_R_VL2_ACK:
! expected_size = sizeof (svp_vl2_ack_t);
! break;
! case SVP_R_VL3_ACK:
! expected_size = sizeof (svp_vl3_ack_t);
! break;
! case SVP_R_LOG_RM_ACK:
! expected_size = sizeof (svp_lrm_ack_t);
! break;
! case SVP_R_ROUTE_ACK:
! expected_size = sizeof (svp_route_ack_t);
! break;
! case SVP_R_LOG_ACK:
! case SVP_R_PONG:
! /* No expected size (LOG_ACK) or size is 0 (PONG). */
! break;
! default:
(void) bunyan_warn(svp_bunyan, "unsupported operation",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
BUNYAN_T_INT32, "version", nvers,
BUNYAN_T_INT32, "operation", nop,
*** 541,553 ****
BUNYAN_T_INT32, "query_state", sqp->sq_state,
BUNYAN_T_END);
return (B_FALSE);
}
! if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
! (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
! (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
(void) bunyan_warn(svp_bunyan, "response size too large",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
BUNYAN_T_INT32, "version", nvers,
BUNYAN_T_INT32, "operation", nop,
--- 535,545 ----
BUNYAN_T_INT32, "query_state", sqp->sq_state,
BUNYAN_T_END);
return (B_FALSE);
}
! if (nop != SVP_R_LOG_RM_ACK && nsize != expected_size) {
(void) bunyan_warn(svp_bunyan, "response size too large",
BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
BUNYAN_T_INT32, "version", nvers,
BUNYAN_T_INT32, "operation", nop,
*** 588,598 ****
}
sqp->sq_size = nsize;
scp->sc_input.sci_query = sqp;
if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
! nop == SVP_R_LOG_RM_ACK) {
sqp->sq_wdata = &sqp->sq_wdun;
sqp->sq_wsize = sizeof (svp_query_data_t);
} else {
VERIFY(nop == SVP_R_LOG_ACK);
assert(sqp->sq_wdata != NULL);
--- 580,591 ----
}
sqp->sq_size = nsize;
scp->sc_input.sci_query = sqp;
if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
! nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK ||
! nop == SVP_R_PONG) {
sqp->sq_wdata = &sqp->sq_wdun;
sqp->sq_wsize = sizeof (svp_query_data_t);
} else {
VERIFY(nop == SVP_R_LOG_ACK);
assert(sqp->sq_wdata != NULL);
*** 678,688 ****
return (SVP_RA_ERROR);
break;
default:
libvarpd_panic("unexpeted read errno: %d", errno);
}
! } else if (ret == 0) {
/* Try to reconnect to the remote host */
return (SVP_RA_ERROR);
}
if (ret + off < total) {
--- 671,681 ----
return (SVP_RA_ERROR);
break;
default:
libvarpd_panic("unexpeted read errno: %d", errno);
}
! } else if (ret == 0 && total - off > 0) {
/* Try to reconnect to the remote host */
return (SVP_RA_ERROR);
}
if (ret + off < total) {
*** 722,731 ****
--- 715,738 ----
svp_log_ack_t *svla = sqp->sq_wdata;
sqp->sq_status = ntohl(svla->svla_status);
} else if (nop == SVP_R_LOG_RM_ACK) {
svp_lrm_ack_t *svra = sqp->sq_wdata;
sqp->sq_status = ntohl(svra->svra_status);
+ } else if (nop == SVP_R_ROUTE_ACK) {
+ svp_route_ack_t *sra = sqp->sq_wdata;
+ sqp->sq_status = ntohl(sra->sra_status);
+ } else if (nop == SVP_R_PONG) {
+ /*
+ * Handle the PONG versioning-capture here, as we need
+ * the version number, the scp_lock held, and the ability
+ * to error out.
+ */
+ svp_conn_act_t cbret;
+
+ cbret = svp_conn_pong_handler(scp, sqp);
+ if (cbret != SVP_RA_NONE)
+ return (cbret);
} else {
libvarpd_panic("unhandled nop: %d", nop);
}
list_remove(&scp->sc_queries, sqp);
*** 833,842 ****
--- 840,850 ----
break;
case SVP_CS_CONNECTING:
assert(pe != NULL);
ret = svp_conn_poll_connect(pe, scp);
break;
+ case SVP_CS_VERSIONING:
case SVP_CS_ACTIVE:
case SVP_CS_WINDDOWN:
assert(pe != NULL);
oldstate = scp->sc_cstate;
if (pe->portev_events & POLLOUT)
*** 870,879 ****
--- 878,890 ----
if (ret == SVP_RA_NONE)
return;
mutex_enter(&srp->sr_lock);
mutex_enter(&scp->sc_lock);
+ if (ret == SVP_RA_FIND_VERSION)
+ ret = svp_conn_ping_version(scp);
+
if (ret == SVP_RA_ERROR)
ret = svp_conn_reset(scp);
if (ret == SVP_RA_DEGRADE)
svp_conn_degrade(scp);
*** 1111,1121 ****
void
svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
{
assert(MUTEX_HELD(&scp->sc_lock));
! assert(scp->sc_cstate == SVP_CS_ACTIVE);
sqp->sq_acttime = -1;
list_insert_tail(&scp->sc_queries, sqp);
if (!(scp->sc_event.se_events & POLLOUT)) {
scp->sc_event.se_events |= POLLOUT;
--- 1122,1133 ----
void
svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
{
assert(MUTEX_HELD(&scp->sc_lock));
! assert(scp->sc_cstate == SVP_CS_ACTIVE ||
! scp->sc_cstate == SVP_CS_VERSIONING);
sqp->sq_acttime = -1;
list_insert_tail(&scp->sc_queries, sqp);
if (!(scp->sc_event.se_events & POLLOUT)) {
scp->sc_event.se_events |= POLLOUT;