Version bump SVP to 2
--- old/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c
+++ new/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c
1 1 /*
2 2 * This file and its contents are supplied under the terms of the
3 3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 4 * You may only use this file in accordance with the terms of version
5 5 * 1.0 of the CDDL.
6 6 *
7 7 * A full copy of the text of the CDDL should have accompanied this
8 8 * source. A copy of the CDDL is also available via the Internet at
9 9 * http://www.illumos.org/license/CDDL.
10 10 */
11 11
12 12 /*
13 13 * Copyright 2015 Joyent, Inc.
14 14 */
15 15
16 16 /*
17 17 * Logic to manage an individual connection to a remote host.
18 18 *
19 19 * For more information, see the big theory statement in
20 20 * lib/varpd/svp/common/libvarpd_svp.c.
21 21 */
22 22
23 23 #include <assert.h>
24 24 #include <umem.h>
25 25 #include <errno.h>
26 26 #include <strings.h>
27 27 #include <unistd.h>
28 28 #include <stddef.h>
29 29 #include <sys/uio.h>
30 30 #include <sys/debug.h>
31 31
32 32 #include <libvarpd_svp.h>
33 33
34 34 int svp_conn_query_timeout = 30;
35 35 static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
36 36 static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
37 37
38 38 typedef enum svp_conn_act {
39 39 SVP_RA_NONE = 0x00,
40 40 SVP_RA_DEGRADE = 0x01,
41 41 SVP_RA_RESTORE = 0x02,
42 42 SVP_RA_ERROR = 0x03,
43 43 SVP_RA_CLEANUP = 0x04
44 44 } svp_conn_act_t;
45 45
46 +static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
47 +
46 48 static void
47 49 svp_conn_inject(svp_conn_t *scp)
48 50 {
49 51 int ret;
50 52 assert(MUTEX_HELD(&scp->sc_lock));
51 53
52 54 if (scp->sc_flags & SVP_CF_USER)
53 55 return;
54 56 scp->sc_flags |= SVP_CF_USER;
55 57 if ((ret = svp_event_inject(&scp->sc_event)) != 0)
56 58 libvarpd_panic("failed to inject event: %d\n", ret);
57 59 }
58 60
59 61 static void
60 62 svp_conn_degrade(svp_conn_t *scp)
61 63 {
62 64 svp_remote_t *srp = scp->sc_remote;
63 65
64 66 assert(MUTEX_HELD(&srp->sr_lock));
65 67 assert(MUTEX_HELD(&scp->sc_lock));
66 68
67 69 if (scp->sc_flags & SVP_CF_DEGRADED)
68 70 return;
69 71
70 72 scp->sc_flags |= SVP_CF_DEGRADED;
71 73 srp->sr_ndconns++;
72 74 if (srp->sr_ndconns == srp->sr_tconns)
73 75 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
74 76 }
75 77
76 78 static void
77 79 svp_conn_restore(svp_conn_t *scp)
78 80 {
79 81 svp_remote_t *srp = scp->sc_remote;
80 82
81 83 assert(MUTEX_HELD(&srp->sr_lock));
82 84 assert(MUTEX_HELD(&scp->sc_lock));
83 85
84 86 if (!(scp->sc_flags & SVP_CF_DEGRADED))
85 87 return;
86 88
87 89 scp->sc_flags &= ~SVP_CF_DEGRADED;
88 90 if (srp->sr_ndconns == srp->sr_tconns)
89 91 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
90 92 srp->sr_ndconns--;
91 93 }
92 94
93 95 static void
94 96 svp_conn_add(svp_conn_t *scp)
95 97 {
96 98 svp_remote_t *srp = scp->sc_remote;
97 99
98 100 assert(MUTEX_HELD(&srp->sr_lock));
99 101 assert(MUTEX_HELD(&scp->sc_lock));
100 102
101 103 if (scp->sc_flags & SVP_CF_ADDED)
102 104 return;
103 105
104 106 list_insert_tail(&srp->sr_conns, scp);
105 107 scp->sc_flags |= SVP_CF_ADDED;
106 108 srp->sr_tconns++;
107 109 }
108 110
109 111 static void
110 112 svp_conn_remove(svp_conn_t *scp)
111 113 {
112 114 svp_remote_t *srp = scp->sc_remote;
113 115
114 116 assert(MUTEX_HELD(&srp->sr_lock));
115 117 assert(MUTEX_HELD(&scp->sc_lock));
116 118
117 119 if (!(scp->sc_flags & SVP_CF_ADDED))
118 120 return;
119 121
120 122 scp->sc_flags &= ~SVP_CF_ADDED;
121 123 if (scp->sc_flags & SVP_CF_DEGRADED)
122 124 srp->sr_ndconns--;
123 125 srp->sr_tconns--;
124 126 if (srp->sr_tconns == srp->sr_ndconns)
125 127 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
126 128 }
127 129
128 130 static svp_query_t *
129 131 svp_conn_query_find(svp_conn_t *scp, uint32_t id)
130 132 {
131 133 svp_query_t *sqp;
132 134
133 135 assert(MUTEX_HELD(&scp->sc_lock));
134 136
135 137 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
136 138 sqp = list_next(&scp->sc_queries, sqp)) {
137 139 if (sqp->sq_header.svp_id == id)
138 140 break;
139 141 }
140 142
141 143 return (sqp);
142 144 }
143 145
144 146 static svp_conn_act_t
145 147 svp_conn_backoff(svp_conn_t *scp)
146 148 {
147 149 assert(MUTEX_HELD(&scp->sc_lock));
148 150
149 151 if (close(scp->sc_socket) != 0)
150 152 libvarpd_panic("failed to close socket %d: %d\n",
151 153 scp->sc_socket, errno);
152 154 scp->sc_socket = -1;
153 155
154 156 scp->sc_cstate = SVP_CS_BACKOFF;
155 157 scp->sc_nbackoff++;
156 158 if (scp->sc_nbackoff >= svp_conn_nbackoff) {
157 159 scp->sc_btimer.st_value =
158 160 svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
159 161 } else {
160 162 scp->sc_btimer.st_value =
161 163 svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
162 164 }
163 165 svp_timer_add(&scp->sc_btimer);
164 166
165 167 if (scp->sc_nbackoff > svp_conn_nbackoff)
166 168 return (SVP_RA_DEGRADE);
167 169 return (SVP_RA_NONE);
168 170 }
169 171
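The backoff above doubles the wait per consecutive failure and clamps at the last table entry. As a minimal, self-contained sketch of the same indexing (the names are hypothetical stand-ins for svp_conn_backoff_tbl and svp_conn_nbackoff, not the library code):

	#include <stdio.h>

	/* Mirrors the backoff table above; illustrative only. */
	static int backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
	static int nbackoff = sizeof (backoff_tbl) / sizeof (int);

	/* Delay in seconds after the nth consecutive failure (1-based). */
	static int
	backoff_delay(int nfailures)
	{
		if (nfailures >= nbackoff)
			return (backoff_tbl[nbackoff - 1]);	/* clamp at 32s */
		return (backoff_tbl[nfailures - 1]);
	}

	int
	main(void)
	{
		for (int i = 1; i <= 8; i++)
			printf("failure %d -> wait %ds\n", i, backoff_delay(i));
		return (0);
	}

Failures 1 through 6 wait 1, 2, 4, 8, 16, and 32 seconds; everything after that stays clamped at 32. Note that svp_conn_backoff() above only reports SVP_RA_DEGRADE once sc_nbackoff exceeds the table length, i.e. after the clamp has been hit.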
172 +/*
173 + * Think of this as an extension to the connect() call in svp_conn_connect().
174 + * Send a message, receive it, and set the version here. If the response is
175 + * too slow or the socket throws an error, indicate a socket error, which
176 + * will cause the caller to back off (i.e. close the socket and try again).
177 + *
178 + * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
179 + * own error type.
180 + */
181 +static svp_conn_error_t
182 +svp_conn_version_set(svp_conn_t *scp)
183 +{
184 + svp_req_t ping;
185 + ssize_t ret;
186 + uint32_t save_crc;
187 + uint16_t peer_version;
188 +	int ntries = 3; /* One second between tries. 3 seconds should be enough. */
189 +
190 + ping.svp_ver = htons(SVP_CURRENT_VERSION);
191 + ping.svp_op = htons(SVP_R_PING);
192 + ping.svp_size = 0; /* Header-only... */
193 + ping.svp_id = 0;
194 + /* 0-length data... just use the req buffer for the pointer. */
195 + svp_query_crc32(&ping, &ping, 0);
196 +
197 + ret = write(scp->sc_socket, &ping, sizeof (ping));
198 + if (ret == -1) {
199 + /*
200 + * A failed write() call right after connect probably
201 + * indicates a larger connection failure. Restart the
202 + * connection from scratch.
203 + */
204 + return (SVP_CE_SOCKET);
205 + }
206 + assert(ret == sizeof (ping));
207 + do {
208 + /*
209 + * Asynch read. We may loop here once or twice.
210 + * Wait a bit, but don't loop too many times...
211 + */
212 + (void) sleep(1);
213 + ret = read(scp->sc_socket, &ping, sizeof (ping));
214 + } while (--ntries > 0 &&
215 + ret == -1 && (errno == EINTR || errno == EAGAIN));
216 + if (ret == -1) {
217 + /*
218 + * This is actually a failed read() call. Restart the
219 + * connection from scratch.
220 + */
221 + return (SVP_CE_SOCKET);
222 + }
223 +
224 + save_crc = ping.svp_crc32;
225 + svp_query_crc32(&ping, &ping, 0);
226 +	peer_version = ntohs(ping.svp_ver);
227 + if (ping.svp_op != htons(SVP_R_PONG) ||
228 + ping.svp_size != 0 || ping.svp_id != 0 ||
229 + ping.svp_crc32 != save_crc ||
230 + peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
231 + return (SVP_CE_VERSION_PONG);
232 + }
233 +
234 + /* This connection now has a version! */
235 + scp->sc_version = peer_version;
236 + return (SVP_CE_NONE);
237 +}
238 +
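The checks above reduce to a single negotiation rule: accept the PONG only if it echoes a sane header, and adopt the peer's version provided it is non-zero and no newer than ours. A hedged sketch of just that rule (the helper name is hypothetical; the constant follows this webrev's bump to version 2):

	#include <stdint.h>

	#define	SVP_CURRENT_VERSION	2	/* per this webrev's bump */

	/*
	 * Hypothetical helper: given the peer's advertised version from a
	 * PONG, pick the session version. Returns 0 and sets *versionp on
	 * success, or -1 on a mismatch (zero, or a peer newer than us).
	 */
	static int
	svp_pick_version(uint16_t peer, uint16_t *versionp)
	{
		if (peer == 0 || peer > SVP_CURRENT_VERSION)
			return (-1);
		*versionp = peer;	/* speak the older (or equal) dialect */
		return (0);
	}

The apparent intent is that the older side of the conversation wins: the PONG carries the server's version, and the client falls back to it whenever it is lower than its own.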
170 239 static svp_conn_act_t
171 240 svp_conn_connect(svp_conn_t *scp)
172 241 {
173 242 int ret;
174 243 struct sockaddr_in6 in6;
175 244
176 245 assert(MUTEX_HELD(&scp->sc_lock));
177 246 assert(scp->sc_cstate == SVP_CS_BACKOFF ||
178 247 scp->sc_cstate == SVP_CS_INITIAL);
179 248 assert(scp->sc_socket == -1);
180 249 if (scp->sc_cstate == SVP_CS_INITIAL)
181 250 scp->sc_nbackoff = 0;
182 251
252 + /* New connect means we need to know the version. */
253 + scp->sc_version = 0;
254 +
183 255 scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
184 256 if (scp->sc_socket == -1) {
185 257 scp->sc_error = SVP_CE_SOCKET;
186 258 scp->sc_errno = errno;
187 259 scp->sc_cstate = SVP_CS_ERROR;
188 260 return (SVP_RA_DEGRADE);
189 261 }
190 262
191 263 bzero(&in6, sizeof (struct sockaddr_in6));
192 264 in6.sin6_family = AF_INET6;
193 265 in6.sin6_port = htons(scp->sc_remote->sr_rport);
194 266 bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
195 267 ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
196 268 sizeof (struct sockaddr_in6));
197 269 if (ret != 0) {
198 270 boolean_t async = B_FALSE;
199 271
200 272 switch (errno) {
201 273 case EACCES:
202 274 case EADDRINUSE:
203 275 case EAFNOSUPPORT:
204 276 case EALREADY:
205 277 case EBADF:
206 278 case EISCONN:
207 279 case ELOOP:
208 280 case ENOENT:
209 281 case ENOSR:
210 282 case EWOULDBLOCK:
211 283 libvarpd_panic("unanticipated connect errno %d", errno);
212 284 break;
213 285 case EINPROGRESS:
214 286 case EINTR:
215 287 async = B_TRUE;
216 288 default:
217 289 break;
218 290 }
219 291
220 292 /*
221 293 * So, we will be connecting to this in the future, advance our
222 294 * state and make sure that we poll for the next round.
223 295 */
224 296 if (async == B_TRUE) {
225 297 scp->sc_cstate = SVP_CS_CONNECTING;
226 298 scp->sc_event.se_events = POLLOUT | POLLHUP;
227 299 ret = svp_event_associate(&scp->sc_event,
228 300 scp->sc_socket);
229 301 if (ret == 0)
230 302 return (SVP_RA_NONE);
231 303 scp->sc_error = SVP_CE_ASSOCIATE;
232 304 scp->sc_errno = ret;
233 305 scp->sc_cstate = SVP_CS_ERROR;
234 306 return (SVP_RA_DEGRADE);
235 307 } else {
236 308 /*
237 309 * This call failed, which means that we obtained one of
238 310 * the following:
239 311 *
240 312 * EADDRNOTAVAIL
241 313 * ECONNREFUSED
242 314 * EIO
243 315 * ENETUNREACH
244 316 * EHOSTUNREACH
245 317 * ENXIO
246 318 * ETIMEDOUT
247 319 *
248 320 * Therefore we need to set ourselves into backoff and
249 321 * wait for that to clear up.
250 322 */
251 323 return (svp_conn_backoff(scp));
252 324 }
253 325 }
254 326
255 - /*
256 - * We've connected. Successfully move ourselves to the bound
257 - * state and start polling.
258 - */
259 - scp->sc_cstate = SVP_CS_ACTIVE;
260 - scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
261 - ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
262 - if (ret == 0)
263 - return (SVP_RA_RESTORE);
264 - scp->sc_error = SVP_CE_ASSOCIATE;
265 - scp->sc_cstate = SVP_CS_ERROR;
266 -
267 - return (SVP_RA_DEGRADE);
327 + return (svp_conn_poll_connect(NULL, scp));
268 328 }
269 329
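The connect path above is the textbook non-blocking connect(2) initiation: synchronous success falls through to svp_conn_poll_connect(), EINPROGRESS/EINTR defers completion to a POLLOUT event, and the remaining "expected" errnos feed the backoff path. A standalone sketch of that initiation half (hypothetical names; returning -1 here corresponds to the caller backing off and retrying):

	#include <sys/socket.h>
	#include <netinet/in.h>
	#include <arpa/inet.h>
	#include <strings.h>
	#include <unistd.h>
	#include <errno.h>

	/*
	 * Start a non-blocking IPv6 connect. On return, *asyncp says
	 * whether completion will arrive later as a POLLOUT event.
	 * Returns the fd, or -1 on a failure the caller should retry.
	 */
	static int
	start_connect(const struct in6_addr *addr, uint16_t port, int *asyncp)
	{
		struct sockaddr_in6 in6;
		int fd;

		if ((fd = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0)) == -1)
			return (-1);

		bzero(&in6, sizeof (in6));
		in6.sin6_family = AF_INET6;
		in6.sin6_port = htons(port);
		in6.sin6_addr = *addr;

		if (connect(fd, (struct sockaddr *)&in6, sizeof (in6)) == 0) {
			*asyncp = 0;	/* connected synchronously */
			return (fd);
		}
		if (errno == EINPROGRESS || errno == EINTR) {
			*asyncp = 1;	/* completion arrives as POLLOUT */
			return (fd);
		}
		(void) close(fd);	/* e.g. ECONNREFUSED: back off, retry */
		return (-1);
	}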
270 330 /*
271 - * This should be the first call we get after a connect. If we have successfully
272 - * connected, we should see a writeable event. We may also see an error or a
273 - * hang up. In either of these cases, we transition to error mode. If there is
274 - * also a readable event, we ignore it at the moment and just let a
275 - * reassociation pick it up so we can simplify the set of state transitions that
276 - * we have.
331 + * This should be the first call we get after a successful synchronous
332 + * connect, or a completed (failed or successful) asynchronous connect. A
333 + * non-NULL port-event indicates asynchronous completion; a NULL port-event
334 + * indicates a successful synchronous connect.
335 + *
336 + * If we have successfully connected, we should see a writeable event. In the
337 + * asynchronous case, we may also see an error or a hang up. For either hang
338 + * up or error, we transition to error mode. If there is also a readable event
339 + * (i.e. incoming data), we ignore it at the moment and just let a
340 + * reassociation pick it up so we can simplify the set of state transitions
341 + * that we have.
277 342 */
278 343 static svp_conn_act_t
279 344 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
280 345 {
281 - int ret, err;
282 - socklen_t sl = sizeof (err);
283 - if (!(pe->portev_events & POLLOUT)) {
284 - scp->sc_errno = 0;
285 - scp->sc_error = SVP_CE_NOPOLLOUT;
286 - scp->sc_cstate = SVP_CS_ERROR;
287 - return (SVP_RA_DEGRADE);
346 + int ret;
347 + svp_conn_error_t version_error;
348 +
349 + if (pe != NULL) {
350 + int err;
351 + socklen_t sl = sizeof (err);
352 +
353 + /*
354 + * These bits only matter if we're notified of an
355 + * asynchronous connection completion.
356 + */
357 + if (!(pe->portev_events & POLLOUT)) {
358 + scp->sc_errno = 0;
359 + scp->sc_error = SVP_CE_NOPOLLOUT;
360 + scp->sc_cstate = SVP_CS_ERROR;
361 + return (SVP_RA_DEGRADE);
362 + }
363 +
364 + ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
365 + &sl);
366 + if (ret != 0)
367 + libvarpd_panic("unanticipated getsockopt error");
368 + if (err != 0) {
369 + return (svp_conn_backoff(scp));
370 + }
288 371 }
289 372
290 - ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err, &sl);
291 - if (ret != 0)
292 - libvarpd_panic("unanticipated getsockopt error");
293 - if (err != 0) {
373 + /* Use a single SVP_R_PING to determine the version. */
374 + version_error = svp_conn_version_set(scp);
375 + switch (version_error) {
376 + case SVP_CE_SOCKET:
377 + /* Use this to signal read/write errors... */
294 378 return (svp_conn_backoff(scp));
379 + case SVP_CE_NONE:
380 + assert(scp->sc_version > 0 &&
381 + scp->sc_version <= SVP_CURRENT_VERSION);
382 + break;
383 + default:
384 + scp->sc_error = version_error;
385 + scp->sc_cstate = SVP_CS_ERROR;
386 + scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */
387 + return (SVP_RA_DEGRADE);
295 388 }
296 389
297 390 scp->sc_cstate = SVP_CS_ACTIVE;
298 391 scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
299 392 ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
300 393 if (ret == 0)
301 394 return (SVP_RA_RESTORE);
302 395 scp->sc_error = SVP_CE_ASSOCIATE;
303 396 scp->sc_errno = ret;
304 397 scp->sc_cstate = SVP_CS_ERROR;
305 398 return (SVP_RA_DEGRADE);
306 399 }
307 400
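The getsockopt(SO_ERROR) step above is the standard idiom for harvesting the deferred result of a non-blocking connect(2) once POLLOUT fires. The idiom by itself:

	#include <sys/socket.h>
	#include <errno.h>

	/*
	 * Returns 0 if the non-blocking connect completed successfully,
	 * otherwise the errno value (e.g. ECONNREFUSED, ETIMEDOUT) that
	 * a blocking connect(2) would have reported.
	 */
	static int
	connect_result(int fd)
	{
		int err = 0;
		socklen_t sl = sizeof (err);

		if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &sl) != 0)
			return (errno);	/* getsockopt itself failed */
		return (err);
	}

A non-zero result maps onto the backoff path above, just as an immediate synchronous connect failure would.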
308 401 static svp_conn_act_t
309 402 svp_conn_pollout(svp_conn_t *scp)
310 403 {
311 404 svp_query_t *sqp;
312 405 svp_req_t *req;
313 406 size_t off;
314 407 struct iovec iov[2];
315 408 int nvecs = 0;
316 409 ssize_t ret;
317 410
318 411 assert(MUTEX_HELD(&scp->sc_lock));
319 412
320 413 /*
321 414 * We need to find a query and start writing it out.
322 415 */
323 416 if (scp->sc_output.sco_query == NULL) {
324 417 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
325 418 sqp = list_next(&scp->sc_queries, sqp)) {
326 419 if (sqp->sq_state != SVP_QUERY_INIT)
327 420 continue;
328 421 break;
329 422 }
330 423
331 424 if (sqp == NULL) {
332 425 scp->sc_event.se_events &= ~POLLOUT;
333 426 return (SVP_RA_NONE);
334 427 }
335 428
336 429 scp->sc_output.sco_query = sqp;
337 430 scp->sc_output.sco_offset = 0;
338 431 sqp->sq_state = SVP_QUERY_WRITING;
339 432 svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
340 433 }
341 434
342 435 sqp = scp->sc_output.sco_query;
343 436 req = &sqp->sq_header;
344 437 off = scp->sc_output.sco_offset;
345 438 if (off < sizeof (svp_req_t)) {
346 439 iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
347 440 iov[nvecs].iov_len = sizeof (svp_req_t) - off;
348 441 nvecs++;
349 442 off = 0;
350 443 } else {
351 444 off -= sizeof (svp_req_t);
352 445 }
353 446
354 447 iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
355 448 iov[nvecs].iov_len = sqp->sq_rsize - off;
356 449 nvecs++;
357 450
358 451 do {
359 452 ret = writev(scp->sc_socket, iov, nvecs);
360 - } while (ret == -1 && errno == EAGAIN);
453 + } while (ret == -1 && errno == EINTR);
361 454 if (ret == -1) {
362 455 switch (errno) {
363 456 case EAGAIN:
364 457 scp->sc_event.se_events |= POLLOUT;
365 458 return (SVP_RA_NONE);
366 459 case EIO:
367 460 case ENXIO:
368 461 case ECONNRESET:
369 462 return (SVP_RA_ERROR);
370 463 default:
371 464 libvarpd_panic("unexpected errno: %d", errno);
372 465 }
373 466 }
374 467
375 468 sqp->sq_acttime = gethrtime();
376 469 scp->sc_output.sco_offset += ret;
377 470 if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
378 471 sqp->sq_state = SVP_QUERY_READING;
379 472 scp->sc_output.sco_query = NULL;
380 473 scp->sc_output.sco_offset = 0;
381 474 scp->sc_event.se_events |= POLLOUT;
382 475 }
383 476 return (SVP_RA_NONE);
384 477 }
385 478
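The iovec setup above resumes a partially written (header, payload) pair: if the write offset is still inside the header, the header remainder plus the full payload are queued; otherwise only the payload tail is. The same resume logic in isolation (hypothetical names, not the library code):

	#include <sys/uio.h>
	#include <stddef.h>

	/*
	 * Build iovecs to continue sending a header of hlen bytes followed
	 * by a payload of dlen bytes, starting from absolute offset off.
	 * Returns the number of iovecs filled (1 if the header is done).
	 */
	static int
	resume_iovs(void *hdr, size_t hlen, void *data, size_t dlen,
	    size_t off, struct iovec *iov)
	{
		int nvecs = 0;

		if (off < hlen) {
			iov[nvecs].iov_base = (char *)hdr + off;
			iov[nvecs].iov_len = hlen - off;
			nvecs++;
			off = 0;	/* payload starts at its beginning */
		} else {
			off -= hlen;	/* offset is now payload-relative */
		}
		iov[nvecs].iov_base = (char *)data + off;
		iov[nvecs].iov_len = dlen - off;
		return (nvecs + 1);
	}

Because writev(2) may write fewer bytes than asked, sco_offset accumulates the return value and the next POLLOUT pass rebuilds the iovecs from wherever the last write stopped.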
386 479 static boolean_t
387 480 svp_conn_pollin_validate(svp_conn_t *scp)
388 481 {
389 482 svp_query_t *sqp;
390 483 uint32_t nsize;
391 484 uint16_t nvers, nop;
392 485 svp_req_t *resp = &scp->sc_input.sci_req;
393 486
394 487 assert(MUTEX_HELD(&scp->sc_lock));
395 488
396 489 nvers = ntohs(resp->svp_ver);
397 490 nop = ntohs(resp->svp_op);
398 491 nsize = ntohl(resp->svp_size);
399 492
400 - if (nvers != SVP_CURRENT_VERSION) {
401 - (void) bunyan_warn(svp_bunyan, "unsupported version",
493 + /*
494 + * A peer that's messing with post-connection version changes is
495 + * likely a broken peer.
496 + */
497 + if (nvers != scp->sc_version) {
498 + (void) bunyan_warn(svp_bunyan, "version mismatch",
402 499 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
403 500 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
404 - BUNYAN_T_INT32, "version", nvers,
501 +	    BUNYAN_T_INT32, "peer_version", nvers,
502 +	    BUNYAN_T_INT32, "our_version", scp->sc_version,
405 503 BUNYAN_T_INT32, "operation", nop,
406 504 BUNYAN_T_INT32, "response_id", resp->svp_id,
407 505 BUNYAN_T_END);
408 506 return (B_FALSE);
409 507 }
410 508
411 509 if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
412 510 nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
413 511 (void) bunyan_warn(svp_bunyan, "unsupported operation",
414 512 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
415 513 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
416 514 BUNYAN_T_INT32, "version", nvers,
417 515 BUNYAN_T_INT32, "operation", nop,
418 516 BUNYAN_T_INT32, "response_id", resp->svp_id,
419 517 BUNYAN_T_END);
420 518 return (B_FALSE);
421 519 }
422 520
423 521 sqp = svp_conn_query_find(scp, resp->svp_id);
424 522 if (sqp == NULL) {
425 523 (void) bunyan_warn(svp_bunyan, "unknown response id",
426 524 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
427 525 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
428 526 BUNYAN_T_INT32, "version", nvers,
429 527 BUNYAN_T_INT32, "operation", nop,
430 528 BUNYAN_T_INT32, "response_id", resp->svp_id,
431 529 BUNYAN_T_END);
432 530 return (B_FALSE);
433 531 }
434 532
435 533 if (sqp->sq_state != SVP_QUERY_READING) {
436 534 (void) bunyan_warn(svp_bunyan,
437 535 		    "got response for unexpected query",
438 536 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
439 537 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
440 538 BUNYAN_T_INT32, "version", nvers,
441 539 BUNYAN_T_INT32, "operation", nop,
442 540 BUNYAN_T_INT32, "response_id", resp->svp_id,
443 541 BUNYAN_T_INT32, "query_state", sqp->sq_state,
444 542 BUNYAN_T_END);
445 543 return (B_FALSE);
446 544 }
447 545
448 546 if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
449 547 (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
450 548 (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
451 549 (void) bunyan_warn(svp_bunyan, "response size too large",
452 550 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
453 551 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
454 552 BUNYAN_T_INT32, "version", nvers,
455 553 BUNYAN_T_INT32, "operation", nop,
456 554 BUNYAN_T_INT32, "response_id", resp->svp_id,
457 555 BUNYAN_T_INT32, "response_size", nsize,
458 556 BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
459 557 sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
460 558 BUNYAN_T_INT32, "query_state", sqp->sq_state,
461 559 BUNYAN_T_END);
462 560 return (B_FALSE);
463 561 }
464 562
465 563 /*
466 564 	 * The valid size is anything <= what the user requested, but at
467 565 * least svp_log_ack_t bytes large.
468 566 */
469 567 if (nop == SVP_R_LOG_ACK) {
470 568 const char *msg = NULL;
471 569 if (nsize < sizeof (svp_log_ack_t))
472 570 msg = "response size too small";
473 571 else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
474 572 msg = "response size too large";
475 573 if (msg != NULL) {
476 574 (void) bunyan_warn(svp_bunyan, msg,
477 575 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
478 576 BUNYAN_T_INT32, "remote_port",
479 577 scp->sc_remote->sr_rport,
480 578 BUNYAN_T_INT32, "version", nvers,
481 579 BUNYAN_T_INT32, "operation", nop,
482 580 BUNYAN_T_INT32, "response_id", resp->svp_id,
483 581 BUNYAN_T_INT32, "response_size", nsize,
484 582 BUNYAN_T_INT32, "expected_size",
485 583 ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
486 584 BUNYAN_T_INT32, "query_state", sqp->sq_state,
487 585 BUNYAN_T_END);
488 586 return (B_FALSE);
489 587 }
490 588 }
491 589
492 590 sqp->sq_size = nsize;
493 591 scp->sc_input.sci_query = sqp;
494 592 if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
495 593 nop == SVP_R_LOG_RM_ACK) {
496 594 sqp->sq_wdata = &sqp->sq_wdun;
497 595 sqp->sq_wsize = sizeof (svp_query_data_t);
498 596 } else {
499 597 VERIFY(nop == SVP_R_LOG_ACK);
500 598 assert(sqp->sq_wdata != NULL);
501 599 assert(sqp->sq_wsize != 0);
502 600 }
503 601
504 602 return (B_TRUE);
505 603 }
506 604
507 605 static svp_conn_act_t
508 606 svp_conn_pollin(svp_conn_t *scp)
509 607 {
510 608 size_t off, total;
511 609 ssize_t ret;
512 610 svp_query_t *sqp;
513 611 uint32_t crc;
514 612 uint16_t nop;
515 613
516 614 assert(MUTEX_HELD(&scp->sc_lock));
517 615
518 616 /*
519 617 * No query implies that we're reading in the header and that the offset
520 618 * is associted with it.
521 619 */
522 620 off = scp->sc_input.sci_offset;
523 621 sqp = scp->sc_input.sci_query;
524 622 if (scp->sc_input.sci_query == NULL) {
525 623 svp_req_t *resp = &scp->sc_input.sci_req;
526 624
527 625 assert(off < sizeof (svp_req_t));
528 626
529 627 do {
530 628 ret = read(scp->sc_socket,
531 629 (void *)((uintptr_t)resp + off),
532 630 sizeof (svp_req_t) - off);
533 631 } while (ret == -1 && errno == EINTR);
534 632 if (ret == -1) {
535 633 switch (errno) {
536 634 case EAGAIN:
537 635 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
538 636 return (SVP_RA_NONE);
539 637 case EIO:
540 638 case ECONNRESET:
541 639 return (SVP_RA_ERROR);
542 640 break;
543 641 default:
544 642 				libvarpd_panic("unexpected read errno: %d",
545 643 errno);
546 644 }
547 645 } else if (ret == 0) {
548 646 /* Try to reconnect to the remote host */
549 647 return (SVP_RA_ERROR);
550 648 }
551 649
552 650 /* Didn't get all the data we need */
553 651 if (off + ret < sizeof (svp_req_t)) {
554 652 scp->sc_input.sci_offset += ret;
555 653 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
556 654 return (SVP_RA_NONE);
557 655 }
558 656
559 657 if (svp_conn_pollin_validate(scp) != B_TRUE)
560 658 return (SVP_RA_ERROR);
561 659 }
562 660
563 661 sqp = scp->sc_input.sci_query;
564 662 assert(sqp != NULL);
565 663 sqp->sq_acttime = gethrtime();
566 664 total = ntohl(scp->sc_input.sci_req.svp_size);
567 665 do {
568 666 ret = read(scp->sc_socket,
569 667 (void *)((uintptr_t)sqp->sq_wdata + off),
570 668 total - off);
571 669 } while (ret == -1 && errno == EINTR);
572 670
573 671 if (ret == -1) {
574 672 switch (errno) {
575 673 case EAGAIN:
576 674 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
577 675 return (SVP_RA_NONE);
578 676 case EIO:
579 677 case ECONNRESET:
580 678 return (SVP_RA_ERROR);
581 679 break;
582 680 default:
583 681 			libvarpd_panic("unexpected read errno: %d", errno);
584 682 }
585 683 } else if (ret == 0) {
586 684 /* Try to reconnect to the remote host */
587 685 return (SVP_RA_ERROR);
588 686 }
589 687
590 688 if (ret + off < total) {
591 689 scp->sc_input.sci_offset += ret;
592 690 return (SVP_RA_NONE);
593 691 }
594 692
595 693 nop = ntohs(scp->sc_input.sci_req.svp_op);
596 694 crc = scp->sc_input.sci_req.svp_crc32;
597 695 svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
598 696 if (crc != scp->sc_input.sci_req.svp_crc32) {
599 697 (void) bunyan_info(svp_bunyan, "crc32 mismatch",
600 698 BUNYAN_T_IP, "remote ip", &scp->sc_addr,
601 699 BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
602 700 BUNYAN_T_INT32, "version",
603 701 ntohs(scp->sc_input.sci_req.svp_ver),
604 702 BUNYAN_T_INT32, "operation", nop,
605 703 BUNYAN_T_INT32, "response id",
606 704 ntohl(scp->sc_input.sci_req.svp_id),
607 705 BUNYAN_T_INT32, "query state", sqp->sq_state,
608 706 BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
609 707 BUNYAN_T_UINT32, "calc_crc",
610 708 ntohl(scp->sc_input.sci_req.svp_crc32),
611 709 BUNYAN_T_END);
612 710 return (SVP_RA_ERROR);
613 711 }
614 712 scp->sc_input.sci_query = NULL;
615 713 scp->sc_input.sci_offset = 0;
616 714
617 715 if (nop == SVP_R_VL2_ACK) {
618 716 svp_vl2_ack_t *sl2a = sqp->sq_wdata;
619 717 sqp->sq_status = ntohl(sl2a->sl2a_status);
620 718 } else if (nop == SVP_R_VL3_ACK) {
621 719 svp_vl3_ack_t *sl3a = sqp->sq_wdata;
622 720 sqp->sq_status = ntohl(sl3a->sl3a_status);
623 721 } else if (nop == SVP_R_LOG_ACK) {
624 722 svp_log_ack_t *svla = sqp->sq_wdata;
625 723 sqp->sq_status = ntohl(svla->svla_status);
626 724 } else if (nop == SVP_R_LOG_RM_ACK) {
627 725 svp_lrm_ack_t *svra = sqp->sq_wdata;
628 726 sqp->sq_status = ntohl(svra->svra_status);
629 727 } else {
630 728 libvarpd_panic("unhandled nop: %d", nop);
631 729 }
632 730
633 731 list_remove(&scp->sc_queries, sqp);
634 732 mutex_exit(&scp->sc_lock);
635 733
636 734 /*
637 735 * We have to release all of our resources associated with this entry
638 736 * before we call the callback. After we call it, the memory will be
639 737 * lost to time.
640 738 */
641 739 svp_query_release(sqp);
642 740 sqp->sq_func(sqp, sqp->sq_arg);
643 741 mutex_enter(&scp->sc_lock);
644 742 scp->sc_event.se_events |= POLLIN | POLLRDNORM;
645 743
646 744 return (SVP_RA_NONE);
647 745 }
648 746
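The CRC check above saves the received svp_crc32, lets svp_query_crc32() recompute over the header and payload, and compares. A hedged, self-contained sketch of that save/recompute/compare pattern, assuming svp_query_crc32() zeroes the header's CRC field before computing (zlib's crc32() stands in for it here):

	#include <stdint.h>
	#include <stddef.h>
	#include <zlib.h>	/* crc32(); link with -lz */

	/* Simplified stand-in for svp_req_t; only the CRC field matters. */
	typedef struct wire_hdr {
		uint32_t wh_crc32;
		uint32_t wh_size;
	} wire_hdr_t;

	/*
	 * Stash the wire CRC, zero the field, recompute over the header
	 * plus payload, and compare. Returns non-zero when they match.
	 */
	static int
	crc_ok(wire_hdr_t *h, const void *payload, size_t len)
	{
		uint32_t wire = h->wh_crc32;	/* as received from the peer */
		uint32_t calc;

		h->wh_crc32 = 0;
		calc = crc32(0L, (const Bytef *)h, sizeof (*h));
		calc = crc32(calc, (const Bytef *)payload, (uInt)len);
		h->wh_crc32 = wire;		/* restore the wire value */
		return (calc == wire);
	}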
649 747 static svp_conn_act_t
650 748 svp_conn_reset(svp_conn_t *scp)
651 749 {
652 750 svp_remote_t *srp = scp->sc_remote;
653 751
654 752 assert(MUTEX_HELD(&srp->sr_lock));
655 753 assert(MUTEX_HELD(&scp->sc_lock));
656 754
657 755 assert(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
658 756 ENOENT);
659 757 if (close(scp->sc_socket) != 0)
660 758 libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
661 759 errno);
662 760 scp->sc_flags &= ~SVP_CF_TEARDOWN;
663 761 scp->sc_socket = -1;
664 762 scp->sc_cstate = SVP_CS_INITIAL;
665 763 scp->sc_input.sci_query = NULL;
666 764 scp->sc_output.sco_query = NULL;
667 765
668 766 svp_remote_reassign(srp, scp);
669 767
670 768 return (svp_conn_connect(scp));
671 769 }
672 770
673 771 /*
674 772 * This is our general state transition function. We're called here when we want
675 773 * to advance part of our state machine as well as to re-arm ourselves. We can
676 774 * also end up here from the standard event loop as a result of having a user
677 775 * event posted.
678 776 */
679 777 static void
680 778 svp_conn_handler(port_event_t *pe, void *arg)
681 779 {
682 780 svp_conn_t *scp = arg;
683 781 svp_remote_t *srp = scp->sc_remote;
684 782 svp_conn_act_t ret = SVP_RA_NONE;
685 783 svp_conn_state_t oldstate;
686 784
687 785 mutex_enter(&scp->sc_lock);
688 786
689 787 /*
690 788 	 * Check if one of our event interrupts is set. An event interrupt,
691 789 	 * such as having to be reaped or torn down, is signalled by a
692 790 	 * PORT_SOURCE_USER event that tries to take care of this. However,
693 791 	 * because the event loop can be ongoing despite this, we may get
694 792 	 * here before the PORT_SOURCE_USER event has caused us to get
695 793 	 * here. In such a case, if the PORT_SOURCE_USER event is tagged, then
696 794 	 * we're going to opt to do nothing here and wait for it to come and
697 795 	 * tear us down. That will also indicate to us that we have nothing to
698 796 	 * worry about as far as general timing and the like goes.
699 797 */
700 798 if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
701 799 (scp->sc_flags & SVP_CF_USER) != 0 &&
702 800 pe != NULL &&
703 801 pe->portev_source != PORT_SOURCE_USER) {
704 802 mutex_exit(&scp->sc_lock);
705 803 return;
706 804 }
707 805
708 806 if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
709 807 scp->sc_flags &= ~SVP_CF_USER;
710 808 if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
711 809 mutex_exit(&scp->sc_lock);
712 810 return;
713 811 }
714 812 }
715 813
716 814 /* Check if this needs to be freed */
717 815 if (scp->sc_flags & SVP_CF_REAP) {
718 816 mutex_exit(&scp->sc_lock);
719 817 svp_conn_destroy(scp);
720 818 return;
721 819 }
722 820
723 821 /* Check if this needs to be reset */
724 822 if (scp->sc_flags & SVP_CF_TEARDOWN) {
725 823 /* Make sure any other users of this are disassociated */
726 824 ret = SVP_RA_ERROR;
727 825 goto out;
728 826 }
729 827
730 828 switch (scp->sc_cstate) {
731 829 case SVP_CS_INITIAL:
732 830 case SVP_CS_BACKOFF:
733 831 assert(pe == NULL);
734 832 ret = svp_conn_connect(scp);
735 833 break;
736 834 case SVP_CS_CONNECTING:
737 835 assert(pe != NULL);
738 836 ret = svp_conn_poll_connect(pe, scp);
739 837 break;
740 838 case SVP_CS_ACTIVE:
741 839 case SVP_CS_WINDDOWN:
742 840 assert(pe != NULL);
743 841 oldstate = scp->sc_cstate;
744 842 if (pe->portev_events & POLLOUT)
745 843 ret = svp_conn_pollout(scp);
746 844 if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
747 845 ret = svp_conn_pollin(scp);
748 846
749 847 if (oldstate == SVP_CS_WINDDOWN &&
750 848 (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
751 849 ret = SVP_RA_CLEANUP;
752 850 }
753 851
754 852 if (ret == SVP_RA_NONE) {
755 853 int err;
756 854 if ((err = svp_event_associate(&scp->sc_event,
757 855 scp->sc_socket)) != 0) {
758 856 scp->sc_error = SVP_CE_ASSOCIATE;
759 857 scp->sc_errno = err;
760 858 scp->sc_cstate = SVP_CS_ERROR;
761 859 ret = SVP_RA_DEGRADE;
762 860 }
763 861 }
764 862 break;
765 863 default:
766 864 libvarpd_panic("svp_conn_handler encountered unexpected "
767 865 "state: %d", scp->sc_cstate);
768 866 }
769 867 out:
770 868 mutex_exit(&scp->sc_lock);
771 869
772 870 if (ret == SVP_RA_NONE)
773 871 return;
774 872
775 873 mutex_enter(&srp->sr_lock);
776 874 mutex_enter(&scp->sc_lock);
777 875 if (ret == SVP_RA_ERROR)
778 876 ret = svp_conn_reset(scp);
779 877
780 878 if (ret == SVP_RA_DEGRADE)
781 879 svp_conn_degrade(scp);
782 880 if (ret == SVP_RA_RESTORE)
783 881 svp_conn_restore(scp);
784 882
785 883 if (ret == SVP_RA_CLEANUP) {
786 884 svp_conn_remove(scp);
787 885 scp->sc_flags |= SVP_CF_REAP;
788 886 svp_conn_inject(scp);
789 887 }
790 888 mutex_exit(&scp->sc_lock);
791 889 mutex_exit(&srp->sr_lock);
792 890 }
793 891
794 892 static void
795 893 svp_conn_backtimer(void *arg)
796 894 {
797 895 svp_conn_t *scp = arg;
798 896
799 897 svp_conn_handler(NULL, scp);
800 898 }
801 899
802 900 /*
803 901  * This fires every svp_conn_query_timeout seconds. Its purpose is to determine
804 902  * whether we have heard back on a request within svp_conn_query_timeout
805 903  * seconds. If any of the svp_query_t's that have been started (indicated by
806 904  * svp_query_t`sq_acttime != -1) are more than svp_conn_query_timeout seconds
807 905  * old, we tear this connection down and reassign its outstanding
808 906  * queries.
809 907 */
810 908 static void
811 909 svp_conn_querytimer(void *arg)
812 910 {
813 911 int ret;
814 912 svp_query_t *sqp;
815 913 svp_conn_t *scp = arg;
816 914 hrtime_t now = gethrtime();
817 915
818 916 mutex_enter(&scp->sc_lock);
819 917
820 918 /*
821 919 * If we're not in the active state, then we don't care about this as
822 920 * we're already either going to die or we have no connections to worry
823 921 * about.
824 922 */
825 923 if (scp->sc_cstate != SVP_CS_ACTIVE) {
826 924 mutex_exit(&scp->sc_lock);
827 925 return;
828 926 }
829 927
830 928 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
831 929 sqp = list_next(&scp->sc_queries, sqp)) {
832 930 if (sqp->sq_acttime == -1)
833 931 continue;
834 932 if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
835 933 break;
836 934 }
837 935
838 936 /* Nothing timed out, we're good here */
839 937 if (sqp == NULL) {
840 938 mutex_exit(&scp->sc_lock);
841 939 return;
842 940 }
843 941
844 942 (void) bunyan_warn(svp_bunyan, "query timed out on connection",
845 943 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
846 944 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
847 945 BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
848 946 BUNYAN_T_END);
849 947
850 948 /*
851 949 	 * Begin the tear down process for this connection. If we lose the
852 950 * disassociate, then we don't inject an event. See the big theory
853 951 * statement in libvarpd_svp.c for more information.
854 952 */
855 953 scp->sc_flags |= SVP_CF_TEARDOWN;
856 954
857 955 ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
858 956 if (ret == 0)
859 957 svp_conn_inject(scp);
860 958 else
861 959 VERIFY(ret == ENOENT);
862 960
863 961 mutex_exit(&scp->sc_lock);
864 962 }
865 963
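The staleness test above is a plain gethrtime(3C) delta converted from nanoseconds to seconds. A minimal sketch of just that predicate (illumos-specific headers; the -1 sentinel and the 30-second default mirror the diff):

	#include <sys/time.h>	/* hrtime_t, gethrtime(3C), NANOSEC */

	/*
	 * Has a query that went on the wire at acttime been outstanding
	 * for more than timeout_secs? Queries not yet written carry the
	 * -1 sentinel and are exempt.
	 */
	static int
	query_timed_out(hrtime_t acttime, hrtime_t now, int timeout_secs)
	{
		if (acttime == -1)
			return (0);	/* not yet on the wire */
		return ((now - acttime) / NANOSEC > timeout_secs);
	}

One subtlety worth noting: timing out a single query tears down the whole connection (SVP_CF_TEARDOWN) rather than failing just that query, and the outstanding queries are then reassigned to another connection.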
866 964 /*
867 965  * This connection has fallen out of DNS; figure out what we need to do with it.
868 966 */
869 967 void
870 968 svp_conn_fallout(svp_conn_t *scp)
871 969 {
872 970 svp_remote_t *srp = scp->sc_remote;
873 971
874 972 assert(MUTEX_HELD(&srp->sr_lock));
875 973
876 974 mutex_enter(&scp->sc_lock);
877 975 switch (scp->sc_cstate) {
878 976 case SVP_CS_ERROR:
879 977 /*
880 978 * Connection is already inactive, so it's safe to tear down.
881 979 * Fire it off through the state machine to tear down via the
882 980 * backoff timer.
883 981 */
884 982 svp_conn_remove(scp);
885 983 scp->sc_flags |= SVP_CF_REAP;
886 984 svp_conn_inject(scp);
887 985 break;
888 986 case SVP_CS_INITIAL:
889 987 case SVP_CS_BACKOFF:
890 988 case SVP_CS_CONNECTING:
891 989 /*
892 990 * Here, we have something actively going on, so we'll let it be
893 991 		 * cleaned up the next time we hit the event loop by the event
894 992 * loop itself. As it has no connections, there isn't much to
895 993 * really do, though we'll take this chance to go ahead and
896 994 * remove it from the remote.
897 995 */
898 996 svp_conn_remove(scp);
899 997 scp->sc_flags |= SVP_CF_REAP;
900 998 svp_conn_inject(scp);
901 999 break;
902 1000 case SVP_CS_ACTIVE:
903 1001 case SVP_CS_WINDDOWN:
904 1002 /*
905 1003 * If there are no outstanding queries, then we should simply
906 1004 		 * clean this up now, the same way we would with the others.
907 1005 		 * Otherwise, as we know the event loop is ongoing, we'll make
908 1006 * sure that these entries get cleaned up once they're done.
909 1007 */
910 1008 scp->sc_cstate = SVP_CS_WINDDOWN;
911 1009 if (list_is_empty(&scp->sc_queries)) {
912 1010 svp_conn_remove(scp);
913 1011 scp->sc_flags |= SVP_CF_REAP;
914 1012 svp_conn_inject(scp);
915 1013 }
916 1014 break;
917 1015 default:
918 1016 		libvarpd_panic("svp_conn_fallout encountered "
919 1017 		    "unknown state");
920 1018 }
921 1019 mutex_exit(&scp->sc_lock);
922 1020 mutex_exit(&srp->sr_lock);
923 1021 }
924 1022
925 1023 int
926 1024 svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
927 1025 {
928 1026 int ret;
929 1027 svp_conn_t *scp;
930 1028
931 1029 assert(MUTEX_HELD(&srp->sr_lock));
932 1030 scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
933 1031 if (scp == NULL)
934 1032 return (ENOMEM);
935 1033
936 1034 if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
937 1035 NULL)) != 0) {
938 1036 umem_free(scp, sizeof (svp_conn_t));
939 1037 return (ret);
940 1038 }
941 1039
942 1040 scp->sc_remote = srp;
943 1041 scp->sc_event.se_func = svp_conn_handler;
944 1042 scp->sc_event.se_arg = scp;
945 1043 scp->sc_btimer.st_func = svp_conn_backtimer;
946 1044 scp->sc_btimer.st_arg = scp;
947 1045 scp->sc_btimer.st_oneshot = B_TRUE;
948 1046 scp->sc_btimer.st_value = 1;
949 1047
950 1048 scp->sc_qtimer.st_func = svp_conn_querytimer;
951 1049 scp->sc_qtimer.st_arg = scp;
952 1050 scp->sc_qtimer.st_oneshot = B_FALSE;
953 1051 scp->sc_qtimer.st_value = svp_conn_query_timeout;
954 1052
955 1053 scp->sc_socket = -1;
956 1054
957 1055 list_create(&scp->sc_queries, sizeof (svp_query_t),
958 1056 offsetof(svp_query_t, sq_lnode));
959 1057 scp->sc_gen = srp->sr_gen;
960 1058 bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
961 1059 scp->sc_cstate = SVP_CS_INITIAL;
962 1060 mutex_enter(&scp->sc_lock);
963 1061 svp_conn_add(scp);
964 1062 mutex_exit(&scp->sc_lock);
965 1063
966 1064 /* Now that we're locked and loaded, add our timers */
967 1065 svp_timer_add(&scp->sc_qtimer);
968 1066 svp_timer_add(&scp->sc_btimer);
969 1067
970 1068 return (0);
971 1069 }
972 1070
973 1071 /*
974 1072 * At the time of calling, the entry has been removed from all lists. In
975 1073  * addition, the entry's state should be SVP_CS_ERROR; therefore, we know that
976 1074 * the fd should not be associated with the event loop. We'll double check that
977 1075 * just in case. We should also have already been removed from the remote's
978 1076 * list.
979 1077 */
980 1078 void
981 1079 svp_conn_destroy(svp_conn_t *scp)
982 1080 {
983 1081 int ret;
984 1082
985 1083 mutex_enter(&scp->sc_lock);
986 1084 if (scp->sc_cstate != SVP_CS_ERROR)
987 1085 libvarpd_panic("asked to tear down an active connection");
988 1086 if (scp->sc_flags & SVP_CF_ADDED)
989 1087 libvarpd_panic("asked to remove a connection still in "
990 1088 "the remote list\n");
991 1089 if (!list_is_empty(&scp->sc_queries))
992 1090 libvarpd_panic("asked to remove a connection with non-empty "
993 1091 "query list");
994 1092
995 1093 if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
996 1094 ENOENT) {
997 1095 libvarpd_panic("dissociate failed or was actually "
998 1096 "associated: %d", ret);
999 1097 }
1000 1098 mutex_exit(&scp->sc_lock);
1001 1099
1002 1100 /* Verify our timers are killed */
1003 1101 svp_timer_remove(&scp->sc_btimer);
1004 1102 svp_timer_remove(&scp->sc_qtimer);
1005 1103
1006 1104 if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
1007 1105 libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
1008 1106 "%d: %d", scp->sc_socket, errno);
1009 1107
1010 1108 list_destroy(&scp->sc_queries);
1011 1109 umem_free(scp, sizeof (svp_conn_t));
1012 1110 }
1013 1111
1014 1112 void
1015 1113 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1016 1114 {
1017 1115 assert(MUTEX_HELD(&scp->sc_lock));
1018 1116 assert(scp->sc_cstate == SVP_CS_ACTIVE);
1019 1117
1020 1118 sqp->sq_acttime = -1;
1021 1119 list_insert_tail(&scp->sc_queries, sqp);
1022 1120 if (!(scp->sc_event.se_events & POLLOUT)) {
1023 1121 scp->sc_event.se_events |= POLLOUT;
1024 1122 /*
1025 1123 * If this becomes frequent, we should instead give up on this
1026 1124 * set of connections instead of aborting.
1027 1125 */
1028 1126 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1029 1127 libvarpd_panic("svp_event_associate failed somehow");
1030 1128 }
1031 1129 }