26 #include <strings.h>
27 #include <unistd.h>
28 #include <stddef.h>
29 #include <sys/uio.h>
30 #include <sys/debug.h>
31
32 #include <libvarpd_svp.h>
33
/*
 * Connection tunables.
 *
 * svp_conn_query_timeout is not consumed in the part of the file visible
 * here; presumably it bounds how long a query may stay outstanding (units
 * to be confirmed against its user).
 *
 * svp_conn_backoff_tbl holds the successive values loaded into a
 * connection's backoff timer (sc_btimer.st_value) on each consecutive
 * failure; once the table is exhausted the last entry is reused.
 * svp_conn_nbackoff is the number of entries in that table.
 */
int svp_conn_query_timeout = 30;
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
static int svp_conn_nbackoff =
    sizeof (svp_conn_backoff_tbl) / sizeof (svp_conn_backoff_tbl[0]);
37
/*
 * Follow-up action that a connection state handler asks its caller to
 * take. The notes describe how the handlers visible in this file produce
 * each value; the code that consumes them is not shown here.
 */
typedef enum svp_conn_act {
	/* Nothing further to do (e.g. still backing off, write queued). */
	SVP_RA_NONE	= 0,
	/* Connection entered an error state or ran out of backoff steps. */
	SVP_RA_DEGRADE	= 1,
	/* Connection became active and its event was associated. */
	SVP_RA_RESTORE	= 2,
	/* Socket I/O failed (e.g. EIO, ENXIO or ECONNRESET on write). */
	SVP_RA_ERROR	= 3,
	/* NOTE(review): no producer is visible here; confirm meaning. */
	SVP_RA_CLEANUP	= 4
} svp_conn_act_t;
45
46 static void
47 svp_conn_inject(svp_conn_t *scp)
48 {
49 int ret;
50 assert(MUTEX_HELD(&scp->sc_lock));
51
52 if (scp->sc_flags & SVP_CF_USER)
53 return;
54 scp->sc_flags |= SVP_CF_USER;
55 if ((ret = svp_event_inject(&scp->sc_event)) != 0)
56 libvarpd_panic("failed to inject event: %d\n", ret);
57 }
58
59 static void
60 svp_conn_degrade(svp_conn_t *scp)
61 {
62 svp_remote_t *srp = scp->sc_remote;
63
64 assert(MUTEX_HELD(&srp->sr_lock));
65 assert(MUTEX_HELD(&scp->sc_lock));
150 libvarpd_panic("failed to close socket %d: %d\n",
151 scp->sc_socket, errno);
152 scp->sc_socket = -1;
153
154 scp->sc_cstate = SVP_CS_BACKOFF;
155 scp->sc_nbackoff++;
156 if (scp->sc_nbackoff >= svp_conn_nbackoff) {
157 scp->sc_btimer.st_value =
158 svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
159 } else {
160 scp->sc_btimer.st_value =
161 svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
162 }
163 svp_timer_add(&scp->sc_btimer);
164
165 if (scp->sc_nbackoff > svp_conn_nbackoff)
166 return (SVP_RA_DEGRADE);
167 return (SVP_RA_NONE);
168 }
169
170 static svp_conn_act_t
171 svp_conn_connect(svp_conn_t *scp)
172 {
173 int ret;
174 struct sockaddr_in6 in6;
175
176 assert(MUTEX_HELD(&scp->sc_lock));
177 assert(scp->sc_cstate == SVP_CS_BACKOFF ||
178 scp->sc_cstate == SVP_CS_INITIAL);
179 assert(scp->sc_socket == -1);
180 if (scp->sc_cstate == SVP_CS_INITIAL)
181 scp->sc_nbackoff = 0;
182
183 scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
184 if (scp->sc_socket == -1) {
185 scp->sc_error = SVP_CE_SOCKET;
186 scp->sc_errno = errno;
187 scp->sc_cstate = SVP_CS_ERROR;
188 return (SVP_RA_DEGRADE);
189 }
190
191 bzero(&in6, sizeof (struct sockaddr_in6));
192 in6.sin6_family = AF_INET6;
193 in6.sin6_port = htons(scp->sc_remote->sr_rport);
194 bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
195 ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
196 sizeof (struct sockaddr_in6));
197 if (ret != 0) {
198 boolean_t async = B_FALSE;
199
200 switch (errno) {
201 case EACCES:
202 case EADDRINUSE:
235 } else {
236 /*
237 * This call failed, which means that we obtained one of
238 * the following:
239 *
240 * EADDRNOTAVAIL
241 * ECONNREFUSED
242 * EIO
243 * ENETUNREACH
244 * EHOSTUNREACH
245 * ENXIO
246 * ETIMEDOUT
247 *
248 * Therefore we need to set ourselves into backoff and
249 * wait for that to clear up.
250 */
251 return (svp_conn_backoff(scp));
252 }
253 }
254
255 /*
256 * We've connected. Successfully move ourselves to the bound
257 * state and start polling.
258 */
259 scp->sc_cstate = SVP_CS_ACTIVE;
260 scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
261 ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
262 if (ret == 0)
263 return (SVP_RA_RESTORE);
264 scp->sc_error = SVP_CE_ASSOCIATE;
265 scp->sc_cstate = SVP_CS_ERROR;
266
267 return (SVP_RA_DEGRADE);
268 }
269
270 /*
271 * This should be the first call we get after a connect. If we have successfully
272 * connected, we should see a writeable event. We may also see an error or a
273 * hang up. In either of these cases, we transition to error mode. If there is
274 * also a readable event, we ignore it at the moment and just let a
275 * reassociation pick it up so we can simplify the set of state transitions that
276 * we have.
277 */
278 static svp_conn_act_t
279 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
280 {
281 int ret, err;
282 socklen_t sl = sizeof (err);
283 if (!(pe->portev_events & POLLOUT)) {
284 scp->sc_errno = 0;
285 scp->sc_error = SVP_CE_NOPOLLOUT;
286 scp->sc_cstate = SVP_CS_ERROR;
287 return (SVP_RA_DEGRADE);
288 }
289
290 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl);
291 if (ret != 0)
292 libvarpd_panic("unanticipated getsockopt error");
293 if (err != 0) {
294 return (svp_conn_backoff(scp));
295 }
296
297 scp->sc_cstate = SVP_CS_ACTIVE;
298 scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
299 ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
300 if (ret == 0)
301 return (SVP_RA_RESTORE);
302 scp->sc_error = SVP_CE_ASSOCIATE;
303 scp->sc_errno = ret;
304 scp->sc_cstate = SVP_CS_ERROR;
305 return (SVP_RA_DEGRADE);
306 }
307
308 static svp_conn_act_t
309 svp_conn_pollout(svp_conn_t *scp)
310 {
311 svp_query_t *sqp;
312 svp_req_t *req;
313 size_t off;
314 struct iovec iov[2];
315 int nvecs = 0;
316 ssize_t ret;
340 }
341
342 sqp = scp->sc_output.sco_query;
343 req = &sqp->sq_header;
344 off = scp->sc_output.sco_offset;
345 if (off < sizeof (svp_req_t)) {
346 iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
347 iov[nvecs].iov_len = sizeof (svp_req_t) - off;
348 nvecs++;
349 off = 0;
350 } else {
351 off -= sizeof (svp_req_t);
352 }
353
354 iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
355 iov[nvecs].iov_len = sqp->sq_rsize - off;
356 nvecs++;
357
358 do {
359 ret = writev(scp->sc_socket, iov, nvecs);
360 } while (ret == -1 && errno == EAGAIN);
361 if (ret == -1) {
362 switch (errno) {
363 case EAGAIN:
364 scp->sc_event.se_events |= POLLOUT;
365 return (SVP_RA_NONE);
366 case EIO:
367 case ENXIO:
368 case ECONNRESET:
369 return (SVP_RA_ERROR);
370 default:
371 libvarpd_panic("unexpected errno: %d", errno);
372 }
373 }
374
375 sqp->sq_acttime = gethrtime();
376 scp->sc_output.sco_offset += ret;
377 if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
378 sqp->sq_state = SVP_QUERY_READING;
379 scp->sc_output.sco_query = NULL;
380 scp->sc_output.sco_offset = 0;
381 scp->sc_event.se_events |= POLLOUT;
382 }
383 return (SVP_RA_NONE);
384 }
385
386 static boolean_t
387 svp_conn_pollin_validate(svp_conn_t *scp)
388 {
389 svp_query_t *sqp;
390 uint32_t nsize;
391 uint16_t nvers, nop;
392 svp_req_t *resp = &scp->sc_input.sci_req;
393
394 assert(MUTEX_HELD(&scp->sc_lock));
395
396 nvers = ntohs(resp->svp_ver);
397 nop = ntohs(resp->svp_op);
398 nsize = ntohl(resp->svp_size);
399
400 if (nvers != SVP_CURRENT_VERSION) {
401 (void) bunyan_warn(svp_bunyan, "unsupported version",
402 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
403 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
404 BUNYAN_T_INT32, "version", nvers,
405 BUNYAN_T_INT32, "operation", nop,
406 BUNYAN_T_INT32, "response_id", resp->svp_id,
407 BUNYAN_T_END);
408 return (B_FALSE);
409 }
410
411 if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
412 nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
413 (void) bunyan_warn(svp_bunyan, "unsupported operation",
414 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
415 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
416 BUNYAN_T_INT32, "version", nvers,
417 BUNYAN_T_INT32, "operation", nop,
418 BUNYAN_T_INT32, "response_id", resp->svp_id,
419 BUNYAN_T_END);
420 return (B_FALSE);
421 }
422
423 sqp = svp_conn_query_find(scp, resp->svp_id);
424 if (sqp == NULL) {
|
26 #include <strings.h>
27 #include <unistd.h>
28 #include <stddef.h>
29 #include <sys/uio.h>
30 #include <sys/debug.h>
31
32 #include <libvarpd_svp.h>
33
/*
 * Connection tunables.
 *
 * svp_conn_query_timeout is not consumed in the part of the file visible
 * here; presumably it bounds how long a query may stay outstanding (units
 * to be confirmed against its user).
 *
 * svp_conn_backoff_tbl holds the successive values loaded into a
 * connection's backoff timer (sc_btimer.st_value) on each consecutive
 * failure; once the table is exhausted the last entry is reused.
 * svp_conn_nbackoff is the number of entries in that table.
 */
int svp_conn_query_timeout = 30;
static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
static int svp_conn_nbackoff =
    sizeof (svp_conn_backoff_tbl) / sizeof (svp_conn_backoff_tbl[0]);
37
/*
 * Follow-up action that a connection state handler asks its caller to
 * take. The notes describe how the handlers visible in this file produce
 * each value; the code that consumes them is not shown here.
 */
typedef enum svp_conn_act {
	/* Nothing further to do (e.g. still backing off, write queued). */
	SVP_RA_NONE	= 0,
	/* Connection entered an error state or ran out of backoff steps. */
	SVP_RA_DEGRADE	= 1,
	/* Connection became active and its event was associated. */
	SVP_RA_RESTORE	= 2,
	/* Socket I/O failed (e.g. EIO, ENXIO or ECONNRESET on write). */
	SVP_RA_ERROR	= 3,
	/* NOTE(review): no producer is visible here; confirm meaning. */
	SVP_RA_CLEANUP	= 4
} svp_conn_act_t;
45
46 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
47
48 static void
49 svp_conn_inject(svp_conn_t *scp)
50 {
51 int ret;
52 assert(MUTEX_HELD(&scp->sc_lock));
53
54 if (scp->sc_flags & SVP_CF_USER)
55 return;
56 scp->sc_flags |= SVP_CF_USER;
57 if ((ret = svp_event_inject(&scp->sc_event)) != 0)
58 libvarpd_panic("failed to inject event: %d\n", ret);
59 }
60
61 static void
62 svp_conn_degrade(svp_conn_t *scp)
63 {
64 svp_remote_t *srp = scp->sc_remote;
65
66 assert(MUTEX_HELD(&srp->sr_lock));
67 assert(MUTEX_HELD(&scp->sc_lock));
152 libvarpd_panic("failed to close socket %d: %d\n",
153 scp->sc_socket, errno);
154 scp->sc_socket = -1;
155
156 scp->sc_cstate = SVP_CS_BACKOFF;
157 scp->sc_nbackoff++;
158 if (scp->sc_nbackoff >= svp_conn_nbackoff) {
159 scp->sc_btimer.st_value =
160 svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
161 } else {
162 scp->sc_btimer.st_value =
163 svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
164 }
165 svp_timer_add(&scp->sc_btimer);
166
167 if (scp->sc_nbackoff > svp_conn_nbackoff)
168 return (SVP_RA_DEGRADE);
169 return (SVP_RA_NONE);
170 }
171
172 /*
173 * Think of this as an extension to the connect() call in svp_conn_connect().
174 * Send a message, receive it, and set the version here. If the response is
175 * too slow or the socket throws an error, indicate a socket error, which
176 * will cause the caller to backoff (i.e. close the socket and try again).
177 *
178 * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
179 * own error type.
180 */
181 static svp_conn_error_t
182 svp_conn_version_set(svp_conn_t *scp)
183 {
184 svp_req_t ping;
185 ssize_t ret;
186 uint32_t save_crc;
187 uint16_t peer_version;
188 int ntries = 3; /* One second between tries. 3secs should be enough. */
189
190 ping.svp_ver = htons(SVP_CURRENT_VERSION);
191 ping.svp_op = htons(SVP_R_PING);
192 ping.svp_size = 0; /* Header-only... */
193 ping.svp_id = 0;
194 /* 0-length data... just use the req buffer for the pointer. */
195 svp_query_crc32(&ping, &ping, 0);
196
197 ret = write(scp->sc_socket, &ping, sizeof (ping));
198 if (ret == -1) {
199 /*
200 * A failed write() call right after connect probably
201 * indicates a larger connection failure. Restart the
202 * connection from scratch.
203 */
204 return (SVP_CE_SOCKET);
205 }
206 assert(ret == sizeof (ping));
207 do {
208 /*
209 * Asynch read. We may loop here once or twice.
210 * Wait a bit, but don't loop too many times...
211 */
212 (void) sleep(1);
213 ret = read(scp->sc_socket, &ping, sizeof (ping));
214 } while (--ntries > 0 &&
215 ret == -1 && (errno == EINTR || errno == EAGAIN));
216 if (ret == -1) {
217 /*
218 * This is actually a failed read() call. Restart the
219 * connection from scratch.
220 */
221 return (SVP_CE_SOCKET);
222 }
223
224 save_crc = ping.svp_crc32;
225 svp_query_crc32(&ping, &ping, 0);
226 peer_version = htons(ping.svp_ver);
227 if (ping.svp_op != htons(SVP_R_PONG) ||
228 ping.svp_size != 0 || ping.svp_id != 0 ||
229 ping.svp_crc32 != save_crc ||
230 peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
231 return (SVP_CE_VERSION_PONG);
232 }
233
234 /* This connection now has a version! */
235 scp->sc_version = peer_version;
236 return (SVP_CE_NONE);
237 }
238
239 static svp_conn_act_t
240 svp_conn_connect(svp_conn_t *scp)
241 {
242 int ret;
243 struct sockaddr_in6 in6;
244
245 assert(MUTEX_HELD(&scp->sc_lock));
246 assert(scp->sc_cstate == SVP_CS_BACKOFF ||
247 scp->sc_cstate == SVP_CS_INITIAL);
248 assert(scp->sc_socket == -1);
249 if (scp->sc_cstate == SVP_CS_INITIAL)
250 scp->sc_nbackoff = 0;
251
252 /* New connect means we need to know the version. */
253 scp->sc_version = 0;
254
255 scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
256 if (scp->sc_socket == -1) {
257 scp->sc_error = SVP_CE_SOCKET;
258 scp->sc_errno = errno;
259 scp->sc_cstate = SVP_CS_ERROR;
260 return (SVP_RA_DEGRADE);
261 }
262
263 bzero(&in6, sizeof (struct sockaddr_in6));
264 in6.sin6_family = AF_INET6;
265 in6.sin6_port = htons(scp->sc_remote->sr_rport);
266 bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
267 ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
268 sizeof (struct sockaddr_in6));
269 if (ret != 0) {
270 boolean_t async = B_FALSE;
271
272 switch (errno) {
273 case EACCES:
274 case EADDRINUSE:
307 } else {
308 /*
309 * This call failed, which means that we obtained one of
310 * the following:
311 *
312 * EADDRNOTAVAIL
313 * ECONNREFUSED
314 * EIO
315 * ENETUNREACH
316 * EHOSTUNREACH
317 * ENXIO
318 * ETIMEDOUT
319 *
320 * Therefore we need to set ourselves into backoff and
321 * wait for that to clear up.
322 */
323 return (svp_conn_backoff(scp));
324 }
325 }
326
327 return (svp_conn_poll_connect(NULL, scp));
328 }
329
330 /*
331 * This should be the first call we get after a successful synchronous
332 * connect, or a completed (failed or successful) asynchronous connect. A
333 * non-NULL port-event indicates asynchronous completion, a NULL port-event
334 * indicates a successful synchronous connect.
335 *
336 * If we have successfully connected, we should see a writeable event. In the
337 * asynchronous case, we may also see an error or a hang up. For either hang
338 * up or error, we transition to error mode. If there is also a readable event
339 * (i.e. incoming data), we ignore it at the moment and just let a
340 * reassociation pick it up so we can simplify the set of state transitions
341 * that we have.
342 */
343 static svp_conn_act_t
344 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
345 {
346 int ret;
347 svp_conn_error_t version_error;
348
349 if (pe != NULL) {
350 int err;
351 socklen_t sl = sizeof (err);
352
353 /*
354 * These bits only matter if we're notified of an
355 * asynchronous connection completion.
356 */
357 if (!(pe->portev_events & POLLOUT)) {
358 scp->sc_errno = 0;
359 scp->sc_error = SVP_CE_NOPOLLOUT;
360 scp->sc_cstate = SVP_CS_ERROR;
361 return (SVP_RA_DEGRADE);
362 }
363
364 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
365 &sl);
366 if (ret != 0)
367 libvarpd_panic("unanticipated getsockopt error");
368 if (err != 0) {
369 return (svp_conn_backoff(scp));
370 }
371 }
372
373 /* Use a single SVP_R_PING to determine the version. */
374 version_error = svp_conn_version_set(scp);
375 switch (version_error) {
376 case SVP_CE_SOCKET:
377 /* Use this to signal read/write errors... */
378 return (svp_conn_backoff(scp));
379 case SVP_CE_NONE:
380 assert(scp->sc_version > 0 &&
381 scp->sc_version <= SVP_CURRENT_VERSION);
382 break;
383 default:
384 scp->sc_error = version_error;
385 scp->sc_cstate = SVP_CS_ERROR;
386 scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */
387 return (SVP_RA_DEGRADE);
388 }
389
390 scp->sc_cstate = SVP_CS_ACTIVE;
391 scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
392 ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
393 if (ret == 0)
394 return (SVP_RA_RESTORE);
395 scp->sc_error = SVP_CE_ASSOCIATE;
396 scp->sc_errno = ret;
397 scp->sc_cstate = SVP_CS_ERROR;
398 return (SVP_RA_DEGRADE);
399 }
400
401 static svp_conn_act_t
402 svp_conn_pollout(svp_conn_t *scp)
403 {
404 svp_query_t *sqp;
405 svp_req_t *req;
406 size_t off;
407 struct iovec iov[2];
408 int nvecs = 0;
409 ssize_t ret;
433 }
434
435 sqp = scp->sc_output.sco_query;
436 req = &sqp->sq_header;
437 off = scp->sc_output.sco_offset;
438 if (off < sizeof (svp_req_t)) {
439 iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
440 iov[nvecs].iov_len = sizeof (svp_req_t) - off;
441 nvecs++;
442 off = 0;
443 } else {
444 off -= sizeof (svp_req_t);
445 }
446
447 iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
448 iov[nvecs].iov_len = sqp->sq_rsize - off;
449 nvecs++;
450
451 do {
452 ret = writev(scp->sc_socket, iov, nvecs);
453 } while (ret == -1 && errno == EINTR);
454 if (ret == -1) {
455 switch (errno) {
456 case EAGAIN:
457 scp->sc_event.se_events |= POLLOUT;
458 return (SVP_RA_NONE);
459 case EIO:
460 case ENXIO:
461 case ECONNRESET:
462 return (SVP_RA_ERROR);
463 default:
464 libvarpd_panic("unexpected errno: %d", errno);
465 }
466 }
467
468 sqp->sq_acttime = gethrtime();
469 scp->sc_output.sco_offset += ret;
470 if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
471 sqp->sq_state = SVP_QUERY_READING;
472 scp->sc_output.sco_query = NULL;
473 scp->sc_output.sco_offset = 0;
474 scp->sc_event.se_events |= POLLOUT;
475 }
476 return (SVP_RA_NONE);
477 }
478
479 static boolean_t
480 svp_conn_pollin_validate(svp_conn_t *scp)
481 {
482 svp_query_t *sqp;
483 uint32_t nsize;
484 uint16_t nvers, nop;
485 svp_req_t *resp = &scp->sc_input.sci_req;
486
487 assert(MUTEX_HELD(&scp->sc_lock));
488
489 nvers = ntohs(resp->svp_ver);
490 nop = ntohs(resp->svp_op);
491 nsize = ntohl(resp->svp_size);
492
493 /*
494 * A peer that's messing with post-connection version changes is
495 * likely a broken peer.
496 */
497 if (nvers != scp->sc_version) {
498 (void) bunyan_warn(svp_bunyan, "version mismatch",
499 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
500 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
501 BUNYAN_T_INT32, "peer version", nvers,
502 BUNYAN_T_INT32, "our version", scp->sc_version,
503 BUNYAN_T_INT32, "operation", nop,
504 BUNYAN_T_INT32, "response_id", resp->svp_id,
505 BUNYAN_T_END);
506 return (B_FALSE);
507 }
508
509 if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
510 nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
511 (void) bunyan_warn(svp_bunyan, "unsupported operation",
512 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
513 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
514 BUNYAN_T_INT32, "version", nvers,
515 BUNYAN_T_INT32, "operation", nop,
516 BUNYAN_T_INT32, "response_id", resp->svp_id,
517 BUNYAN_T_END);
518 return (B_FALSE);
519 }
520
521 sqp = svp_conn_query_find(scp, resp->svp_id);
522 if (sqp == NULL) {
|