3354 kernel crash in rpcsec_gss after using gsscred
Reviewed by: Toomas Soome <tsoome@me.com>
Reviewed by: Carlos Neira <cneirabustos@gmail.com>
Approved by: Robert Mustacchi <rm@joyent.com>
re #12783 rb4338 Flow control is needed in rpcmod when the NFS server is unable to keep up with the network
--- old/usr/src/uts/common/rpc/svc_rdma.c
+++ new/usr/src/uts/common/rpc/svc_rdma.c
1 1 /*
2 2 * CDDL HEADER START
3 3 *
4 4 * The contents of this file are subject to the terms of the
5 5 * Common Development and Distribution License (the "License").
6 6 * You may not use this file except in compliance with the License.
7 7 *
8 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 9 * or http://www.opensolaris.org/os/licensing.
10 10 * See the License for the specific language governing permissions
11 11 * and limitations under the License.
12 12 *
13 13 * When distributing Covered Code, include this CDDL HEADER in each
14 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 15 * If applicable, add the following below this CDDL HEADER, with the
16 16 * fields enclosed by brackets "[]" replaced with your own identifying
17 17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 18 *
19 19 * CDDL HEADER END
20 20 */
21 21 /*
22 22 * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
23 23 * Copyright (c) 2012 by Delphix. All rights reserved.
24 24 * Copyright 2013 Nexenta Systems, Inc. All rights reserved.
25 + * Copyright 2012 Marcel Telka <marcel@telka.sk>
26 + * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
25 27 */
26 28 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
27 29 /* All Rights Reserved */
28 30 /*
29 31 * Portions of this source code were derived from Berkeley
30 32 * 4.3 BSD under license from the Regents of the University of
31 33 * California.
32 34 */
33 35
34 36 /*
35 37 * Server side of RPC over RDMA in the kernel.
36 38 */
37 39
38 40 #include <sys/param.h>
39 41 #include <sys/types.h>
40 42 #include <sys/user.h>
41 43 #include <sys/sysmacros.h>
42 44 #include <sys/proc.h>
43 45 #include <sys/file.h>
44 46 #include <sys/errno.h>
45 47 #include <sys/kmem.h>
46 48 #include <sys/debug.h>
47 49 #include <sys/systm.h>
48 50 #include <sys/cmn_err.h>
49 51 #include <sys/kstat.h>
50 52 #include <sys/vtrace.h>
51 53 #include <sys/debug.h>
52 54
53 55 #include <rpc/types.h>
54 56 #include <rpc/xdr.h>
55 57 #include <rpc/auth.h>
56 58 #include <rpc/clnt.h>
57 59 #include <rpc/rpc_msg.h>
58 60 #include <rpc/svc.h>
59 61 #include <rpc/rpc_rdma.h>
60 62 #include <sys/ddi.h>
61 63 #include <sys/sunddi.h>
62 64
63 65 #include <inet/common.h>
64 66 #include <inet/ip.h>
65 67 #include <inet/ip6.h>
66 68
67 69 #include <nfs/nfs.h>
68 70 #include <sys/sdt.h>
69 71
70 72 #define SVC_RDMA_SUCCESS 0
71 73 #define SVC_RDMA_FAIL -1
72 74
73 75 #define SVC_CREDIT_FACTOR (0.5)
74 76
75 77 #define MSG_IS_RPCSEC_GSS(msg) \
76 78 ((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)
77 79
78 80
79 81 uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;
80 82
81 83 /*
82 84 * RDMA transport specific data associated with SVCMASTERXPRT
83 85 */
84 86 struct rdma_data {
85 - SVCMASTERXPRT *rd_xprt; /* back ptr to SVCMASTERXPRT */
87 + SVCMASTERXPRT *rd_xprt; /* back ptr to SVCMASTERXPRT */
86 88 struct rdma_svc_data rd_data; /* rdma data */
87 89 rdma_mod_t *r_mod; /* RDMA module containing ops ptr */
88 90 };
89 91
90 92 /*
91 93 * Plugin connection specific data stashed away in clone SVCXPRT
92 94 */
93 95 struct clone_rdma_data {
94 96 bool_t cloned; /* xprt cloned for thread processing */
95 97 CONN *conn; /* RDMA connection */
96 98 rdma_buf_t rpcbuf; /* RPC req/resp buffer */
97 99 struct clist *cl_reply; /* reply chunk buffer info */
98 100 struct clist *cl_wlist; /* write list clist */
99 101 };
100 102
101 103
102 104 #define MAXADDRLEN 128 /* max length for address mask */
103 105
104 106 /*
105 107 * Routines exported through ops vector.
106 108 */
107 109 static bool_t svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
108 110 static bool_t svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
109 111 static bool_t svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
110 112 static bool_t svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
111 113 void svc_rdma_kdestroy(SVCMASTERXPRT *);
112 114 static int svc_rdma_kdup(struct svc_req *, caddr_t, int,
113 115 struct dupreq **, bool_t *);
114 116 static void svc_rdma_kdupdone(struct dupreq *, caddr_t,
115 117 void (*)(), int, int);
116 118 static int32_t *svc_rdma_kgetres(SVCXPRT *, int);
117 119 static void svc_rdma_kfreeres(SVCXPRT *);
118 120 static void svc_rdma_kclone_destroy(SVCXPRT *);
119 121 static void svc_rdma_kstart(SVCMASTERXPRT *);
120 122 void svc_rdma_kstop(SVCMASTERXPRT *);
121 123 static void svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
122 124 static void svc_rdma_ktattrs(SVCXPRT *, int, void **);
123 125
124 126 static int svc_process_long_reply(SVCXPRT *, xdrproc_t,
125 127 caddr_t, struct rpc_msg *, bool_t, int *,
126 128 int *, int *, unsigned int *);
127 129
128 130 static int svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
129 131 caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
130 132 bool_t, uint_t *);
131 133 static bool_t rpcmsg_length(xdrproc_t,
132 134 caddr_t,
133 135 struct rpc_msg *, bool_t, int);
134 136
135 137 /*
136 138 * Server transport operations vector.
137 139 */
138 140 struct svc_ops rdma_svc_ops = {
139 141 svc_rdma_krecv, /* Get requests */
140 142 svc_rdma_kgetargs, /* Deserialize arguments */
141 143 svc_rdma_ksend, /* Send reply */
142 144 svc_rdma_kfreeargs, /* Free argument data space */
143 145 svc_rdma_kdestroy, /* Destroy transport handle */
144 146 svc_rdma_kdup, /* Check entry in dup req cache */
145 147 svc_rdma_kdupdone, /* Mark entry in dup req cache as done */
146 148 svc_rdma_kgetres, /* Get pointer to response buffer */
147 149 svc_rdma_kfreeres, /* Destroy pre-serialized response header */
148 150 svc_rdma_kclone_destroy, /* Destroy a clone xprt */
149 151 svc_rdma_kstart, /* Tell `ready-to-receive' to rpcmod */
150 152 svc_rdma_kclone_xprt, /* Transport specific clone xprt */
151 - svc_rdma_ktattrs /* Get Transport Attributes */
153 + svc_rdma_ktattrs, /* Get Transport Attributes */
154 + NULL, /* Increment transport reference count */
155 + NULL /* Decrement transport reference count */
152 156 };
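
The two NULL slots appended above leave the new hold/release operations unimplemented for RDMA, so dispatching code must skip a NULL slot rather than call through it. A minimal standalone sketch of that guard pattern, using hypothetical slot names rather than the kernel's actual svc_ops layout:

	/*
	 * Hypothetical ops table; xp_hold/xp_rele are illustrative
	 * names, not the real svc_ops members.
	 */
	struct ops {
		void	(*xp_hold)(void *);	/* may be NULL */
		void	(*xp_rele)(void *);	/* may be NULL */
	};

	static void
	ops_hold(const struct ops *op, void *xprt)
	{
		/* A NULL slot means the transport keeps no refcount. */
		if (op->xp_hold != NULL)
			op->xp_hold(xprt);
	}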
153 157
154 158 /*
155 159 * Server statistics
156 160 * NOTE: This structure type is duplicated in the NFS fast path.
157 161 */
158 162 struct {
159 163 kstat_named_t rscalls;
160 164 kstat_named_t rsbadcalls;
161 165 kstat_named_t rsnullrecv;
162 166 kstat_named_t rsbadlen;
163 167 kstat_named_t rsxdrcall;
164 168 kstat_named_t rsdupchecks;
165 169 kstat_named_t rsdupreqs;
166 170 kstat_named_t rslongrpcs;
167 171 kstat_named_t rstotalreplies;
168 172 kstat_named_t rstotallongreplies;
169 173 kstat_named_t rstotalinlinereplies;
170 174 } rdmarsstat = {
171 175 { "calls", KSTAT_DATA_UINT64 },
172 176 { "badcalls", KSTAT_DATA_UINT64 },
173 177 { "nullrecv", KSTAT_DATA_UINT64 },
174 178 { "badlen", KSTAT_DATA_UINT64 },
175 179 { "xdrcall", KSTAT_DATA_UINT64 },
176 180 { "dupchecks", KSTAT_DATA_UINT64 },
177 181 { "dupreqs", KSTAT_DATA_UINT64 },
178 182 { "longrpcs", KSTAT_DATA_UINT64 },
179 183 { "totalreplies", KSTAT_DATA_UINT64 },
180 184 { "totallongreplies", KSTAT_DATA_UINT64 },
181 185 { "totalinlinereplies", KSTAT_DATA_UINT64 },
182 186 };
183 187
184 188 kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
185 189 uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
186 190
187 191 #define RSSTAT_INCR(x) atomic_inc_64(&rdmarsstat.x.value.ui64)
188 192 /*
189 193 * Create a transport record.
190 194 * The transport record, output buffer, and private data structure
191 195 * are allocated. The output buffer is serialized into using xdrmem.
192 196 * There is one transport record per user process which implements a
193 197 * set of services.
194 198 */
195 199 /* ARGSUSED */
196 200 int
197 201 svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
198 202 rdma_xprt_group_t *started_xprts)
199 203 {
200 204 	int error = 0;
201 205 SVCMASTERXPRT *xprt;
202 206 struct rdma_data *rd;
203 207 rdma_registry_t *rmod;
204 208 rdma_xprt_record_t *xprt_rec;
205 209 queue_t *q;
206 210 /*
207 211 	 * modload the RDMA plugins if not already done.
208 212 */
209 213 if (!rdma_modloaded) {
210 214 /*CONSTANTCONDITION*/
211 215 ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);
212 216
213 217 mutex_enter(&rdma_modload_lock);
214 218 if (!rdma_modloaded) {
215 219 error = rdma_modload();
216 220 }
217 221 mutex_exit(&rdma_modload_lock);
218 222
219 223 if (error)
220 224 return (error);
221 225 }
222 226
223 227 /*
224 228 * master_xprt_count is the count of master transport handles
225 229 	 * that were successfully created and are ready to receive for
226 230 * RDMA based access.
227 231 */
228 232 error = 0;
229 233 xprt_rec = NULL;
230 234 rw_enter(&rdma_lock, RW_READER);
231 235 if (rdma_mod_head == NULL) {
232 236 started_xprts->rtg_count = 0;
233 237 rw_exit(&rdma_lock);
234 238 if (rdma_dev_available)
235 239 return (EPROTONOSUPPORT);
236 240 else
237 241 return (ENODEV);
238 242 }
239 243
240 244 /*
241 245 	 * If we have reached here, then at least one RDMA plugin has loaded.
242 246 	 * Create a master_xprt and make it start listening on the device;
243 247 	 * if an error is generated, record it, since we might need to shut
244 248 	 * down the master_xprt.
245 249 * SVC_START() calls svc_rdma_kstart which calls plugin binding
246 250 * routines.
247 251 */
248 252 for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
249 253
250 254 /*
251 255 * One SVCMASTERXPRT per RDMA plugin.
252 256 */
253 257 xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
254 258 xprt->xp_ops = &rdma_svc_ops;
255 259 xprt->xp_sct = sct;
256 260 xprt->xp_type = T_RDMA;
257 261 mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
258 262 mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
259 263 xprt->xp_req_head = (mblk_t *)0;
260 264 xprt->xp_req_tail = (mblk_t *)0;
261 265 xprt->xp_full = FALSE;
262 266 xprt->xp_enable = FALSE;
263 267 xprt->xp_reqs = 0;
264 268 xprt->xp_size = 0;
265 269 xprt->xp_threads = 0;
266 270 xprt->xp_detached_threads = 0;
267 271
268 272 rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
269 273 xprt->xp_p2 = (caddr_t)rd;
270 274 rd->rd_xprt = xprt;
271 275 rd->r_mod = rmod->r_mod;
272 276
273 277 q = &rd->rd_data.q;
274 278 xprt->xp_wq = q;
275 279 q->q_ptr = &rd->rd_xprt;
276 280 xprt->xp_netid = NULL;
277 281
278 282 /*
279 283 * Each of the plugins will have their own Service ID
280 284 * to listener specific mapping, like port number for VI
281 285 * and service name for IB.
282 286 */
283 287 rd->rd_data.svcid = id;
284 288 error = svc_xprt_register(xprt, id);
285 289 if (error) {
286 290 DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
287 291 goto cleanup;
288 292 }
289 293
290 294 SVC_START(xprt);
291 295 if (!rd->rd_data.active) {
292 296 svc_xprt_unregister(xprt);
293 297 error = rd->rd_data.err_code;
294 298 goto cleanup;
295 299 }
296 300
297 301 /*
298 302 		 * This is set only when at least one transport has been
299 303 		 * successfully created. We insert the pointer
300 304 * to the created RDMA master xprt into a separately maintained
301 305 * list. This way we can easily reference it later to cleanup,
302 306 * when NFS kRPC service pool is going away/unregistered.
303 307 */
304 308 		started_xprts->rtg_count++;
305 309 xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
306 310 xprt_rec->rtr_xprt_ptr = xprt;
307 311 xprt_rec->rtr_next = started_xprts->rtg_listhead;
308 312 started_xprts->rtg_listhead = xprt_rec;
309 313 continue;
310 314 cleanup:
311 315 SVC_DESTROY(xprt);
312 316 if (error == RDMA_FAILED)
313 317 error = EPROTONOSUPPORT;
314 318 }
315 319
316 320 rw_exit(&rdma_lock);
317 321
318 322 /*
319 323 * Don't return any error even if a single plugin was started
320 324 * successfully.
321 325 */
322 326 if (started_xprts->rtg_count == 0)
323 327 return (error);
324 328 return (0);
325 329 }
326 330
327 331 /*
328 332 * Cleanup routine for freeing up memory allocated by
329 333 * svc_rdma_kcreate()
330 334 */
331 335 void
332 336 svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
333 337 {
334 338 struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
335 339
336 340
337 341 mutex_destroy(&xprt->xp_req_lock);
338 342 mutex_destroy(&xprt->xp_thread_lock);
339 343 kmem_free(rd, sizeof (*rd));
340 344 kmem_free(xprt, sizeof (*xprt));
341 345 }
342 346
343 347
344 348 static void
345 349 svc_rdma_kstart(SVCMASTERXPRT *xprt)
346 350 {
347 351 struct rdma_svc_data *svcdata;
348 352 rdma_mod_t *rmod;
349 353
350 354 svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
351 355 rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
352 356
353 357 /*
354 358 * Create a listener for module at this port
355 359 */
356 360
357 361 if (rmod->rdma_count != 0)
358 362 (*rmod->rdma_ops->rdma_svc_listen)(svcdata);
359 363 else
360 364 svcdata->err_code = RDMA_FAILED;
361 365 }
362 366
363 367 void
364 368 svc_rdma_kstop(SVCMASTERXPRT *xprt)
365 369 {
366 370 struct rdma_svc_data *svcdata;
367 371 rdma_mod_t *rmod;
368 372
369 373 svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
370 374 rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
371 375
372 376 /*
373 377 * Call the stop listener routine for each plugin. If rdma_count is
374 378 	 * already zero, set active to zero.
375 379 */
376 380 if (rmod->rdma_count != 0)
377 381 (*rmod->rdma_ops->rdma_svc_stop)(svcdata);
378 382 else
379 383 svcdata->active = 0;
380 384 if (svcdata->active)
381 385 DTRACE_PROBE(krpc__e__svcrdma__kstop);
382 386 }
383 387
384 388 /* ARGSUSED */
385 389 static void
386 390 svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
387 391 {
388 392
389 393 struct clone_rdma_data *cdrp;
390 394 cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
391 395
392 396 /*
393 397 * Only free buffers and release connection when cloned is set.
394 398 */
395 399 if (cdrp->cloned != TRUE)
396 400 return;
397 401
398 402 rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
399 403 if (cdrp->cl_reply) {
400 404 clist_free(cdrp->cl_reply);
401 405 cdrp->cl_reply = NULL;
402 406 }
403 407 RDMA_REL_CONN(cdrp->conn);
404 408
405 409 cdrp->cloned = 0;
406 410 }
407 411
408 412 /*
409 413 * Clone the xprt specific information. It will be freed by
410 414 * SVC_CLONE_DESTROY.
411 415 */
412 416 static void
413 417 svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
414 418 {
415 419 struct clone_rdma_data *srcp2;
416 420 struct clone_rdma_data *dstp2;
417 421
418 422 srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
419 423 dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;
420 424
421 425 if (srcp2->conn != NULL) {
422 426 srcp2->cloned = TRUE;
423 427 *dstp2 = *srcp2;
424 428 }
425 429 }
426 430
427 431 static void
428 432 svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
429 433 {
430 434 CONN *conn;
431 435 *tattr = NULL;
432 436
433 437 switch (attrflag) {
434 438 case SVC_TATTR_ADDRMASK:
435 439 conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
436 440 ASSERT(conn != NULL);
437 441 if (conn)
438 442 *tattr = (void *)&conn->c_addrmask;
439 443 }
440 444 }
441 445
442 446 static bool_t
443 447 svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
444 448 {
445 449 XDR *xdrs;
446 450 CONN *conn;
447 451 rdma_recv_data_t *rdp = (rdma_recv_data_t *)mp->b_rptr;
448 452 struct clone_rdma_data *crdp;
449 453 struct clist *cl = NULL;
450 454 struct clist *wcl = NULL;
451 455 struct clist *cllong = NULL;
452 456
453 457 rdma_stat status;
454 458 uint32_t vers, op, pos, xid;
455 459 uint32_t rdma_credit;
456 460 uint32_t wcl_total_length = 0;
457 461 bool_t wwl = FALSE;
458 462
459 463 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
460 464 RSSTAT_INCR(rscalls);
461 465 conn = rdp->conn;
462 466
463 467 status = rdma_svc_postrecv(conn);
464 468 if (status != RDMA_SUCCESS) {
465 469 DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
466 470 goto badrpc_call;
467 471 }
468 472
469 473 xdrs = &clone_xprt->xp_xdrin;
470 474 xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
471 475 xid = *(uint32_t *)rdp->rpcmsg.addr;
472 476 XDR_SETPOS(xdrs, sizeof (uint32_t));
473 477
474 478 if (! xdr_u_int(xdrs, &vers) ||
475 479 ! xdr_u_int(xdrs, &rdma_credit) ||
476 480 ! xdr_u_int(xdrs, &op)) {
477 481 DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
478 482 goto xdr_err;
479 483 }
480 484
481 485 	/* Check that the status of the recv operation was normal */
482 486 if (rdp->status != 0) {
483 487 DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
484 488 int, rdp->status);
485 489 goto badrpc_call;
486 490 }
487 491
488 492 if (! xdr_do_clist(xdrs, &cl)) {
489 493 DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
490 494 goto xdr_err;
491 495 }
492 496
493 497 if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
494 498 DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
495 499 if (cl)
496 500 clist_free(cl);
497 501 goto xdr_err;
498 502 }
499 503 crdp->cl_wlist = wcl;
500 504
501 505 crdp->cl_reply = NULL;
502 506 (void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);
503 507
504 508 /*
505 509 * A chunk at 0 offset indicates that the RPC call message
506 510 * is in a chunk. Get the RPC call message chunk.
507 511 */
508 512 if (cl != NULL && op == RDMA_NOMSG) {
509 513
510 514 /* Remove RPC call message chunk from chunklist */
511 515 cllong = cl;
512 516 cl = cl->c_next;
513 517 cllong->c_next = NULL;
514 518
515 519
516 520 /* Allocate and register memory for the RPC call msg chunk */
517 521 cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
518 522 cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
519 523 cllong->c_len : LONG_REPLY_LEN;
520 524
521 525 if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
522 526 clist_free(cllong);
523 527 goto cll_malloc_err;
524 528 }
525 529
526 530 cllong->u.c_daddr3 = cllong->rb_longbuf.addr;
527 531
528 532 if (cllong->u.c_daddr == NULL) {
529 533 DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
530 534 rdma_buf_free(conn, &cllong->rb_longbuf);
531 535 clist_free(cllong);
532 536 goto cll_malloc_err;
533 537 }
534 538
535 539 status = clist_register(conn, cllong, CLIST_REG_DST);
536 540 if (status) {
537 541 DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
538 542 rdma_buf_free(conn, &cllong->rb_longbuf);
539 543 clist_free(cllong);
540 544 goto cll_malloc_err;
541 545 }
542 546
543 547 /*
544 548 * Now read the RPC call message in
545 549 */
546 550 status = RDMA_READ(conn, cllong, WAIT);
547 551 if (status) {
548 552 DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
549 553 (void) clist_deregister(conn, cllong);
550 554 rdma_buf_free(conn, &cllong->rb_longbuf);
551 555 clist_free(cllong);
552 556 goto cll_malloc_err;
553 557 }
554 558
555 559 status = clist_syncmem(conn, cllong, CLIST_REG_DST);
556 560 (void) clist_deregister(conn, cllong);
557 561
558 562 xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
559 563 cllong->c_len, 0, cl, XDR_DECODE, conn);
560 564
561 565 crdp->rpcbuf = cllong->rb_longbuf;
562 566 crdp->rpcbuf.len = cllong->c_len;
563 567 clist_free(cllong);
564 568 RDMA_BUF_FREE(conn, &rdp->rpcmsg);
565 569 } else {
566 570 pos = XDR_GETPOS(xdrs);
567 571 xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
568 572 rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
569 573 crdp->rpcbuf = rdp->rpcmsg;
570 574
571 575 /* Use xdrrdmablk_ops to indicate there is a read chunk list */
572 576 if (cl != NULL) {
573 577 int32_t flg = XDR_RDMA_RLIST_REG;
574 578
575 579 XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
576 580 xdrs->x_ops = &xdrrdmablk_ops;
577 581 }
578 582 }
579 583
580 584 if (crdp->cl_wlist) {
581 585 int32_t flg = XDR_RDMA_WLIST_REG;
582 586
583 587 XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
584 588 XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
585 589 }
586 590
587 591 if (! xdr_callmsg(xdrs, msg)) {
588 592 DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
589 593 RSSTAT_INCR(rsxdrcall);
590 594 goto callmsg_err;
591 595 }
592 596
593 597 /*
594 598 * Point the remote transport address in the service_transport
595 599 * handle at the address in the request.
596 600 */
597 601 clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
598 602 clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
599 603 clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;
600 604
601 605 clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
602 606 clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
603 607 clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;
604 608
605 609 /*
606 610 * In case of RDMA, connection management is
607 611 * entirely done in rpcib module and netid in the
608 612 * SVCMASTERXPRT is NULL. Initialize the clone netid
609 613 * from the connection.
610 614 */
611 615
612 616 clone_xprt->xp_netid = conn->c_netid;
613 617
614 618 clone_xprt->xp_xid = xid;
615 619 crdp->conn = conn;
616 620
617 621 freeb(mp);
618 622
619 623 return (TRUE);
620 624
621 625 callmsg_err:
622 626 rdma_buf_free(conn, &crdp->rpcbuf);
623 627
624 628 cll_malloc_err:
625 629 if (cl)
626 630 clist_free(cl);
627 631 xdr_err:
628 632 XDR_DESTROY(xdrs);
629 633
630 634 badrpc_call:
631 635 RDMA_BUF_FREE(conn, &rdp->rpcmsg);
632 636 RDMA_REL_CONN(conn);
633 637 freeb(mp);
634 638 RSSTAT_INCR(rsbadcalls);
635 639 return (FALSE);
636 640 }
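
When the call arrives with op RDMA_NOMSG, svc_rdma_krecv() above pulls the RPC call message across the wire before decoding it: allocate and register a destination buffer, RDMA_READ from the client's chunk, sync and deregister, then hand the buffer to xdrrdma_create(). A condensed sketch of that sequence, with all of the error unwinding elided:

	/* Condensed from the RDMA_NOMSG branch above; not complete. */
	cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
	if (rdma_buf_alloc(conn, &cllong->rb_longbuf) == 0 &&
	    clist_register(conn, cllong, CLIST_REG_DST) == RDMA_SUCCESS &&
	    RDMA_READ(conn, cllong, WAIT) == RDMA_SUCCESS) {
		(void) clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);
		/* cllong->rb_longbuf now holds the RPC call message */
	}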
637 641
638 642 static int
639 643 svc_process_long_reply(SVCXPRT * clone_xprt,
640 644 xdrproc_t xdr_results, caddr_t xdr_location,
641 645 struct rpc_msg *msg, bool_t has_args, int *msglen,
642 646 int *freelen, int *numchunks, unsigned int *final_len)
643 647 {
644 648 int status;
645 649 XDR xdrslong;
646 650 struct clist *wcl = NULL;
647 651 int count = 0;
648 652 int alloc_len;
649 653 char *memp;
650 654 rdma_buf_t long_rpc = {0};
651 655 struct clone_rdma_data *crdp;
652 656
653 657 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
654 658
655 659 bzero(&xdrslong, sizeof (xdrslong));
656 660
657 661 /* Choose a size for the long rpc response */
658 662 if (MSG_IS_RPCSEC_GSS(msg)) {
659 663 alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
660 664 } else {
661 665 alloc_len = RNDUP(*msglen);
662 666 }
663 667
664 668 if (alloc_len <= 64 * 1024) {
665 669 if (alloc_len > 32 * 1024) {
666 670 alloc_len = 64 * 1024;
667 671 } else {
668 672 if (alloc_len > 16 * 1024) {
669 673 alloc_len = 32 * 1024;
670 674 } else {
671 675 alloc_len = 16 * 1024;
672 676 }
673 677 }
674 678 }
675 679
676 680 long_rpc.type = RDMA_LONG_BUFFER;
677 681 long_rpc.len = alloc_len;
678 682 if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
679 683 return (SVC_RDMA_FAIL);
680 684 }
681 685
682 686 memp = long_rpc.addr;
683 687 xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);
684 688
685 689 msg->rm_xid = clone_xprt->xp_xid;
686 690
687 691 if (!(xdr_replymsg(&xdrslong, msg) &&
688 692 (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
689 693 xdr_results, xdr_location)))) {
690 694 rdma_buf_free(crdp->conn, &long_rpc);
691 695 DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
692 696 return (SVC_RDMA_FAIL);
693 697 }
694 698
695 699 *final_len = XDR_GETPOS(&xdrslong);
696 700
697 701 DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
698 702 *numchunks = 0;
699 703 *freelen = 0;
700 704
701 705 wcl = crdp->cl_reply;
702 706 wcl->rb_longbuf = long_rpc;
703 707
704 708 count = *final_len;
705 709 while ((wcl != NULL) && (count > 0)) {
706 710
707 711 if (wcl->c_dmemhandle.mrc_rmr == 0)
708 712 break;
709 713
710 714 DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
711 715 uint32_t, wcl->c_len);
712 716
713 717 if (wcl->c_len > count) {
714 718 wcl->c_len = count;
715 719 }
716 720 wcl->w.c_saddr3 = (caddr_t)memp;
717 721
718 722 count -= wcl->c_len;
719 723 *numchunks += 1;
720 724 memp += wcl->c_len;
721 725 wcl = wcl->c_next;
722 726 }
723 727
724 728 /*
725 729 	 * Make the rest of the chunks 0-len
726 730 */
727 731 while (wcl != NULL) {
728 732 if (wcl->c_dmemhandle.mrc_rmr == 0)
729 733 break;
730 734 wcl->c_len = 0;
731 735 wcl = wcl->c_next;
732 736 }
733 737
734 738 wcl = crdp->cl_reply;
735 739
736 740 /*
737 741 	 * MUST fail if there is still more data
738 742 */
739 743 if (count > 0) {
740 744 rdma_buf_free(crdp->conn, &long_rpc);
741 745 DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
742 746 return (SVC_RDMA_FAIL);
743 747 }
744 748
745 749 if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
746 750 rdma_buf_free(crdp->conn, &long_rpc);
747 751 DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
748 752 return (SVC_RDMA_FAIL);
749 753 }
750 754
751 755 status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);
752 756
753 757 if (status) {
754 758 (void) clist_deregister(crdp->conn, wcl);
755 759 rdma_buf_free(crdp->conn, &long_rpc);
756 760 DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
757 761 return (SVC_RDMA_FAIL);
758 762 }
759 763
760 764 status = RDMA_WRITE(crdp->conn, wcl, WAIT);
761 765
762 766 (void) clist_deregister(crdp->conn, wcl);
763 767 rdma_buf_free(crdp->conn, &wcl->rb_longbuf);
764 768
765 769 if (status != RDMA_SUCCESS) {
766 770 DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
767 771 return (SVC_RDMA_FAIL);
768 772 }
769 773
770 774 return (SVC_RDMA_SUCCESS);
771 775 }
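
The nested size checks at the top of this function bucket any reply of 64KB or less into a 16KB, 32KB, or 64KB allocation, while larger replies keep their RNDUP()'d length. A standalone restatement of that policy (illustrative helper, not code from this file):

	/*
	 * Bucket a long-reply allocation the way the checks above do:
	 * <=16K -> 16K, <=32K -> 32K, <=64K -> 64K; anything larger
	 * is returned unchanged.
	 */
	static int
	long_reply_bucket(int alloc_len)
	{
		if (alloc_len > 64 * 1024)
			return (alloc_len);
		if (alloc_len > 32 * 1024)
			return (64 * 1024);
		if (alloc_len > 16 * 1024)
			return (32 * 1024);
		return (16 * 1024);
	}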
772 776
773 777
774 778 static int
775 779 svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
776 780 caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
777 781 struct rpc_msg *msg, bool_t has_args, uint_t *len)
778 782 {
779 783 /*
780 784 * Get a pre-allocated buffer for rpc reply
781 785 */
782 786 rpcreply->type = SEND_BUFFER;
783 787 if (rdma_buf_alloc(conn, rpcreply)) {
784 788 DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
785 789 return (SVC_RDMA_FAIL);
786 790 }
787 791
788 792 xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
789 793 0, NULL, XDR_ENCODE, conn);
790 794
791 795 msg->rm_xid = clone_xprt->xp_xid;
792 796
793 797 if (has_args) {
794 798 if (!(xdr_replymsg(*xdrs, msg) &&
795 799 (!has_args ||
796 800 SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
797 801 xdr_results, xdr_location)))) {
798 802 rdma_buf_free(conn, rpcreply);
799 803 DTRACE_PROBE(
800 804 krpc__e__svcrdma__rpcmsg__reply__authwrap1);
801 805 return (SVC_RDMA_FAIL);
802 806 }
803 807 } else {
804 808 if (!xdr_replymsg(*xdrs, msg)) {
805 809 rdma_buf_free(conn, rpcreply);
806 810 DTRACE_PROBE(
807 811 krpc__e__svcrdma__rpcmsg__reply__authwrap2);
808 812 return (SVC_RDMA_FAIL);
809 813 }
810 814 }
811 815
812 816 *len = XDR_GETPOS(*xdrs);
813 817
814 818 return (SVC_RDMA_SUCCESS);
815 819 }
816 820
817 821 /*
818 822 * Send rpc reply.
819 823 */
820 824 static bool_t
821 825 svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
822 826 {
823 827 XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
824 828 XDR xdrs_rhdr;
825 829 CONN *conn = NULL;
826 830 rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};
827 831
828 832 struct clone_rdma_data *crdp;
829 833 struct clist *cl_read = NULL;
830 834 struct clist *cl_send = NULL;
831 835 struct clist *cl_write = NULL;
832 836 xdrproc_t xdr_results; /* results XDR encoding function */
833 837 caddr_t xdr_location; /* response results pointer */
834 838
835 839 int retval = FALSE;
836 840 int status, msglen, num_wreply_segments = 0;
837 841 uint32_t rdma_credit = 0;
838 842 int freelen = 0;
839 843 bool_t has_args;
840 844 uint_t final_resp_len, rdma_response_op, vers;
841 845
842 846 bzero(&xdrs_rhdr, sizeof (XDR));
843 847 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
844 848 conn = crdp->conn;
845 849
846 850 /*
847 851 * If there is a result procedure specified in the reply message,
848 852 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
849 853 * We need to make sure it won't be processed twice, so we null
850 854 * it for xdr_replymsg here.
851 855 */
852 856 has_args = FALSE;
853 857 if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
854 858 msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
855 859 if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
856 860 has_args = TRUE;
857 861 xdr_location = msg->acpted_rply.ar_results.where;
858 862 msg->acpted_rply.ar_results.proc = xdr_void;
859 863 msg->acpted_rply.ar_results.where = NULL;
860 864 }
861 865 }
862 866
863 867 /*
864 868 * Given the limit on the inline response size (RPC_MSG_SZ),
865 869 * there is a need to make a guess as to the overall size of
866 870 * the response. If the resultant size is beyond the inline
867 871 * size, then the server needs to use the "reply chunk list"
868 872 * provided by the client (if the client provided one). An
869 873 * example of this type of response would be a READDIR
870 874 	 * response (e.g. a small directory read would fit in RPC_MSG_SZ,
871 875 	 * which is preferred, but a larger one may not)
872 876 *
873 877 * Combine the encoded size and the size of the true results
874 878 * and then make the decision about where to encode and send results.
875 879 *
876 880 * One important note, this calculation is ignoring the size
877 881 * of the encoding of the authentication overhead. The reason
878 882 * for this is rooted in the complexities of access to the
879 883 	 * encoded size of RPCSEC_GSS related authentication,
880 884 * integrity, and privacy.
881 885 *
882 886 * If it turns out that the encoded authentication bumps the
883 887 * response over the RPC_MSG_SZ limit, then it may need to
884 888 * attempt to encode for the reply chunk list.
885 889 */
886 890
887 891 /*
888 892 * Calculating the "sizeof" the RPC response header and the
889 893 * encoded results.
890 894 */
891 895 msglen = xdr_sizeof(xdr_replymsg, msg);
892 896
893 897 if (msglen > 0) {
894 898 RSSTAT_INCR(rstotalreplies);
895 899 }
896 900 if (has_args)
897 901 msglen += xdrrdma_sizeof(xdr_results, xdr_location,
898 902 rdma_minchunk, NULL, NULL);
899 903
900 904 DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);
901 905
902 906 status = SVC_RDMA_SUCCESS;
903 907
904 908 if (msglen < RPC_MSG_SZ) {
905 909 /*
906 910 * Looks like the response will fit in the inline
907 911 * response; let's try
908 912 */
909 913 RSSTAT_INCR(rstotalinlinereplies);
910 914
911 915 rdma_response_op = RDMA_MSG;
912 916
913 917 status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
914 918 xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
915 919 has_args, &final_resp_len);
916 920
917 921 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
918 922 int, status);
919 923 DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
920 924 int, final_resp_len);
921 925
922 926 if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
923 927 clist_free(crdp->cl_reply);
924 928 crdp->cl_reply = NULL;
925 929 }
926 930 }
927 931
928 932 /*
929 933 * If the encode failed (size?) or the message really is
930 934 * larger than what is allowed, try the response chunk list.
931 935 */
932 936 if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
933 937 /*
934 938 * attempting to use a reply chunk list when there
935 939 * isn't one won't get very far...
936 940 */
937 941 if (crdp->cl_reply == NULL) {
938 942 DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
939 943 goto out;
940 944 }
941 945
942 946 RSSTAT_INCR(rstotallongreplies);
943 947
944 948 msglen = xdr_sizeof(xdr_replymsg, msg);
945 949 msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
946 950 NULL, NULL);
947 951
948 952 status = svc_process_long_reply(clone_xprt, xdr_results,
949 953 xdr_location, msg, has_args, &msglen, &freelen,
950 954 &num_wreply_segments, &final_resp_len);
951 955
952 956 DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
953 957 int, final_resp_len);
954 958
955 959 if (status != SVC_RDMA_SUCCESS) {
956 960 DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
957 961 goto out;
958 962 }
959 963
960 964 rdma_response_op = RDMA_NOMSG;
961 965 }
962 966
963 967 DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
964 968 int, final_resp_len);
965 969
966 970 rbuf_resp.type = SEND_BUFFER;
967 971 if (rdma_buf_alloc(conn, &rbuf_resp)) {
968 972 rdma_buf_free(conn, &rbuf_rpc_resp);
969 973 DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
970 974 goto out;
971 975 }
972 976
973 977 rdma_credit = rdma_bufs_granted;
974 978
975 979 vers = RPCRDMA_VERS;
976 980 xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
977 981 (*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
978 982 /* Skip xid and set the xdr position accordingly. */
979 983 XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
980 984 if (!xdr_u_int(&xdrs_rhdr, &vers) ||
981 985 !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
982 986 !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
983 987 rdma_buf_free(conn, &rbuf_rpc_resp);
984 988 rdma_buf_free(conn, &rbuf_resp);
985 989 DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
986 990 goto out;
987 991 }
988 992
989 993 /*
990 994 * Now XDR the read chunk list, actually always NULL
991 995 */
992 996 (void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);
993 997
994 998 /*
995 999 * encode write list -- we already drove RDMA_WRITEs
996 1000 */
997 1001 cl_write = crdp->cl_wlist;
998 1002 if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
999 1003 DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
1000 1004 rdma_buf_free(conn, &rbuf_rpc_resp);
1001 1005 rdma_buf_free(conn, &rbuf_resp);
1002 1006 goto out;
1003 1007 }
1004 1008
1005 1009 /*
1006 1010 * XDR encode the RDMA_REPLY write chunk
1007 1011 */
1008 1012 if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
1009 1013 num_wreply_segments)) {
1010 1014 rdma_buf_free(conn, &rbuf_rpc_resp);
1011 1015 rdma_buf_free(conn, &rbuf_resp);
1012 1016 goto out;
1013 1017 }
1014 1018
1015 1019 clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
1016 1020 rbuf_resp.addr, NULL, NULL);
1017 1021
1018 1022 if (rdma_response_op == RDMA_MSG) {
1019 1023 clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
1020 1024 rbuf_rpc_resp.addr, NULL, NULL);
1021 1025 }
1022 1026
1023 1027 status = RDMA_SEND(conn, cl_send, msg->rm_xid);
1024 1028
1025 1029 if (status == RDMA_SUCCESS) {
1026 1030 retval = TRUE;
1027 1031 }
1028 1032
1029 1033 out:
1030 1034 /*
1031 1035 * Free up sendlist chunks
1032 1036 */
1033 1037 if (cl_send != NULL)
1034 1038 clist_free(cl_send);
1035 1039
1036 1040 /*
1037 1041 * Destroy private data for xdr rdma
1038 1042 */
1039 1043 if (clone_xprt->xp_xdrout.x_ops != NULL) {
1040 1044 XDR_DESTROY(&(clone_xprt->xp_xdrout));
1041 1045 }
1042 1046
1043 1047 if (crdp->cl_reply) {
1044 1048 clist_free(crdp->cl_reply);
1045 1049 crdp->cl_reply = NULL;
1046 1050 }
1047 1051
1048 1052 /*
1049 1053 * This is completely disgusting. If public is set it is
1050 1054 * a pointer to a structure whose first field is the address
1051 1055 * of the function to free that structure and any related
1052 1056 * stuff. (see rrokfree in nfs_xdr.c).
1053 1057 */
1054 1058 if (xdrs_rpc->x_public) {
1055 1059 /* LINTED pointer alignment */
1056 1060 (**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
1057 1061 }
1058 1062
1059 1063 if (xdrs_rhdr.x_ops != NULL) {
1060 1064 XDR_DESTROY(&xdrs_rhdr);
1061 1065 }
1062 1066
1063 1067 return (retval);
1064 1068 }
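
The reply-path selection above reduces to: estimate the encoded size with xdr_sizeof(), attempt an inline RDMA_MSG reply when the estimate fits under RPC_MSG_SZ, and fall back to the client-supplied reply chunk list (RDMA_NOMSG) when the estimate is too large or the inline encode fails. A compact sketch of that control flow, with hypothetical helpers (try_inline_reply, use_reply_chunk_list) standing in for the inline code:

	/* Illustrative control flow only; buffer management omitted. */
	status = SVC_RDMA_SUCCESS;
	if (msglen < RPC_MSG_SZ)
		status = try_inline_reply();		/* RDMA_MSG */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		if (crdp->cl_reply == NULL)
			goto out;	/* client gave us no reply chunk list */
		status = use_reply_chunk_list();	/* RDMA_NOMSG */
	}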
1065 1069
1066 1070 /*
1067 1071 * Deserialize arguments.
1068 1072 */
1069 1073 static bool_t
1070 1074 svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
1071 1075 {
1072 1076 if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
1073 1077 xdr_args, args_ptr)) != TRUE)
1074 1078 return (FALSE);
1075 1079 return (TRUE);
1076 1080 }
1077 1081
1078 1082 static bool_t
1079 1083 svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
1080 1084 caddr_t args_ptr)
1081 1085 {
1082 1086 struct clone_rdma_data *crdp;
1083 1087 bool_t retval;
1084 1088
1085 1089 /*
1086 1090 * If the cloned bit is true, then this transport specific
1087 1091 * rmda data has been duplicated into another cloned xprt. Do
1088 1092 * not free, or release the connection, it is still in use. The
1089 1093 * buffers will be freed and the connection released later by
1090 1094 * SVC_CLONE_DESTROY().
1091 1095 */
1092 1096 crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
1093 1097 if (crdp->cloned == TRUE) {
1094 1098 crdp->cloned = 0;
1095 1099 return (TRUE);
1096 1100 }
1097 1101
1098 1102 /*
1099 1103 * Free the args if needed then XDR_DESTROY
1100 1104 */
1101 1105 if (args_ptr) {
1102 1106 XDR *xdrs = &clone_xprt->xp_xdrin;
1103 1107
1104 1108 xdrs->x_op = XDR_FREE;
1105 1109 retval = (*xdr_args)(xdrs, args_ptr);
1106 1110 }
1107 1111
1108 1112 XDR_DESTROY(&(clone_xprt->xp_xdrin));
1109 1113 rdma_buf_free(crdp->conn, &crdp->rpcbuf);
1110 1114 if (crdp->cl_reply) {
1111 1115 clist_free(crdp->cl_reply);
1112 1116 crdp->cl_reply = NULL;
1113 1117 }
1114 1118 RDMA_REL_CONN(crdp->conn);
1115 1119
1116 1120 return (retval);
1117 1121 }
1118 1122
1119 1123 /* ARGSUSED */
1120 1124 static int32_t *
1121 1125 svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
1122 1126 {
1123 1127 return (NULL);
1124 1128 }
1125 1129
1126 1130 /* ARGSUSED */
1127 1131 static void
1128 1132 svc_rdma_kfreeres(SVCXPRT *clone_xprt)
1129 1133 {
1130 1134 }
1131 1135
1132 1136 /*
1133 1137  * the dup caching routines below provide a cache of non-failure
1134 1138  * transaction IDs. rpc service routines can use this to detect
1135 1139 * retransmissions and re-send a non-failure response.
1136 1140 */
1137 1141
1138 1142 /*
1139 1143 * MAXDUPREQS is the number of cached items. It should be adjusted
1140 1144 * to the service load so that there is likely to be a response entry
1141 1145 * when the first retransmission comes in.
1142 1146 */
1143 1147 #define MAXDUPREQS 8192
1144 1148
1145 1149 /*
1146 1150  * This should be appropriately scaled to MAXDUPREQS. To produce as few
1147 1151  * collisions as possible, it is suggested to set this to a prime.
1148 1152 */
1149 1153 #define DRHASHSZ 2053
1150 1154
1151 1155 #define XIDHASH(xid) ((xid) % DRHASHSZ)
1152 1156 #define DRHASH(dr) XIDHASH((dr)->dr_xid)
1153 1157 #define REQTOXID(req) ((req)->rq_xprt->xp_xid)
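
Because DRHASHSZ is prime, XID sequences that advance by any fixed stride not divisible by 2053 still visit every bucket before repeating, which keeps the chains short. An illustrative evaluation of the macro (values checked by hand):

	uint32_t xid = 0x12345678;		/* 305419896 */
	uint32_t bucket = XIDHASH(xid);		/* 305419896 % 2053 == 1245 */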
1154 1158
1155 1159 static int rdmandupreqs = 0;
1156 1160 int rdmamaxdupreqs = MAXDUPREQS;
1157 1161 static kmutex_t rdmadupreq_lock;
1158 1162 static struct dupreq *rdmadrhashtbl[DRHASHSZ];
1159 1163 static int rdmadrhashstat[DRHASHSZ];
1160 1164
1161 1165 static void unhash(struct dupreq *);
1162 1166
1163 1167 /*
1164 1168 * rdmadrmru points to the head of a circular linked list in lru order.
1165 1169 * rdmadrmru->dr_next == drlru
1166 1170 */
1167 1171 struct dupreq *rdmadrmru;
1168 1172
1169 1173 /*
1170 1174 * svc_rdma_kdup searches the request cache and returns 0 if the
1171 1175 * request is not found in the cache. If it is found, then it
1172 1176 * returns the state of the request (in progress or done) and
1173 1177 * the status or attributes that were part of the original reply.
1174 1178 */
1175 1179 static int
1176 1180 svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
1177 1181 bool_t *dupcachedp)
1178 1182 {
1179 1183 struct dupreq *dr;
1180 1184 uint32_t xid;
1181 1185 uint32_t drhash;
1182 1186 int status;
1183 1187
1184 1188 xid = REQTOXID(req);
1185 1189 mutex_enter(&rdmadupreq_lock);
1186 1190 RSSTAT_INCR(rsdupchecks);
1187 1191 /*
1188 1192 * Check to see whether an entry already exists in the cache.
1189 1193 */
1190 1194 dr = rdmadrhashtbl[XIDHASH(xid)];
1191 1195 while (dr != NULL) {
1192 1196 if (dr->dr_xid == xid &&
1193 1197 dr->dr_proc == req->rq_proc &&
1194 1198 dr->dr_prog == req->rq_prog &&
1195 1199 dr->dr_vers == req->rq_vers &&
1196 1200 dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
1197 1201 bcmp((caddr_t)dr->dr_addr.buf,
1198 1202 (caddr_t)req->rq_xprt->xp_rtaddr.buf,
1199 1203 dr->dr_addr.len) == 0) {
1200 1204 status = dr->dr_status;
1201 1205 if (status == DUP_DONE) {
1202 1206 bcopy(dr->dr_resp.buf, res, size);
1203 1207 if (dupcachedp != NULL)
1204 1208 *dupcachedp = (dr->dr_resfree != NULL);
1205 1209 } else {
1206 1210 dr->dr_status = DUP_INPROGRESS;
1207 1211 *drpp = dr;
1208 1212 }
1209 1213 RSSTAT_INCR(rsdupreqs);
1210 1214 mutex_exit(&rdmadupreq_lock);
1211 1215 return (status);
1212 1216 }
1213 1217 dr = dr->dr_chain;
1214 1218 }
1215 1219
1216 1220 /*
1217 1221 * There wasn't an entry, either allocate a new one or recycle
1218 1222 * an old one.
1219 1223 */
1220 1224 if (rdmandupreqs < rdmamaxdupreqs) {
1221 1225 dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
1222 1226 if (dr == NULL) {
1223 1227 mutex_exit(&rdmadupreq_lock);
1224 1228 return (DUP_ERROR);
1225 1229 }
1226 1230 dr->dr_resp.buf = NULL;
1227 1231 dr->dr_resp.maxlen = 0;
1228 1232 dr->dr_addr.buf = NULL;
1229 1233 dr->dr_addr.maxlen = 0;
1230 1234 if (rdmadrmru) {
1231 1235 dr->dr_next = rdmadrmru->dr_next;
1232 1236 rdmadrmru->dr_next = dr;
1233 1237 } else {
1234 1238 dr->dr_next = dr;
1235 1239 }
1236 1240 rdmandupreqs++;
1237 1241 } else {
1238 1242 dr = rdmadrmru->dr_next;
1239 1243 while (dr->dr_status == DUP_INPROGRESS) {
1240 1244 dr = dr->dr_next;
1241 1245 if (dr == rdmadrmru->dr_next) {
1242 1246 mutex_exit(&rdmadupreq_lock);
1243 1247 return (DUP_ERROR);
1244 1248 }
1245 1249 }
1246 1250 unhash(dr);
1247 1251 if (dr->dr_resfree) {
1248 1252 (*dr->dr_resfree)(dr->dr_resp.buf);
1249 1253 }
1250 1254 }
1251 1255 dr->dr_resfree = NULL;
1252 1256 rdmadrmru = dr;
1253 1257
1254 1258 dr->dr_xid = REQTOXID(req);
1255 1259 dr->dr_prog = req->rq_prog;
1256 1260 dr->dr_vers = req->rq_vers;
1257 1261 dr->dr_proc = req->rq_proc;
1258 1262 if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
1259 1263 if (dr->dr_addr.buf != NULL)
1260 1264 kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
1261 1265 dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
1262 1266 dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
1263 1267 if (dr->dr_addr.buf == NULL) {
1264 1268 dr->dr_addr.maxlen = 0;
1265 1269 dr->dr_status = DUP_DROP;
1266 1270 mutex_exit(&rdmadupreq_lock);
1267 1271 return (DUP_ERROR);
1268 1272 }
1269 1273 }
1270 1274 dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
1271 1275 bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
1272 1276 if (dr->dr_resp.maxlen < size) {
1273 1277 if (dr->dr_resp.buf != NULL)
1274 1278 kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
1275 1279 dr->dr_resp.maxlen = (unsigned int)size;
1276 1280 dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
1277 1281 if (dr->dr_resp.buf == NULL) {
1278 1282 dr->dr_resp.maxlen = 0;
1279 1283 dr->dr_status = DUP_DROP;
1280 1284 mutex_exit(&rdmadupreq_lock);
1281 1285 return (DUP_ERROR);
1282 1286 }
1283 1287 }
1284 1288 dr->dr_status = DUP_INPROGRESS;
1285 1289
1286 1290 drhash = (uint32_t)DRHASH(dr);
1287 1291 dr->dr_chain = rdmadrhashtbl[drhash];
1288 1292 rdmadrhashtbl[drhash] = dr;
1289 1293 rdmadrhashstat[drhash]++;
1290 1294 mutex_exit(&rdmadupreq_lock);
1291 1295 *drpp = dr;
1292 1296 return (DUP_NEW);
1293 1297 }
1294 1298
1295 1299 /*
1296 1300 * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
1297 1301 * and stores the response.
1298 1302 */
1299 1303 static void
1300 1304 svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
1301 1305 int size, int status)
1302 1306 {
1303 1307 ASSERT(dr->dr_resfree == NULL);
1304 1308 if (status == DUP_DONE) {
1305 1309 bcopy(res, dr->dr_resp.buf, size);
1306 1310 dr->dr_resfree = dis_resfree;
1307 1311 }
1308 1312 dr->dr_status = status;
1309 1313 }
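
Taken together, svc_rdma_kdup() and svc_rdma_kdupdone() give a service routine its retransmission handling: DUP_NEW means execute the request and publish the result, DUP_DONE means replay the cached response, and DUP_INPROGRESS means drop the retransmission. A minimal caller sketch, with hypothetical helpers (do_service, send_reply, res_free) standing in for the real NFS dispatch code:

	struct dupreq *dr;
	bool_t cached;

	switch (svc_rdma_kdup(req, res, size, &dr, &cached)) {
	case DUP_NEW:
		do_service(req, res);		/* first arrival */
		svc_rdma_kdupdone(dr, res, res_free, size, DUP_DONE);
		send_reply(req, res);
		break;
	case DUP_DONE:
		send_reply(req, res);		/* replay cached result */
		break;
	case DUP_INPROGRESS:
	default:
		break;			/* drop; another thread owns it */
	}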
1310 1314
1311 1315 /*
1312 1316 * This routine expects that the mutex, rdmadupreq_lock, is already held.
1313 1317 */
1314 1318 static void
1315 1319 unhash(struct dupreq *dr)
1316 1320 {
1317 1321 struct dupreq *drt;
1318 1322 struct dupreq *drtprev = NULL;
1319 1323 uint32_t drhash;
1320 1324
1321 1325 ASSERT(MUTEX_HELD(&rdmadupreq_lock));
1322 1326
1323 1327 drhash = (uint32_t)DRHASH(dr);
1324 1328 drt = rdmadrhashtbl[drhash];
1325 1329 while (drt != NULL) {
1326 1330 if (drt == dr) {
1327 1331 rdmadrhashstat[drhash]--;
1328 1332 if (drtprev == NULL) {
1329 1333 rdmadrhashtbl[drhash] = drt->dr_chain;
1330 1334 } else {
1331 1335 drtprev->dr_chain = drt->dr_chain;
1332 1336 }
1333 1337 return;
1334 1338 }
1335 1339 drtprev = drt;
1336 1340 drt = drt->dr_chain;
1337 1341 }
1338 1342 }
1339 1343
1340 1344 bool_t
1341 1345 rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
1342 1346 {
1343 1347 struct clist *clist;
1344 1348 uint32_t tlen;
1345 1349
1346 1350 if (req->rq_xprt->xp_type != T_RDMA) {
1347 1351 return (FALSE);
1348 1352 }
1349 1353
1350 1354 tlen = 0;
1351 1355 clist = wlist;
1352 1356 while (clist) {
1353 1357 tlen += clist->c_len;
1354 1358 clist = clist->c_next;
1355 1359 }
1356 1360
1357 1361 /*
1358 1362 * set iov to addr+len of first segment of first wchunk of
1359 1363 * wlist sent by client. krecv() already malloc'd a buffer
1360 1364 * large enough, but registration is deferred until we write
1361 1365 * the buffer back to (NFS) client using RDMA_WRITE.
1362 1366 */
1363 1367 iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
1364 1368 iov->iov_len = tlen;
1365 1369
1366 1370 return (TRUE);
1367 1371 }
1368 1372
1369 1373 /*
1370 1374 * routine to setup the read chunk lists
1371 1375 */
1372 1376
1373 1377 int
1374 1378 rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
1375 1379 {
1376 1380 int data_len, avail_len;
1377 1381 uint_t round_len;
1378 1382
1379 1383 data_len = avail_len = 0;
1380 1384
1381 1385 while (wcl != NULL && count > 0) {
1382 1386 if (wcl->c_dmemhandle.mrc_rmr == 0)
1383 1387 break;
1384 1388
1385 1389 if (wcl->c_len < count) {
1386 1390 data_len += wcl->c_len;
1387 1391 avail_len = 0;
1388 1392 } else {
1389 1393 data_len += count;
1390 1394 avail_len = wcl->c_len - count;
1391 1395 wcl->c_len = count;
1392 1396 }
1393 1397 count -= wcl->c_len;
1394 1398
1395 1399 if (count == 0)
1396 1400 break;
1397 1401
1398 1402 wcl = wcl->c_next;
1399 1403 }
1400 1404
1401 1405 /*
1402 1406 	 * MUST fail if there is still more data
1403 1407 */
1404 1408 if (count > 0) {
1405 1409 DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
1406 1410 int, data_len, int, count);
1407 1411 return (FALSE);
1408 1412 }
1409 1413
1410 1414 /*
1411 1415 * Round up the last chunk to 4-byte boundary
1412 1416 */
1413 1417 *wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
1414 1418 round_len = *wcl_len - data_len;
1415 1419
1416 1420 if (round_len) {
1417 1421
1418 1422 /*
1419 1423 * If there is space in the current chunk,
1420 1424 * add the roundup to the chunk.
1421 1425 */
1422 1426 if (avail_len >= round_len) {
1423 1427 wcl->c_len += round_len;
1424 1428 } else {
1425 1429 /*
1426 1430 * try the next one.
1427 1431 */
1428 1432 wcl = wcl->c_next;
1429 1433 if ((wcl == NULL) || (wcl->c_len < round_len)) {
1430 1434 DTRACE_PROBE1(
1431 1435 krpc__e__rdma_setup_read_chunks_rndup,
1432 1436 int, round_len);
1433 1437 return (FALSE);
1434 1438 }
1435 1439 wcl->c_len = round_len;
1436 1440 }
1437 1441 }
1438 1442
1439 1443 wcl = wcl->c_next;
1440 1444
1441 1445 /*
1442 1446 	 * Make the rest of the chunks 0-len
1443 1447 */
1444 1448
1445 1449 clist_zero_len(wcl);
1446 1450
1447 1451 return (TRUE);
1448 1452 }
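
The roundup at the end of rdma_setup_read_chunks() pads the chunk total out to the 4-byte XDR unit. An illustrative set of values, assuming BYTES_PER_XDR_UNIT is 4:

	/*
	 * data_len   = 1001
	 * *wcl_len   = roundup(1001, 4) = 1004
	 * round_len  = 1004 - 1001 = 3 pad bytes, absorbed by the
	 * current chunk if it has room, otherwise by the next one.
	 */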