Print this page
Try versioning as a new state
| Split |
Close |
| Expand all |
| Collapse all |
--- old/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c
+++ new/usr/src/lib/varpd/svp/common/libvarpd_svp_conn.c
1 1 /*
2 2 * This file and its contents are supplied under the terms of the
3 3 * Common Development and Distribution License ("CDDL"), version 1.0.
4 4 * You may only use this file in accordance with the terms of version
5 5 * 1.0 of the CDDL.
6 6 *
7 7 * A full copy of the text of the CDDL should have accompanied this
8 8 * source. A copy of the CDDL is also available via the Internet at
9 9 * http://www.illumos.org/license/CDDL.
10 10 */
11 11
12 12 /*
13 13 * Copyright 2015 Joyent, Inc.
14 14 */
15 15
16 16 /*
17 17 * Logic to manage an individual connection to a remote host.
18 18 *
19 19 * For more information, see the big theory statement in
20 20 * lib/varpd/svp/common/libvarpd_svp.c.
21 21 */
22 22
23 23 #include <assert.h>
24 24 #include <umem.h>
25 25 #include <errno.h>
26 26 #include <strings.h>
27 27 #include <unistd.h>
28 28 #include <stddef.h>
29 29 #include <sys/uio.h>
30 30 #include <sys/debug.h>
31 31
32 32 #include <libvarpd_svp.h>
|
↓ open down ↓ |
32 lines elided |
↑ open up ↑ |
33 33
34 34 int svp_conn_query_timeout = 30;
35 35 static int svp_conn_backoff_tbl[] = { 1, 2, 4, 8, 16, 32 };
36 36 static int svp_conn_nbackoff = sizeof (svp_conn_backoff_tbl) / sizeof (int);
37 37
38 38 typedef enum svp_conn_act {
39 39 SVP_RA_NONE = 0x00,
40 40 SVP_RA_DEGRADE = 0x01,
41 41 SVP_RA_RESTORE = 0x02,
42 42 SVP_RA_ERROR = 0x03,
43 - SVP_RA_CLEANUP = 0x04
43 + SVP_RA_CLEANUP = 0x04,
44 + SVP_RA_FIND_VERSION = 0x05
44 45 } svp_conn_act_t;
45 46
46 47 static svp_conn_act_t svp_conn_poll_connect(port_event_t *, svp_conn_t *);
47 48
48 49 static void
49 50 svp_conn_inject(svp_conn_t *scp)
50 51 {
51 52 int ret;
52 53 assert(MUTEX_HELD(&scp->sc_lock));
53 54
54 55 if (scp->sc_flags & SVP_CF_USER)
55 56 return;
56 57 scp->sc_flags |= SVP_CF_USER;
57 58 if ((ret = svp_event_inject(&scp->sc_event)) != 0)
58 59 libvarpd_panic("failed to inject event: %d\n", ret);
59 60 }
60 61
61 62 static void
62 63 svp_conn_degrade(svp_conn_t *scp)
63 64 {
64 65 svp_remote_t *srp = scp->sc_remote;
65 66
66 67 assert(MUTEX_HELD(&srp->sr_lock));
67 68 assert(MUTEX_HELD(&scp->sc_lock));
68 69
69 70 if (scp->sc_flags & SVP_CF_DEGRADED)
70 71 return;
71 72
72 73 scp->sc_flags |= SVP_CF_DEGRADED;
73 74 srp->sr_ndconns++;
74 75 if (srp->sr_ndconns == srp->sr_tconns)
75 76 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
76 77 }
77 78
78 79 static void
79 80 svp_conn_restore(svp_conn_t *scp)
80 81 {
81 82 svp_remote_t *srp = scp->sc_remote;
82 83
83 84 assert(MUTEX_HELD(&srp->sr_lock));
84 85 assert(MUTEX_HELD(&scp->sc_lock));
|
↓ open down ↓ |
31 lines elided |
↑ open up ↑ |
85 86
86 87 if (!(scp->sc_flags & SVP_CF_DEGRADED))
87 88 return;
88 89
89 90 scp->sc_flags &= ~SVP_CF_DEGRADED;
90 91 if (srp->sr_ndconns == srp->sr_tconns)
91 92 svp_remote_restore(srp, SVP_RD_REMOTE_FAIL);
92 93 srp->sr_ndconns--;
93 94 }
94 95
96 +static svp_conn_act_t
97 +svp_conn_pong_handler(svp_conn_t *scp, svp_query_t *sqp)
98 +{
99 + uint16_t remote_version = ntohs(scp->sc_input.sci_req.svp_ver);
100 +
101 + if (scp->sc_cstate == SVP_CS_VERSIONING) {
102 + /* Transition VERSIONING -> ACTIVE. */
103 + assert(scp->sc_version == 0);
104 + if (remote_version == 0 || remote_version > SVP_CURRENT_VERSION)
105 + return (SVP_RA_ERROR);
106 + scp->sc_version = remote_version;
107 + scp->sc_cstate = SVP_CS_ACTIVE;
108 + }
109 +
110 + return (SVP_RA_NONE);
111 +}
112 +
95 113 static void
114 +svp_conn_ping_cb(svp_query_t *sqp, void *arg)
115 +{
116 + size_t len = (size_t)arg;
117 +
118 + assert(len == sizeof (svp_query_t));
119 + umem_free(sqp, len);
120 +}
121 +
122 +static svp_conn_act_t
123 +svp_conn_ping_version(svp_conn_t *scp)
124 +{
125 + svp_remote_t *srp = scp->sc_remote;
126 + svp_query_t *sqp = umem_zalloc(sizeof (svp_query_t), UMEM_DEFAULT);
127 + int ret;
128 +
129 + assert(MUTEX_HELD(&srp->sr_lock));
130 + assert(MUTEX_HELD(&scp->sc_lock));
131 + assert(scp->sc_cstate == SVP_CS_CONNECTING);
132 +
133 + if (sqp == NULL)
134 + return (SVP_RA_ERROR);
135 +
136 + /* Only set things that need to be non-0/non-NULL. */
137 + sqp->sq_state = SVP_QUERY_INIT;
138 + sqp->sq_func = svp_conn_ping_cb;
139 + sqp->sq_arg = (void *)sizeof (svp_query_t);
140 + sqp->sq_header.svp_op = htons(SVP_R_PING);
141 + sqp->sq_header.svp_ver = htons(SVP_CURRENT_VERSION);
142 + sqp->sq_header.svp_id = svp_id_alloc();
143 + if (sqp->sq_header.svp_id == -1) {
144 + umem_free(sqp, sizeof (svp_query_t));
145 + return (SVP_RA_ERROR);
146 + }
147 +
148 + scp->sc_cstate = SVP_CS_VERSIONING;
149 + /* Set the event flags now... */
150 + scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP | POLLOUT;
151 + /* ...so I can just queue it up directly... */
152 + svp_conn_queue(scp, sqp);
153 + /* ... and then associate the event port myself. */
154 + ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
155 + if (ret == 0)
156 + return (SVP_RA_RESTORE);
157 + scp->sc_error = SVP_CE_ASSOCIATE;
158 + scp->sc_errno = ret;
159 + scp->sc_cstate = SVP_CS_ERROR;
160 + umem_free(sqp, sizeof (svp_query_t));
161 + return (SVP_RA_DEGRADE);
162 +}
163 +
164 +static void
96 165 svp_conn_add(svp_conn_t *scp)
97 166 {
98 167 svp_remote_t *srp = scp->sc_remote;
99 168
100 169 assert(MUTEX_HELD(&srp->sr_lock));
101 170 assert(MUTEX_HELD(&scp->sc_lock));
102 171
103 172 if (scp->sc_flags & SVP_CF_ADDED)
104 173 return;
105 174
106 175 list_insert_tail(&srp->sr_conns, scp);
107 176 scp->sc_flags |= SVP_CF_ADDED;
108 177 srp->sr_tconns++;
109 178 }
110 179
111 180 static void
112 181 svp_conn_remove(svp_conn_t *scp)
113 182 {
114 183 svp_remote_t *srp = scp->sc_remote;
115 184
116 185 assert(MUTEX_HELD(&srp->sr_lock));
117 186 assert(MUTEX_HELD(&scp->sc_lock));
118 187
119 188 if (!(scp->sc_flags & SVP_CF_ADDED))
120 189 return;
121 190
122 191 scp->sc_flags &= ~SVP_CF_ADDED;
123 192 if (scp->sc_flags & SVP_CF_DEGRADED)
124 193 srp->sr_ndconns--;
125 194 srp->sr_tconns--;
126 195 if (srp->sr_tconns == srp->sr_ndconns)
127 196 svp_remote_degrade(srp, SVP_RD_REMOTE_FAIL);
128 197 }
129 198
130 199 static svp_query_t *
131 200 svp_conn_query_find(svp_conn_t *scp, uint32_t id)
132 201 {
133 202 svp_query_t *sqp;
134 203
135 204 assert(MUTEX_HELD(&scp->sc_lock));
136 205
137 206 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
138 207 sqp = list_next(&scp->sc_queries, sqp)) {
139 208 if (sqp->sq_header.svp_id == id)
140 209 break;
141 210 }
142 211
143 212 return (sqp);
144 213 }
145 214
146 215 static svp_conn_act_t
147 216 svp_conn_backoff(svp_conn_t *scp)
148 217 {
149 218 assert(MUTEX_HELD(&scp->sc_lock));
150 219
151 220 if (close(scp->sc_socket) != 0)
152 221 libvarpd_panic("failed to close socket %d: %d\n",
153 222 scp->sc_socket, errno);
154 223 scp->sc_socket = -1;
155 224
156 225 scp->sc_cstate = SVP_CS_BACKOFF;
157 226 scp->sc_nbackoff++;
158 227 if (scp->sc_nbackoff >= svp_conn_nbackoff) {
159 228 scp->sc_btimer.st_value =
160 229 svp_conn_backoff_tbl[svp_conn_nbackoff - 1];
161 230 } else {
|
↓ open down ↓ |
56 lines elided |
↑ open up ↑ |
162 231 scp->sc_btimer.st_value =
163 232 svp_conn_backoff_tbl[scp->sc_nbackoff - 1];
164 233 }
165 234 svp_timer_add(&scp->sc_btimer);
166 235
167 236 if (scp->sc_nbackoff > svp_conn_nbackoff)
168 237 return (SVP_RA_DEGRADE);
169 238 return (SVP_RA_NONE);
170 239 }
171 240
172 -/*
173 - * Think of this as an extension to the connect() call in svp_conn_connect().
174 - * Send a message, receive it, and set the version here. If the response is
175 - * too slow or the socket throws an error, indicate a socket error, which
176 - * will cause the caller to backoff (i.e. close the socket and try again).
177 - *
178 - * Version mismatch (corrupt SVP server or too-advanced SVP server) is its
179 - * own error type.
180 - */
181 -static svp_conn_error_t
182 -svp_conn_version_set(svp_conn_t *scp)
183 -{
184 - svp_req_t ping;
185 - ssize_t ret;
186 - uint32_t save_crc;
187 - uint16_t peer_version;
188 - int ntries = 3; /* One second between tries. 3secs should be enough. */
189 -
190 - ping.svp_ver = htons(SVP_CURRENT_VERSION);
191 - ping.svp_op = htons(SVP_R_PING);
192 - ping.svp_size = 0; /* Header-only... */
193 - ping.svp_id = 0;
194 - /* 0-length data... just use the req buffer for the pointer. */
195 - svp_query_crc32(&ping, &ping, 0);
196 -
197 - ret = write(scp->sc_socket, &ping, sizeof (ping));
198 - if (ret == -1) {
199 - /*
200 - * A failed write() call right after connect probably
201 - * indicates a larger connection failure. Restart the
202 - * connection from scratch.
203 - */
204 - return (SVP_CE_SOCKET);
205 - }
206 - assert(ret == sizeof (ping));
207 - do {
208 - /*
209 - * Asynch read. We may loop here once or twice.
210 - * Wait a bit, but don't loop too many times...
211 - */
212 - (void) sleep(1);
213 - ret = read(scp->sc_socket, &ping, sizeof (ping));
214 - } while (--ntries > 0 &&
215 - ret == -1 && (errno == EINTR || errno == EAGAIN));
216 - if (ret == -1) {
217 - /*
218 - * This is actually a failed read() call. Restart the
219 - * connection from scratch.
220 - */
221 - return (SVP_CE_SOCKET);
222 - }
223 -
224 - save_crc = ping.svp_crc32;
225 - svp_query_crc32(&ping, &ping, 0);
226 - peer_version = htons(ping.svp_ver);
227 - if (ping.svp_op != htons(SVP_R_PONG) ||
228 - ping.svp_size != 0 || ping.svp_id != 0 ||
229 - ping.svp_crc32 != save_crc ||
230 - peer_version == 0 || peer_version > SVP_CURRENT_VERSION) {
231 - return (SVP_CE_VERSION_PONG);
232 - }
233 -
234 - /* This connection now has a version! */
235 - scp->sc_version = peer_version;
236 - return (SVP_CE_NONE);
237 -}
238 -
/*
 * Open a non-blocking IPv6 TCP socket to the remote and begin connecting.
 * Synchronous success falls through to svp_conn_poll_connect(); asynchronous
 * completion (EINPROGRESS/EINTR) arms the event port and waits for POLLOUT.
 * Reachability-style failures move us into backoff.  Called with sc_lock held
 * from SVP_CS_INITIAL or SVP_CS_BACKOFF.
 */
static svp_conn_act_t
svp_conn_connect(svp_conn_t *scp)
{
	int ret;
	struct sockaddr_in6 in6;

	assert(MUTEX_HELD(&scp->sc_lock));
	assert(scp->sc_cstate == SVP_CS_BACKOFF ||
	    scp->sc_cstate == SVP_CS_INITIAL);
	assert(scp->sc_socket == -1);
	if (scp->sc_cstate == SVP_CS_INITIAL)
		scp->sc_nbackoff = 0;

	/* New connect means we need to know the version. */
	scp->sc_version = 0;

	scp->sc_socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK, 0);
	if (scp->sc_socket == -1) {
		scp->sc_error = SVP_CE_SOCKET;
		scp->sc_errno = errno;
		scp->sc_cstate = SVP_CS_ERROR;
		return (SVP_RA_DEGRADE);
	}

	bzero(&in6, sizeof (struct sockaddr_in6));
	in6.sin6_family = AF_INET6;
	in6.sin6_port = htons(scp->sc_remote->sr_rport);
	bcopy(&scp->sc_addr, &in6.sin6_addr, sizeof (struct in6_addr));
	ret = connect(scp->sc_socket, (struct sockaddr *)&in6,
	    sizeof (struct sockaddr_in6));
	if (ret != 0) {
		boolean_t async = B_FALSE;

		/*
		 * Classify the errno: programmer errors panic; only
		 * EINPROGRESS/EINTR mean the connect is still in flight.
		 */
		switch (errno) {
		case EACCES:
		case EADDRINUSE:
		case EAFNOSUPPORT:
		case EALREADY:
		case EBADF:
		case EISCONN:
		case ELOOP:
		case ENOENT:
		case ENOSR:
		case EWOULDBLOCK:
			libvarpd_panic("unanticipated connect errno %d", errno);
			break;
		case EINPROGRESS:
		case EINTR:
			async = B_TRUE;
			/* FALLTHROUGH */
		default:
			break;
		}

		/*
		 * So, we will be connecting to this in the future, advance our
		 * state and make sure that we poll for the next round.
		 */
		if (async == B_TRUE) {
			scp->sc_cstate = SVP_CS_CONNECTING;
			scp->sc_event.se_events = POLLOUT | POLLHUP;
			ret = svp_event_associate(&scp->sc_event,
			    scp->sc_socket);
			if (ret == 0)
				return (SVP_RA_NONE);
			scp->sc_error = SVP_CE_ASSOCIATE;
			scp->sc_errno = ret;
			scp->sc_cstate = SVP_CS_ERROR;
			return (SVP_RA_DEGRADE);
		} else {
			/*
			 * This call failed, which means that we obtained one of
			 * the following:
			 *
			 * EADDRNOTAVAIL
			 * ECONNREFUSED
			 * EIO
			 * ENETUNREACH
			 * EHOSTUNREACH
			 * ENXIO
			 * ETIMEDOUT
			 *
			 * Therefore we need to set ourselves into backoff and
			 * wait for that to clear up.
			 */
			return (svp_conn_backoff(scp));
		}
	}

	/* Immediately successful connection, move to SVP_CS_VERSIONING. */
	return (svp_conn_poll_connect(NULL, scp));
}
329 332
330 333 /*
331 334 * This should be the first call we get after a successful synchronous
332 335 * connect, or a completed (failed or successful) asynchronous connect. A
333 336 * non-NULL port-event indicates asynchronous completion, a NULL port-event
334 337 * indicates a successful synchronous connect.
335 338 *
336 339 * If we have successfully connected, we should see a writeable event. In the
337 340 * asynchronous case, we may also see an error or a hang up. For either hang
338 341 * up or error, we transition to error mode. If there is also a readable event
339 342 * (i.e. incoming data), we ignore it at the moment and just let a
340 343 * reassociation pick it up so we can simplify the set of state transitions
341 344 * that we have.
342 345 */
343 346 static svp_conn_act_t
344 347 svp_conn_poll_connect(port_event_t *pe, svp_conn_t *scp)
345 348 {
346 349 int ret;
347 350 svp_conn_error_t version_error;
348 351
349 352 if (pe != NULL) {
350 353 int err;
351 354 socklen_t sl = sizeof (err);
352 355
353 356 /*
354 357 * These bits only matter if we're notified of an
355 358 * asynchronous connection completion.
356 359 */
357 360 if (!(pe->portev_events & POLLOUT)) {
358 361 scp->sc_errno = 0;
359 362 scp->sc_error = SVP_CE_NOPOLLOUT;
360 363 scp->sc_cstate = SVP_CS_ERROR;
361 364 return (SVP_RA_DEGRADE);
362 365 }
|
↓ open down ↓ |
26 lines elided |
↑ open up ↑ |
363 366
364 367 ret = getsockopt(scp->sc_socket, SOL_SOCKET, SO_ERROR, &err,
365 368 &sl);
366 369 if (ret != 0)
367 370 libvarpd_panic("unanticipated getsockopt error");
368 371 if (err != 0) {
369 372 return (svp_conn_backoff(scp));
370 373 }
371 374 }
372 375
373 - /* Use a single SVP_R_PING to determine the version. */
374 - version_error = svp_conn_version_set(scp);
375 - switch (version_error) {
376 - case SVP_CE_SOCKET:
377 - /* Use this to signal read/write errors... */
378 - return (svp_conn_backoff(scp));
379 - case SVP_CE_NONE:
380 - assert(scp->sc_version > 0 &&
381 - scp->sc_version <= SVP_CURRENT_VERSION);
382 - break;
383 - default:
384 - scp->sc_error = version_error;
385 - scp->sc_cstate = SVP_CS_ERROR;
386 - scp->sc_errno = EPROTONOSUPPORT; /* Protocol error... */
387 - return (SVP_RA_DEGRADE);
388 - }
389 -
390 - scp->sc_cstate = SVP_CS_ACTIVE;
391 - scp->sc_event.se_events = POLLIN | POLLRDNORM | POLLHUP;
392 - ret = svp_event_associate(&scp->sc_event, scp->sc_socket);
393 - if (ret == 0)
394 - return (SVP_RA_RESTORE);
395 - scp->sc_error = SVP_CE_ASSOCIATE;
396 - scp->sc_errno = ret;
397 - scp->sc_cstate = SVP_CS_ERROR;
398 - return (SVP_RA_DEGRADE);
376 + return (SVP_RA_FIND_VERSION);
399 377 }
400 378
/*
 * Write-side pump.  Pick the next SVP_QUERY_INIT query off sc_queries (if we
 * aren't mid-message), CRC it, and writev() header plus request payload from
 * the current offset.  Partial writes update sco_offset; EAGAIN re-arms
 * POLLOUT; hard socket errors trigger a connection reset.
 */
static svp_conn_act_t
svp_conn_pollout(svp_conn_t *scp)
{
	svp_query_t *sqp;
	svp_req_t *req;
	size_t off;
	struct iovec iov[2];
	int nvecs = 0;
	ssize_t ret;

	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * We need to find a query and start writing it out.
	 */
	if (scp->sc_output.sco_query == NULL) {
		for (sqp = list_head(&scp->sc_queries); sqp != NULL;
		    sqp = list_next(&scp->sc_queries, sqp)) {
			if (sqp->sq_state != SVP_QUERY_INIT)
				continue;
			break;
		}

		/* Nothing waiting to be written; stop polling for POLLOUT. */
		if (sqp == NULL) {
			scp->sc_event.se_events &= ~POLLOUT;
			return (SVP_RA_NONE);
		}

		scp->sc_output.sco_query = sqp;
		scp->sc_output.sco_offset = 0;
		sqp->sq_state = SVP_QUERY_WRITING;
		svp_query_crc32(&sqp->sq_header, sqp->sq_rdata, sqp->sq_rsize);
	}

	sqp = scp->sc_output.sco_query;
	req = &sqp->sq_header;
	off = scp->sc_output.sco_offset;
	/* Build up to two iovecs: remaining header bytes, then payload. */
	if (off < sizeof (svp_req_t)) {
		iov[nvecs].iov_base = (void *)((uintptr_t)req + off);
		iov[nvecs].iov_len = sizeof (svp_req_t) - off;
		nvecs++;
		off = 0;
	} else {
		off -= sizeof (svp_req_t);
	}

	iov[nvecs].iov_base = (void *)((uintptr_t)sqp->sq_rdata + off);
	iov[nvecs].iov_len = sqp->sq_rsize - off;
	nvecs++;

	do {
		ret = writev(scp->sc_socket, iov, nvecs);
	} while (ret == -1 && errno == EINTR);
	if (ret == -1) {
		switch (errno) {
		case EAGAIN:
			scp->sc_event.se_events |= POLLOUT;
			return (SVP_RA_NONE);
		case EIO:
		case ENXIO:
		case ECONNRESET:
			return (SVP_RA_ERROR);
		default:
			libvarpd_panic("unexpected errno: %d", errno);
		}
	}

	sqp->sq_acttime = gethrtime();
	scp->sc_output.sco_offset += ret;
	/*
	 * NOTE(review): this compares this call's byte count (ret), not the
	 * accumulated sco_offset.  A message that completes across two
	 * writev() calls would seemingly never satisfy this test — confirm
	 * intended behavior against upstream before relying on it.
	 */
	if (ret >= sizeof (svp_req_t) + sqp->sq_rsize) {
		sqp->sq_state = SVP_QUERY_READING;
		scp->sc_output.sco_query = NULL;
		scp->sc_output.sco_offset = 0;
		scp->sc_event.se_events |= POLLOUT;
	}
	return (SVP_RA_NONE);
}
478 456
479 457 static boolean_t
480 458 svp_conn_pollin_validate(svp_conn_t *scp)
481 459 {
482 460 svp_query_t *sqp;
483 - uint32_t nsize;
461 + uint32_t nsize, expected_size = 0;
484 462 uint16_t nvers, nop;
485 463 svp_req_t *resp = &scp->sc_input.sci_req;
486 464
487 465 assert(MUTEX_HELD(&scp->sc_lock));
488 466
489 467 nvers = ntohs(resp->svp_ver);
490 468 nop = ntohs(resp->svp_op);
491 469 nsize = ntohl(resp->svp_size);
492 470
493 471 /*
494 472 * A peer that's messing with post-connection version changes is
495 473 * likely a broken peer.
496 474 */
497 - if (nvers != scp->sc_version) {
475 + if (scp->sc_cstate != SVP_CS_VERSIONING && nvers != scp->sc_version) {
498 476 (void) bunyan_warn(svp_bunyan, "version mismatch",
499 477 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
500 478 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
501 479 BUNYAN_T_INT32, "peer version", nvers,
502 480 BUNYAN_T_INT32, "our version", scp->sc_version,
503 481 BUNYAN_T_INT32, "operation", nop,
504 482 BUNYAN_T_INT32, "response_id", resp->svp_id,
505 483 BUNYAN_T_END);
506 484 return (B_FALSE);
507 485 }
508 486
509 - if (nop != SVP_R_VL2_ACK && nop != SVP_R_VL3_ACK &&
510 - nop != SVP_R_LOG_ACK && nop != SVP_R_LOG_RM_ACK) {
487 + switch (nop) {
488 + case SVP_R_VL2_ACK:
489 + expected_size = sizeof (svp_vl2_ack_t);
490 + break;
491 + case SVP_R_VL3_ACK:
492 + expected_size = sizeof (svp_vl3_ack_t);
493 + break;
494 + case SVP_R_LOG_RM_ACK:
495 + expected_size = sizeof (svp_lrm_ack_t);
496 + break;
497 + case SVP_R_ROUTE_ACK:
498 + expected_size = sizeof (svp_route_ack_t);
499 + break;
500 + case SVP_R_LOG_ACK:
501 + case SVP_R_PONG:
502 + /* No expected size (LOG_ACK) or size is 0 (PONG). */
503 + break;
504 + default:
511 505 (void) bunyan_warn(svp_bunyan, "unsupported operation",
512 506 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
513 507 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
514 508 BUNYAN_T_INT32, "version", nvers,
515 509 BUNYAN_T_INT32, "operation", nop,
516 510 BUNYAN_T_INT32, "response_id", resp->svp_id,
517 511 BUNYAN_T_END);
518 512 return (B_FALSE);
519 513 }
520 514
521 515 sqp = svp_conn_query_find(scp, resp->svp_id);
522 516 if (sqp == NULL) {
523 517 (void) bunyan_warn(svp_bunyan, "unknown response id",
524 518 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
525 519 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
526 520 BUNYAN_T_INT32, "version", nvers,
527 521 BUNYAN_T_INT32, "operation", nop,
528 522 BUNYAN_T_INT32, "response_id", resp->svp_id,
529 523 BUNYAN_T_END);
530 524 return (B_FALSE);
531 525 }
532 526
533 527 if (sqp->sq_state != SVP_QUERY_READING) {
534 528 (void) bunyan_warn(svp_bunyan,
535 529 "got response for unexpecting query",
|
↓ open down ↓ |
15 lines elided |
↑ open up ↑ |
536 530 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
537 531 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
538 532 BUNYAN_T_INT32, "version", nvers,
539 533 BUNYAN_T_INT32, "operation", nop,
540 534 BUNYAN_T_INT32, "response_id", resp->svp_id,
541 535 BUNYAN_T_INT32, "query_state", sqp->sq_state,
542 536 BUNYAN_T_END);
543 537 return (B_FALSE);
544 538 }
545 539
546 - if ((nop == SVP_R_VL2_ACK && nsize != sizeof (svp_vl2_ack_t)) ||
547 - (nop == SVP_R_VL3_ACK && nsize != sizeof (svp_vl3_ack_t)) ||
548 - (nop == SVP_R_LOG_RM_ACK && nsize != sizeof (svp_lrm_ack_t))) {
540 + if (nop != SVP_R_LOG_RM_ACK && nsize != expected_size) {
549 541 (void) bunyan_warn(svp_bunyan, "response size too large",
550 542 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
551 543 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
552 544 BUNYAN_T_INT32, "version", nvers,
553 545 BUNYAN_T_INT32, "operation", nop,
554 546 BUNYAN_T_INT32, "response_id", resp->svp_id,
555 547 BUNYAN_T_INT32, "response_size", nsize,
556 548 BUNYAN_T_INT32, "expected_size", nop == SVP_R_VL2_ACK ?
557 549 sizeof (svp_vl2_ack_t) : sizeof (svp_vl3_ack_t),
558 550 BUNYAN_T_INT32, "query_state", sqp->sq_state,
559 551 BUNYAN_T_END);
560 552 return (B_FALSE);
561 553 }
562 554
563 555 /*
564 556 * The valid size is anything <= to what the user requested, but at
565 557 * least svp_log_ack_t bytes large.
566 558 */
567 559 if (nop == SVP_R_LOG_ACK) {
568 560 const char *msg = NULL;
569 561 if (nsize < sizeof (svp_log_ack_t))
570 562 msg = "response size too small";
571 563 else if (nsize > ((svp_log_req_t *)sqp->sq_rdata)->svlr_count)
572 564 msg = "response size too large";
573 565 if (msg != NULL) {
574 566 (void) bunyan_warn(svp_bunyan, msg,
575 567 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
576 568 BUNYAN_T_INT32, "remote_port",
577 569 scp->sc_remote->sr_rport,
578 570 BUNYAN_T_INT32, "version", nvers,
579 571 BUNYAN_T_INT32, "operation", nop,
580 572 BUNYAN_T_INT32, "response_id", resp->svp_id,
581 573 BUNYAN_T_INT32, "response_size", nsize,
582 574 BUNYAN_T_INT32, "expected_size",
|
↓ open down ↓ |
24 lines elided |
↑ open up ↑ |
583 575 ((svp_log_req_t *)sqp->sq_rdata)->svlr_count,
584 576 BUNYAN_T_INT32, "query_state", sqp->sq_state,
585 577 BUNYAN_T_END);
586 578 return (B_FALSE);
587 579 }
588 580 }
589 581
590 582 sqp->sq_size = nsize;
591 583 scp->sc_input.sci_query = sqp;
592 584 if (nop == SVP_R_VL2_ACK || nop == SVP_R_VL3_ACK ||
593 - nop == SVP_R_LOG_RM_ACK) {
585 + nop == SVP_R_LOG_RM_ACK || nop == SVP_R_ROUTE_ACK ||
586 + nop == SVP_R_PONG) {
594 587 sqp->sq_wdata = &sqp->sq_wdun;
595 588 sqp->sq_wsize = sizeof (svp_query_data_t);
596 589 } else {
597 590 VERIFY(nop == SVP_R_LOG_ACK);
598 591 assert(sqp->sq_wdata != NULL);
599 592 assert(sqp->sq_wsize != 0);
600 593 }
601 594
602 595 return (B_TRUE);
603 596 }
604 597
/*
 * Read-side pump.  First, read (possibly incrementally) the response header
 * and validate it; then read the payload, verify the CRC, decode the status
 * for the given opcode, and complete the query by invoking its callback with
 * sc_lock dropped.
 */
static svp_conn_act_t
svp_conn_pollin(svp_conn_t *scp)
{
	size_t off, total;
	ssize_t ret;
	svp_query_t *sqp;
	uint32_t crc;
	uint16_t nop;

	assert(MUTEX_HELD(&scp->sc_lock));

	/*
	 * No query implies that we're reading in the header and that the
	 * offset is associated with it.
	 */
	off = scp->sc_input.sci_offset;
	sqp = scp->sc_input.sci_query;
	if (scp->sc_input.sci_query == NULL) {
		svp_req_t *resp = &scp->sc_input.sci_req;

		assert(off < sizeof (svp_req_t));

		do {
			ret = read(scp->sc_socket,
			    (void *)((uintptr_t)resp + off),
			    sizeof (svp_req_t) - off);
		} while (ret == -1 && errno == EINTR);
		if (ret == -1) {
			switch (errno) {
			case EAGAIN:
				scp->sc_event.se_events |= POLLIN | POLLRDNORM;
				return (SVP_RA_NONE);
			case EIO:
			case ECONNRESET:
				return (SVP_RA_ERROR);
				break;
			default:
				libvarpd_panic("unexpeted read errno: %d",
				    errno);
			}
		} else if (ret == 0) {
			/* Try to reconnect to the remote host */
			return (SVP_RA_ERROR);
		}

		/* Didn't get all the data we need */
		if (off + ret < sizeof (svp_req_t)) {
			scp->sc_input.sci_offset += ret;
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		}

		if (svp_conn_pollin_validate(scp) != B_TRUE)
			return (SVP_RA_ERROR);
	}

	/*
	 * NOTE(review): when the header completes within this call, `off`
	 * still holds the header-phase offset; if a header ever arrives split
	 * across reads, the payload read below would start at a non-zero
	 * offset into sq_wdata — confirm against upstream.
	 */
	sqp = scp->sc_input.sci_query;
	assert(sqp != NULL);
	sqp->sq_acttime = gethrtime();
	total = ntohl(scp->sc_input.sci_req.svp_size);
	do {
		ret = read(scp->sc_socket,
		    (void *)((uintptr_t)sqp->sq_wdata + off),
		    total - off);
	} while (ret == -1 && errno == EINTR);

	if (ret == -1) {
		switch (errno) {
		case EAGAIN:
			scp->sc_event.se_events |= POLLIN | POLLRDNORM;
			return (SVP_RA_NONE);
		case EIO:
		case ECONNRESET:
			return (SVP_RA_ERROR);
			break;
		default:
			libvarpd_panic("unexpeted read errno: %d", errno);
		}
	} else if (ret == 0 && total - off > 0) {
		/*
		 * EOF with payload outstanding; a zero-length payload (PONG)
		 * legitimately reads 0 bytes, so only reset when data remains.
		 */
		/* Try to reconnect to the remote host */
		return (SVP_RA_ERROR);
	}

	if (ret + off < total) {
		scp->sc_input.sci_offset += ret;
		return (SVP_RA_NONE);
	}

	/* Full payload in hand: verify the CRC before trusting anything. */
	nop = ntohs(scp->sc_input.sci_req.svp_op);
	crc = scp->sc_input.sci_req.svp_crc32;
	svp_query_crc32(&scp->sc_input.sci_req, sqp->sq_wdata, total);
	if (crc != scp->sc_input.sci_req.svp_crc32) {
		(void) bunyan_info(svp_bunyan, "crc32 mismatch",
		    BUNYAN_T_IP, "remote ip", &scp->sc_addr,
		    BUNYAN_T_INT32, "remote port", scp->sc_remote->sr_rport,
		    BUNYAN_T_INT32, "version",
		    ntohs(scp->sc_input.sci_req.svp_ver),
		    BUNYAN_T_INT32, "operation", nop,
		    BUNYAN_T_INT32, "response id",
		    ntohl(scp->sc_input.sci_req.svp_id),
		    BUNYAN_T_INT32, "query state", sqp->sq_state,
		    BUNYAN_T_UINT32, "msg_crc", ntohl(crc),
		    BUNYAN_T_UINT32, "calc_crc",
		    ntohl(scp->sc_input.sci_req.svp_crc32),
		    BUNYAN_T_END);
		return (SVP_RA_ERROR);
	}
	scp->sc_input.sci_query = NULL;
	scp->sc_input.sci_offset = 0;

	/* Decode the per-opcode status word out of the payload. */
	if (nop == SVP_R_VL2_ACK) {
		svp_vl2_ack_t *sl2a = sqp->sq_wdata;
		sqp->sq_status = ntohl(sl2a->sl2a_status);
	} else if (nop == SVP_R_VL3_ACK) {
		svp_vl3_ack_t *sl3a = sqp->sq_wdata;
		sqp->sq_status = ntohl(sl3a->sl3a_status);
	} else if (nop == SVP_R_LOG_ACK) {
		svp_log_ack_t *svla = sqp->sq_wdata;
		sqp->sq_status = ntohl(svla->svla_status);
	} else if (nop == SVP_R_LOG_RM_ACK) {
		svp_lrm_ack_t *svra = sqp->sq_wdata;
		sqp->sq_status = ntohl(svra->svra_status);
	} else if (nop == SVP_R_ROUTE_ACK) {
		svp_route_ack_t *sra = sqp->sq_wdata;
		sqp->sq_status = ntohl(sra->sra_status);
	} else if (nop == SVP_R_PONG) {
		/*
		 * Handle the PONG versioning-capture here, as we need
		 * the version number, the scp_lock held, and the ability
		 * to error out.  NOTE(review): on the error return the ping
		 * query is left on sc_queries and is reclaimed only through
		 * the ensuing connection reset — confirm that path frees it.
		 */
		svp_conn_act_t cbret;

		cbret = svp_conn_pong_handler(scp, sqp);
		if (cbret != SVP_RA_NONE)
			return (cbret);
	} else {
		libvarpd_panic("unhandled nop: %d", nop);
	}

	list_remove(&scp->sc_queries, sqp);
	mutex_exit(&scp->sc_lock);

	/*
	 * We have to release all of our resources associated with this entry
	 * before we call the callback. After we call it, the memory will be
	 * lost to time.
	 */
	svp_query_release(sqp);
	sqp->sq_func(sqp, sqp->sq_arg);
	mutex_enter(&scp->sc_lock);
	scp->sc_event.se_events |= POLLIN | POLLRDNORM;

	return (SVP_RA_NONE);
}
746 753
747 754 static svp_conn_act_t
748 755 svp_conn_reset(svp_conn_t *scp)
749 756 {
750 757 svp_remote_t *srp = scp->sc_remote;
751 758
752 759 assert(MUTEX_HELD(&srp->sr_lock));
753 760 assert(MUTEX_HELD(&scp->sc_lock));
754 761
755 762 assert(svp_event_dissociate(&scp->sc_event, scp->sc_socket) ==
756 763 ENOENT);
757 764 if (close(scp->sc_socket) != 0)
758 765 libvarpd_panic("failed to close socket %d: %d", scp->sc_socket,
759 766 errno);
760 767 scp->sc_flags &= ~SVP_CF_TEARDOWN;
761 768 scp->sc_socket = -1;
762 769 scp->sc_cstate = SVP_CS_INITIAL;
763 770 scp->sc_input.sci_query = NULL;
764 771 scp->sc_output.sco_query = NULL;
765 772
766 773 svp_remote_reassign(srp, scp);
767 774
768 775 return (svp_conn_connect(scp));
769 776 }
770 777
/*
 * This is our general state transition function. We're called here when we want
 * to advance part of our state machine as well as to re-arm ourselves. We can
 * also end up here from the standard event loop as a result of having a user
 * event posted.
 */
static void
svp_conn_handler(port_event_t *pe, void *arg)
{
	svp_conn_t *scp = arg;
	svp_remote_t *srp = scp->sc_remote;
	svp_conn_act_t ret = SVP_RA_NONE;
	svp_conn_state_t oldstate;

	mutex_enter(&scp->sc_lock);

	/*
	 * Check if one of our event interrupts is set. An event interrupt,
	 * such as having to be reaped or torn down, is notified by a
	 * PORT_SOURCE_USER event that tries to take care of this. However,
	 * because of the fact that the event loop can be ongoing despite this,
	 * we may get here before the PORT_SOURCE_USER has caused us to get
	 * here. In such a case, if the PORT_SOURCE_USER event is tagged, then
	 * we're going to opt to do nothing here and wait for it to come and
	 * tear us down. That will also indicate to us that we have nothing to
	 * worry about as far as general timing and the like goes.
	 */
	if ((scp->sc_flags & SVP_CF_UFLAG) != 0 &&
	    (scp->sc_flags & SVP_CF_USER) != 0 &&
	    pe != NULL &&
	    pe->portev_source != PORT_SOURCE_USER) {
		mutex_exit(&scp->sc_lock);
		return;
	}

	if (pe != NULL && pe->portev_source == PORT_SOURCE_USER) {
		scp->sc_flags &= ~SVP_CF_USER;
		if ((scp->sc_flags & SVP_CF_UFLAG) == 0) {
			mutex_exit(&scp->sc_lock);
			return;
		}
	}

	/* Check if this needs to be freed */
	if (scp->sc_flags & SVP_CF_REAP) {
		mutex_exit(&scp->sc_lock);
		svp_conn_destroy(scp);
		return;
	}

	/* Check if this needs to be reset */
	if (scp->sc_flags & SVP_CF_TEARDOWN) {
		/* Make sure any other users of this are disassociated */
		ret = SVP_RA_ERROR;
		goto out;
	}

	switch (scp->sc_cstate) {
	case SVP_CS_INITIAL:
	case SVP_CS_BACKOFF:
		assert(pe == NULL);
		ret = svp_conn_connect(scp);
		break;
	case SVP_CS_CONNECTING:
		assert(pe != NULL);
		ret = svp_conn_poll_connect(pe, scp);
		break;
	case SVP_CS_VERSIONING:
	case SVP_CS_ACTIVE:
	case SVP_CS_WINDDOWN:
		assert(pe != NULL);
		oldstate = scp->sc_cstate;
		if (pe->portev_events & POLLOUT)
			ret = svp_conn_pollout(scp);
		if (ret == SVP_RA_NONE && (pe->portev_events & POLLIN))
			ret = svp_conn_pollin(scp);

		if (oldstate == SVP_CS_WINDDOWN &&
		    (list_is_empty(&scp->sc_queries) || ret != SVP_RA_NONE)) {
			ret = SVP_RA_CLEANUP;
		}

		/* Nothing else to do; re-arm the event port. */
		if (ret == SVP_RA_NONE) {
			int err;
			if ((err = svp_event_associate(&scp->sc_event,
			    scp->sc_socket)) != 0) {
				scp->sc_error = SVP_CE_ASSOCIATE;
				scp->sc_errno = err;
				scp->sc_cstate = SVP_CS_ERROR;
				ret = SVP_RA_DEGRADE;
			}
		}
		break;
	default:
		libvarpd_panic("svp_conn_handler encountered unexpected "
		    "state: %d", scp->sc_cstate);
	}
out:
	mutex_exit(&scp->sc_lock);

	if (ret == SVP_RA_NONE)
		return;

	/*
	 * The remaining actions need both locks, taken in sr_lock -> sc_lock
	 * order.  SVP_RA_FIND_VERSION may itself degrade into another action.
	 */
	mutex_enter(&srp->sr_lock);
	mutex_enter(&scp->sc_lock);
	if (ret == SVP_RA_FIND_VERSION)
		ret = svp_conn_ping_version(scp);

	if (ret == SVP_RA_ERROR)
		ret = svp_conn_reset(scp);

	if (ret == SVP_RA_DEGRADE)
		svp_conn_degrade(scp);
	if (ret == SVP_RA_RESTORE)
		svp_conn_restore(scp);

	if (ret == SVP_RA_CLEANUP) {
		svp_conn_remove(scp);
		scp->sc_flags |= SVP_CF_REAP;
		svp_conn_inject(scp);
	}
	mutex_exit(&scp->sc_lock);
	mutex_exit(&srp->sr_lock);
}
891 902
892 903 static void
893 904 svp_conn_backtimer(void *arg)
894 905 {
895 906 svp_conn_t *scp = arg;
896 907
897 908 svp_conn_handler(NULL, scp);
898 909 }
899 910
900 911 /*
901 912 * This fires every svp_conn_query_timeout seconds. Its purpose is to
902 913 * determine whether we have failed to hear back on a request within
903 914 * svp_conn_query_timeout seconds. If any of the svp_conn_query_t's that
904 915 * have been started (indicated by svp_query_t`sq_acttime != -1) have been
905 916 * outstanding for more than svp_conn_query_timeout seconds, we basically
906 917 * tear this connection down and reassign outstanding queries.
907 918 */
908 919 static void
909 920 svp_conn_querytimer(void *arg)
910 921 {
911 922 int ret;
912 923 svp_query_t *sqp;
913 924 svp_conn_t *scp = arg;
914 925 hrtime_t now = gethrtime();
915 926
916 927 mutex_enter(&scp->sc_lock);
917 928
918 929 /*
919 930 * If we're not in the active state, then we don't care about this as
920 931 * we're already either going to die or we have no connections to worry
921 932 * about.
922 933 */
923 934 if (scp->sc_cstate != SVP_CS_ACTIVE) {
924 935 mutex_exit(&scp->sc_lock);
925 936 return;
926 937 }
927 938
928 939 for (sqp = list_head(&scp->sc_queries); sqp != NULL;
929 940 sqp = list_next(&scp->sc_queries, sqp)) {
930 941 if (sqp->sq_acttime == -1)
931 942 continue;
932 943 if ((now - sqp->sq_acttime) / NANOSEC > svp_conn_query_timeout)
933 944 break;
934 945 }
935 946
936 947 /* Nothing timed out, we're good here */
937 948 if (sqp == NULL) {
938 949 mutex_exit(&scp->sc_lock);
939 950 return;
940 951 }
941 952
942 953 (void) bunyan_warn(svp_bunyan, "query timed out on connection",
943 954 BUNYAN_T_IP, "remote_ip", &scp->sc_addr,
944 955 BUNYAN_T_INT32, "remote_port", scp->sc_remote->sr_rport,
945 956 BUNYAN_T_INT32, "operation", ntohs(sqp->sq_header.svp_op),
946 957 BUNYAN_T_END);
947 958
948 959 /*
949 960 * Begin the tear down process for this connection. If we lose the
950 961 * disassociate, then we don't inject an event. See the big theory
951 962 * statement in libvarpd_svp.c for more information.
952 963 */
953 964 scp->sc_flags |= SVP_CF_TEARDOWN;
954 965
955 966 ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket);
956 967 if (ret == 0)
957 968 svp_conn_inject(scp);
958 969 else
959 970 VERIFY(ret == ENOENT);
960 971
961 972 mutex_exit(&scp->sc_lock);
962 973 }
963 974
964 975 /*
965 976 * This connection has fallen out of DNS, figure out what we need to do with it.
966 977 */
967 978 void
968 979 svp_conn_fallout(svp_conn_t *scp)
969 980 {
970 981 svp_remote_t *srp = scp->sc_remote;
971 982
972 983 assert(MUTEX_HELD(&srp->sr_lock));
973 984
974 985 mutex_enter(&scp->sc_lock);
975 986 switch (scp->sc_cstate) {
976 987 case SVP_CS_ERROR:
977 988 /*
978 989 * Connection is already inactive, so it's safe to tear down.
979 990 * Fire it off through the state machine to tear down via the
980 991 * backoff timer.
981 992 */
982 993 svp_conn_remove(scp);
983 994 scp->sc_flags |= SVP_CF_REAP;
984 995 svp_conn_inject(scp);
985 996 break;
986 997 case SVP_CS_INITIAL:
987 998 case SVP_CS_BACKOFF:
988 999 case SVP_CS_CONNECTING:
989 1000 /*
990 1001 * Here, we have something actively going on, so we'll let it be
991 1002 * clean up the next time we hit the event loop by the event
992 1003 * loop itself. As it has no connections, there isn't much to
993 1004 * really do, though we'll take this chance to go ahead and
994 1005 * remove it from the remote.
995 1006 */
996 1007 svp_conn_remove(scp);
997 1008 scp->sc_flags |= SVP_CF_REAP;
998 1009 svp_conn_inject(scp);
999 1010 break;
1000 1011 case SVP_CS_ACTIVE:
1001 1012 case SVP_CS_WINDDOWN:
1002 1013 /*
1003 1014 * If there are no outstanding queries, then we should simply
1004 1015 * clean this up now, the same way we would with the others.
1005 1016 * Otherwise, as we know the event loop is ongoing, we'll make
1006 1017 * sure that these entries get cleaned up once they're done.
1007 1018 */
1008 1019 scp->sc_cstate = SVP_CS_WINDDOWN;
1009 1020 if (list_is_empty(&scp->sc_queries)) {
1010 1021 svp_conn_remove(scp);
1011 1022 scp->sc_flags |= SVP_CF_REAP;
1012 1023 svp_conn_inject(scp);
1013 1024 }
1014 1025 break;
1015 1026 default:
1016 1027 libvarpd_panic("svp_conn_fallout encountered "
1017 1028 "unknown state");
1018 1029 }
1019 1030 mutex_exit(&scp->sc_lock);
1020 1031 mutex_exit(&srp->sr_lock);
1021 1032 }
1022 1033
1023 1034 int
1024 1035 svp_conn_create(svp_remote_t *srp, const struct in6_addr *addr)
1025 1036 {
1026 1037 int ret;
1027 1038 svp_conn_t *scp;
1028 1039
1029 1040 assert(MUTEX_HELD(&srp->sr_lock));
1030 1041 scp = umem_zalloc(sizeof (svp_conn_t), UMEM_DEFAULT);
1031 1042 if (scp == NULL)
1032 1043 return (ENOMEM);
1033 1044
1034 1045 if ((ret = mutex_init(&scp->sc_lock, USYNC_THREAD | LOCK_ERRORCHECK,
1035 1046 NULL)) != 0) {
1036 1047 umem_free(scp, sizeof (svp_conn_t));
1037 1048 return (ret);
1038 1049 }
1039 1050
1040 1051 scp->sc_remote = srp;
1041 1052 scp->sc_event.se_func = svp_conn_handler;
1042 1053 scp->sc_event.se_arg = scp;
1043 1054 scp->sc_btimer.st_func = svp_conn_backtimer;
1044 1055 scp->sc_btimer.st_arg = scp;
1045 1056 scp->sc_btimer.st_oneshot = B_TRUE;
1046 1057 scp->sc_btimer.st_value = 1;
1047 1058
1048 1059 scp->sc_qtimer.st_func = svp_conn_querytimer;
1049 1060 scp->sc_qtimer.st_arg = scp;
1050 1061 scp->sc_qtimer.st_oneshot = B_FALSE;
1051 1062 scp->sc_qtimer.st_value = svp_conn_query_timeout;
1052 1063
1053 1064 scp->sc_socket = -1;
1054 1065
1055 1066 list_create(&scp->sc_queries, sizeof (svp_query_t),
1056 1067 offsetof(svp_query_t, sq_lnode));
1057 1068 scp->sc_gen = srp->sr_gen;
1058 1069 bcopy(addr, &scp->sc_addr, sizeof (struct in6_addr));
1059 1070 scp->sc_cstate = SVP_CS_INITIAL;
1060 1071 mutex_enter(&scp->sc_lock);
1061 1072 svp_conn_add(scp);
1062 1073 mutex_exit(&scp->sc_lock);
1063 1074
1064 1075 /* Now that we're locked and loaded, add our timers */
1065 1076 svp_timer_add(&scp->sc_qtimer);
1066 1077 svp_timer_add(&scp->sc_btimer);
1067 1078
1068 1079 return (0);
1069 1080 }
1070 1081
1071 1082 /*
1072 1083 * At the time of calling, the entry has been removed from all lists. In
1073 1084 * addition, the entries state should be SVP_CS_ERROR, therefore, we know that
1074 1085 * the fd should not be associated with the event loop. We'll double check that
1075 1086 * just in case. We should also have already been removed from the remote's
1076 1087 * list.
1077 1088 */
1078 1089 void
1079 1090 svp_conn_destroy(svp_conn_t *scp)
1080 1091 {
1081 1092 int ret;
1082 1093
1083 1094 mutex_enter(&scp->sc_lock);
1084 1095 if (scp->sc_cstate != SVP_CS_ERROR)
1085 1096 libvarpd_panic("asked to tear down an active connection");
1086 1097 if (scp->sc_flags & SVP_CF_ADDED)
1087 1098 libvarpd_panic("asked to remove a connection still in "
1088 1099 "the remote list\n");
1089 1100 if (!list_is_empty(&scp->sc_queries))
1090 1101 libvarpd_panic("asked to remove a connection with non-empty "
1091 1102 "query list");
1092 1103
1093 1104 if ((ret = svp_event_dissociate(&scp->sc_event, scp->sc_socket)) !=
1094 1105 ENOENT) {
1095 1106 libvarpd_panic("dissociate failed or was actually "
1096 1107 "associated: %d", ret);
1097 1108 }
1098 1109 mutex_exit(&scp->sc_lock);
1099 1110
1100 1111 /* Verify our timers are killed */
1101 1112 svp_timer_remove(&scp->sc_btimer);
1102 1113 svp_timer_remove(&scp->sc_qtimer);
1103 1114
1104 1115 if (scp->sc_socket != -1 && close(scp->sc_socket) != 0)
1105 1116 libvarpd_panic("failed to close svp_conn_t`scp_socket fd "
|
↓ open down ↓ |
221 lines elided |
↑ open up ↑ |
1106 1117 "%d: %d", scp->sc_socket, errno);
1107 1118
1108 1119 list_destroy(&scp->sc_queries);
1109 1120 umem_free(scp, sizeof (svp_conn_t));
1110 1121 }
1111 1122
1112 1123 void
1113 1124 svp_conn_queue(svp_conn_t *scp, svp_query_t *sqp)
1114 1125 {
1115 1126 assert(MUTEX_HELD(&scp->sc_lock));
1116 - assert(scp->sc_cstate == SVP_CS_ACTIVE);
1127 + assert(scp->sc_cstate == SVP_CS_ACTIVE ||
1128 + scp->sc_cstate == SVP_CS_VERSIONING);
1117 1129
1118 1130 sqp->sq_acttime = -1;
1119 1131 list_insert_tail(&scp->sc_queries, sqp);
1120 1132 if (!(scp->sc_event.se_events & POLLOUT)) {
1121 1133 scp->sc_event.se_events |= POLLOUT;
1122 1134 /*
1123 1135 * If this becomes frequent, we should instead give up on this
1124 1136 * set of connections instead of aborting.
1125 1137 */
1126 1138 if (svp_event_associate(&scp->sc_event, scp->sc_socket) != 0)
1127 1139 libvarpd_panic("svp_event_associate failed somehow");
1128 1140 }
1129 1141 }
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX