3354 kernel crash in rpcsec_gss after using gsscred
Reviewed by: Toomas Soome <tsoome@me.com>
Reviewed by: Carlos Neira <cneirabustos@gmail.com>
Approved by: Robert Mustacchi <rm@joyent.com>
re #12783 rb4338 Flow control is needed in rpcmod when the NFS server is unable to keep up with the network
    
      
    
    
          --- old/usr/src/uts/common/rpc/svc_rdma.c
          +++ new/usr/src/uts/common/rpc/svc_rdma.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  
  
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
  23   23   * Copyright (c) 2012 by Delphix. All rights reserved.
  24   24   * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
       25 + * Copyright 2012 Marcel Telka <marcel@telka.sk>
       26 + * Copyright 2018 OmniOS Community Edition (OmniOSce) Association.
  25   27   */
  26   28  /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
  27   29  /* All Rights Reserved */
  28   30  /*
  29   31   * Portions of this source code were derived from Berkeley
  30   32   * 4.3 BSD under license from the Regents of the University of
  31   33   * California.
  32   34   */
  33   35  
  34   36  /*
  35   37   * Server side of RPC over RDMA in the kernel.
  36   38   */
  37   39  
  38   40  #include <sys/param.h>
  39   41  #include <sys/types.h>
  40   42  #include <sys/user.h>
  41   43  #include <sys/sysmacros.h>
  42   44  #include <sys/proc.h>
  43   45  #include <sys/file.h>
  44   46  #include <sys/errno.h>
  45   47  #include <sys/kmem.h>
  46   48  #include <sys/debug.h>
  47   49  #include <sys/systm.h>
  48   50  #include <sys/cmn_err.h>
  49   51  #include <sys/kstat.h>
  50   52  #include <sys/vtrace.h>
  51   53  #include <sys/debug.h>
  52   54  
  53   55  #include <rpc/types.h>
  54   56  #include <rpc/xdr.h>
  55   57  #include <rpc/auth.h>
  56   58  #include <rpc/clnt.h>
  57   59  #include <rpc/rpc_msg.h>
  58   60  #include <rpc/svc.h>
  59   61  #include <rpc/rpc_rdma.h>
  60   62  #include <sys/ddi.h>
  61   63  #include <sys/sunddi.h>
  62   64  
  63   65  #include <inet/common.h>
  64   66  #include <inet/ip.h>
  65   67  #include <inet/ip6.h>
  66   68  
  67   69  #include <nfs/nfs.h>
  68   70  #include <sys/sdt.h>
  69   71  
  70   72  #define SVC_RDMA_SUCCESS 0
  71   73  #define SVC_RDMA_FAIL -1
  72   74  
  73   75  #define SVC_CREDIT_FACTOR (0.5)
  74   76  
  
  
  75   77  #define MSG_IS_RPCSEC_GSS(msg)          \
  76   78          ((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)
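
MSG_IS_RPCSEC_GSS() keys off the verifier flavor of an accepted reply. svc_process_long_reply() below uses it when sizing the long-reply buffer, padding the estimate because the size calculation otherwise ignores the RPCSEC_GSS authentication overhead. A minimal sketch of that sizing decision (illustrative; MAX_AUTH_BYTES and RNDUP come from the RPC headers):

        int alloc_len;

        if (MSG_IS_RPCSEC_GSS(msg)) {
                /* leave headroom for the RPCSEC_GSS verifier */
                alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
        } else {
                alloc_len = RNDUP(*msglen);
        }
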
  77   79  
  78   80  
  79   81  uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;
  80   82  
  81   83  /*
  82   84   * RDMA transport specific data associated with SVCMASTERXPRT
  83   85   */
  84   86  struct rdma_data {
  85      -        SVCMASTERXPRT   *rd_xprt;       /* back ptr to SVCMASTERXPRT */
       87 +        SVCMASTERXPRT   *rd_xprt;       /* back ptr to SVCMASTERXPRT */
  86   88          struct rdma_svc_data rd_data;   /* rdma data */
  87   89          rdma_mod_t      *r_mod;         /* RDMA module containing ops ptr */
  88   90  };
  89   91  
  90   92  /*
  91   93   * Plugin connection specific data stashed away in clone SVCXPRT
  92   94   */
  93   95  struct clone_rdma_data {
  94   96          bool_t          cloned;         /* xprt cloned for thread processing */
  95   97          CONN            *conn;          /* RDMA connection */
  96   98          rdma_buf_t      rpcbuf;         /* RPC req/resp buffer */
  97   99          struct clist    *cl_reply;      /* reply chunk buffer info */
  98  100          struct clist    *cl_wlist;              /* write list clist */
  99  101  };
 100  102  
 101  103  
 102  104  #define MAXADDRLEN      128     /* max length for address mask */
 103  105  
 104  106  /*
 105  107   * Routines exported through ops vector.
 106  108   */
 107  109  static bool_t           svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
 108  110  static bool_t           svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
 109  111  static bool_t           svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
 110  112  static bool_t           svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
 111  113  void                    svc_rdma_kdestroy(SVCMASTERXPRT *);
 112  114  static int              svc_rdma_kdup(struct svc_req *, caddr_t, int,
 113  115                                  struct dupreq **, bool_t *);
 114  116  static void             svc_rdma_kdupdone(struct dupreq *, caddr_t,
 115  117                                  void (*)(), int, int);
 116  118  static int32_t          *svc_rdma_kgetres(SVCXPRT *, int);
 117  119  static void             svc_rdma_kfreeres(SVCXPRT *);
 118  120  static void             svc_rdma_kclone_destroy(SVCXPRT *);
 119  121  static void             svc_rdma_kstart(SVCMASTERXPRT *);
 120  122  void                    svc_rdma_kstop(SVCMASTERXPRT *);
 121  123  static void             svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
 122  124  static void             svc_rdma_ktattrs(SVCXPRT *, int, void **);
 123  125  
 124  126  static int      svc_process_long_reply(SVCXPRT *, xdrproc_t,
 125  127                          caddr_t, struct rpc_msg *, bool_t, int *,
 126  128                          int *, int *, unsigned int *);
 127  129  
 128  130  static int      svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
 129  131                          caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
 130  132                          bool_t, uint_t *);
 131  133  static bool_t rpcmsg_length(xdrproc_t,
 132  134                  caddr_t,
 133  135                  struct rpc_msg *, bool_t, int);
 134  136  
 135  137  /*
 136  138   * Server transport operations vector.
 137  139   */
 138  140  struct svc_ops rdma_svc_ops = {
 139  141          svc_rdma_krecv,         /* Get requests */
 140  142          svc_rdma_kgetargs,      /* Deserialize arguments */
  
  
 141  143          svc_rdma_ksend,         /* Send reply */
 142  144          svc_rdma_kfreeargs,     /* Free argument data space */
 143  145          svc_rdma_kdestroy,      /* Destroy transport handle */
 144  146          svc_rdma_kdup,          /* Check entry in dup req cache */
 145  147          svc_rdma_kdupdone,      /* Mark entry in dup req cache as done */
 146  148          svc_rdma_kgetres,       /* Get pointer to response buffer */
 147  149          svc_rdma_kfreeres,      /* Destroy pre-serialized response header */
 148  150          svc_rdma_kclone_destroy,        /* Destroy a clone xprt */
 149  151          svc_rdma_kstart,        /* Tell `ready-to-receive' to rpcmod */
 150  152          svc_rdma_kclone_xprt,   /* Transport specific clone xprt */
 151      -        svc_rdma_ktattrs        /* Get Transport Attributes */
      153 +        svc_rdma_ktattrs,       /* Get Transport Attributes */
      154 +        NULL,                   /* Increment transport reference count */
      155 +        NULL                    /* Decrement transport reference count */
 152  156  };
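
The two entries appended to the vector are left NULL here: the RDMA transport does not participate in the transport reference counting that the rpcmod flow-control work (re #12783) introduces for connection-oriented transports. Any code dispatching through these slots must therefore tolerate a NULL entry; a minimal sketch of such a guarded call (the xp_hold member name is hypothetical):

        /* skip transports that do not implement the hook */
        if (xprt->xp_ops->xp_hold != NULL)
                (*xprt->xp_ops->xp_hold)(xprt);
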
 153  157  
 154  158  /*
 155  159   * Server statistics
 156  160   * NOTE: This structure type is duplicated in the NFS fast path.
 157  161   */
 158  162  struct {
 159  163          kstat_named_t   rscalls;
 160  164          kstat_named_t   rsbadcalls;
 161  165          kstat_named_t   rsnullrecv;
 162  166          kstat_named_t   rsbadlen;
 163  167          kstat_named_t   rsxdrcall;
 164  168          kstat_named_t   rsdupchecks;
 165  169          kstat_named_t   rsdupreqs;
 166  170          kstat_named_t   rslongrpcs;
 167  171          kstat_named_t   rstotalreplies;
 168  172          kstat_named_t   rstotallongreplies;
 169  173          kstat_named_t   rstotalinlinereplies;
 170  174  } rdmarsstat = {
 171  175          { "calls",      KSTAT_DATA_UINT64 },
 172  176          { "badcalls",   KSTAT_DATA_UINT64 },
 173  177          { "nullrecv",   KSTAT_DATA_UINT64 },
 174  178          { "badlen",     KSTAT_DATA_UINT64 },
 175  179          { "xdrcall",    KSTAT_DATA_UINT64 },
 176  180          { "dupchecks",  KSTAT_DATA_UINT64 },
 177  181          { "dupreqs",    KSTAT_DATA_UINT64 },
 178  182          { "longrpcs",   KSTAT_DATA_UINT64 },
 179  183          { "totalreplies",       KSTAT_DATA_UINT64 },
 180  184          { "totallongreplies",   KSTAT_DATA_UINT64 },
 181  185          { "totalinlinereplies", KSTAT_DATA_UINT64 },
 182  186  };
 183  187  
 184  188  kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
 185  189  uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
 186  190  
 187  191  #define RSSTAT_INCR(x)  atomic_inc_64(&rdmarsstat.x.value.ui64)
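
rdmarsstat_ptr and rdmarsstat_ndata exist so the counter block can be exported as a named kstat, while RSSTAT_INCR() bumps the counters locklessly with atomic_inc_64(). A sketch of the registration, assuming the usual DDI kstat calls (the module/name/class strings here are illustrative, not taken from this file):

        kstat_t *ksp;

        ksp = kstat_create("unix", 0, "svc_rdma_server", "rpc",
            KSTAT_TYPE_NAMED, rdmarsstat_ndata, KSTAT_FLAG_VIRTUAL);
        if (ksp != NULL) {
                ksp->ks_data = (void *)rdmarsstat_ptr;
                kstat_install(ksp);
        }
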
 188  192  /*
 189  193   * Create a transport record.
 190  194   * The transport record, output buffer, and private data structure
 191  195   * are allocated.  The output buffer is serialized into using xdrmem.
 192  196   * There is one transport record per user process which implements a
 193  197   * set of services.
 194  198   */
 195  199  /* ARGSUSED */
 196  200  int
 197  201  svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
 198  202      rdma_xprt_group_t *started_xprts)
 199  203  {
 200  204          int error;
 201  205          SVCMASTERXPRT *xprt;
 202  206          struct rdma_data *rd;
 203  207          rdma_registry_t *rmod;
 204  208          rdma_xprt_record_t *xprt_rec;
 205  209          queue_t *q;
 206  210          /*
  207  211           * modload the RDMA plugins if not already done.
 208  212           */
 209  213          if (!rdma_modloaded) {
 210  214                  /*CONSTANTCONDITION*/
 211  215                  ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);
 212  216  
 213  217                  mutex_enter(&rdma_modload_lock);
 214  218                  if (!rdma_modloaded) {
 215  219                          error = rdma_modload();
 216  220                  }
 217  221                  mutex_exit(&rdma_modload_lock);
 218  222  
 219  223                  if (error)
 220  224                          return (error);
 221  225          }
 222  226  
 223  227          /*
 224  228           * master_xprt_count is the count of master transport handles
  225  229           * that were successfully created and are ready to receive for
 226  230           * RDMA based access.
 227  231           */
 228  232          error = 0;
 229  233          xprt_rec = NULL;
 230  234          rw_enter(&rdma_lock, RW_READER);
 231  235          if (rdma_mod_head == NULL) {
 232  236                  started_xprts->rtg_count = 0;
 233  237                  rw_exit(&rdma_lock);
 234  238                  if (rdma_dev_available)
 235  239                          return (EPROTONOSUPPORT);
 236  240                  else
 237  241                          return (ENODEV);
 238  242          }
 239  243  
 240  244          /*
  241  245           * If we have reached here, then at least one RDMA plugin has loaded.
  242  246           * Create a master_xprt and make it start listening on the device;
  243  247           * if an error is generated, record it, as we might need to shut
  244  248           * down the master_xprt.
 245  249           * SVC_START() calls svc_rdma_kstart which calls plugin binding
 246  250           * routines.
 247  251           */
 248  252          for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
 249  253  
 250  254                  /*
 251  255                   * One SVCMASTERXPRT per RDMA plugin.
 252  256                   */
 253  257                  xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
 254  258                  xprt->xp_ops = &rdma_svc_ops;
 255  259                  xprt->xp_sct = sct;
 256  260                  xprt->xp_type = T_RDMA;
 257  261                  mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
 258  262                  mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
 259  263                  xprt->xp_req_head = (mblk_t *)0;
 260  264                  xprt->xp_req_tail = (mblk_t *)0;
 261  265                  xprt->xp_full = FALSE;
 262  266                  xprt->xp_enable = FALSE;
 263  267                  xprt->xp_reqs = 0;
 264  268                  xprt->xp_size = 0;
 265  269                  xprt->xp_threads = 0;
 266  270                  xprt->xp_detached_threads = 0;
 267  271  
 268  272                  rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
 269  273                  xprt->xp_p2 = (caddr_t)rd;
 270  274                  rd->rd_xprt = xprt;
 271  275                  rd->r_mod = rmod->r_mod;
 272  276  
 273  277                  q = &rd->rd_data.q;
 274  278                  xprt->xp_wq = q;
 275  279                  q->q_ptr = &rd->rd_xprt;
 276  280                  xprt->xp_netid = NULL;
 277  281  
 278  282                  /*
 279  283                   * Each of the plugins will have their own Service ID
 280  284                   * to listener specific mapping, like port number for VI
 281  285                   * and service name for IB.
 282  286                   */
 283  287                  rd->rd_data.svcid = id;
 284  288                  error = svc_xprt_register(xprt, id);
 285  289                  if (error) {
 286  290                          DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
 287  291                          goto cleanup;
 288  292                  }
 289  293  
 290  294                  SVC_START(xprt);
 291  295                  if (!rd->rd_data.active) {
 292  296                          svc_xprt_unregister(xprt);
 293  297                          error = rd->rd_data.err_code;
 294  298                          goto cleanup;
 295  299                  }
 296  300  
 297  301                  /*
  298  302                   * This is set only when at least one transport has been
  299  303                   * successfully created. We insert the pointer
  300  304                   * to the created RDMA master xprt into a separately maintained
  301  305                   * list. This way we can easily reference it later for cleanup
  302  306                   * when the NFS kRPC service pool is going away/unregistered.
 303  307                   */
 304  308                  started_xprts->rtg_count ++;
 305  309                  xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
 306  310                  xprt_rec->rtr_xprt_ptr = xprt;
 307  311                  xprt_rec->rtr_next = started_xprts->rtg_listhead;
 308  312                  started_xprts->rtg_listhead = xprt_rec;
 309  313                  continue;
 310  314  cleanup:
 311  315                  SVC_DESTROY(xprt);
 312  316                  if (error == RDMA_FAILED)
 313  317                          error = EPROTONOSUPPORT;
 314  318          }
 315  319  
 316  320          rw_exit(&rdma_lock);
 317  321  
 318  322          /*
 319  323           * Don't return any error even if a single plugin was started
 320  324           * successfully.
 321  325           */
 322  326          if (started_xprts->rtg_count == 0)
 323  327                  return (error);
 324  328          return (0);
 325  329  }
 326  330  
 327  331  /*
 328  332   * Cleanup routine for freeing up memory allocated by
 329  333   * svc_rdma_kcreate()
 330  334   */
 331  335  void
 332  336  svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
 333  337  {
 334  338          struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
 335  339  
 336  340  
 337  341          mutex_destroy(&xprt->xp_req_lock);
 338  342          mutex_destroy(&xprt->xp_thread_lock);
 339  343          kmem_free(rd, sizeof (*rd));
 340  344          kmem_free(xprt, sizeof (*xprt));
 341  345  }
 342  346  
 343  347  
 344  348  static void
 345  349  svc_rdma_kstart(SVCMASTERXPRT *xprt)
 346  350  {
 347  351          struct rdma_svc_data *svcdata;
 348  352          rdma_mod_t *rmod;
 349  353  
 350  354          svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 351  355          rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 352  356  
 353  357          /*
  354  358           * Create a listener for the module at this port
 355  359           */
 356  360  
 357  361          if (rmod->rdma_count != 0)
 358  362                  (*rmod->rdma_ops->rdma_svc_listen)(svcdata);
 359  363          else
 360  364                  svcdata->err_code = RDMA_FAILED;
 361  365  }
 362  366  
 363  367  void
 364  368  svc_rdma_kstop(SVCMASTERXPRT *xprt)
 365  369  {
 366  370          struct rdma_svc_data *svcdata;
 367  371          rdma_mod_t *rmod;
 368  372  
 369  373          svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 370  374          rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 371  375  
 372  376          /*
 373  377           * Call the stop listener routine for each plugin. If rdma_count is
  374  378           * already zero, set active to zero.
 375  379           */
 376  380          if (rmod->rdma_count != 0)
 377  381                  (*rmod->rdma_ops->rdma_svc_stop)(svcdata);
 378  382          else
 379  383                  svcdata->active = 0;
 380  384          if (svcdata->active)
 381  385                  DTRACE_PROBE(krpc__e__svcrdma__kstop);
 382  386  }
 383  387  
 384  388  /* ARGSUSED */
 385  389  static void
 386  390  svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
 387  391  {
 388  392  
 389  393          struct clone_rdma_data *cdrp;
 390  394          cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 391  395  
 392  396          /*
 393  397           * Only free buffers and release connection when cloned is set.
 394  398           */
 395  399          if (cdrp->cloned != TRUE)
 396  400                  return;
 397  401  
 398  402          rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
 399  403          if (cdrp->cl_reply) {
 400  404                  clist_free(cdrp->cl_reply);
 401  405                  cdrp->cl_reply = NULL;
 402  406          }
 403  407          RDMA_REL_CONN(cdrp->conn);
 404  408  
 405  409          cdrp->cloned = 0;
 406  410  }
 407  411  
 408  412  /*
 409  413   * Clone the xprt specific information.  It will be freed by
 410  414   * SVC_CLONE_DESTROY.
 411  415   */
 412  416  static void
 413  417  svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
 414  418  {
 415  419          struct clone_rdma_data *srcp2;
 416  420          struct clone_rdma_data *dstp2;
 417  421  
 418  422          srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
 419  423          dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;
 420  424  
 421  425          if (srcp2->conn != NULL) {
 422  426                  srcp2->cloned = TRUE;
 423  427                  *dstp2 = *srcp2;
 424  428          }
 425  429  }
 426  430  
 427  431  static void
 428  432  svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
 429  433  {
 430  434          CONN    *conn;
 431  435          *tattr = NULL;
 432  436  
 433  437          switch (attrflag) {
 434  438          case SVC_TATTR_ADDRMASK:
 435  439                  conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
 436  440                  ASSERT(conn != NULL);
 437  441                  if (conn)
 438  442                          *tattr = (void *)&conn->c_addrmask;
 439  443          }
 440  444  }
 441  445  
 442  446  static bool_t
 443  447  svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
 444  448  {
 445  449          XDR     *xdrs;
 446  450          CONN    *conn;
 447  451          rdma_recv_data_t        *rdp = (rdma_recv_data_t *)mp->b_rptr;
 448  452          struct clone_rdma_data *crdp;
 449  453          struct clist    *cl = NULL;
 450  454          struct clist    *wcl = NULL;
 451  455          struct clist    *cllong = NULL;
 452  456  
 453  457          rdma_stat       status;
 454  458          uint32_t vers, op, pos, xid;
 455  459          uint32_t rdma_credit;
 456  460          uint32_t wcl_total_length = 0;
 457  461          bool_t  wwl = FALSE;
 458  462  
 459  463          crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 460  464          RSSTAT_INCR(rscalls);
 461  465          conn = rdp->conn;
 462  466  
 463  467          status = rdma_svc_postrecv(conn);
 464  468          if (status != RDMA_SUCCESS) {
 465  469                  DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
 466  470                  goto badrpc_call;
 467  471          }
 468  472  
 469  473          xdrs = &clone_xprt->xp_xdrin;
 470  474          xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
 471  475          xid = *(uint32_t *)rdp->rpcmsg.addr;
 472  476          XDR_SETPOS(xdrs, sizeof (uint32_t));
 473  477  
 474  478          if (! xdr_u_int(xdrs, &vers) ||
 475  479              ! xdr_u_int(xdrs, &rdma_credit) ||
 476  480              ! xdr_u_int(xdrs, &op)) {
 477  481                  DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
 478  482                  goto xdr_err;
 479  483          }
 480  484  
 481  485          /* Checking if the status of the recv operation was normal */
 482  486          if (rdp->status != 0) {
 483  487                  DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
 484  488                      int, rdp->status);
 485  489                  goto badrpc_call;
 486  490          }
 487  491  
 488  492          if (! xdr_do_clist(xdrs, &cl)) {
 489  493                  DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
 490  494                  goto xdr_err;
 491  495          }
 492  496  
 493  497          if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
 494  498                  DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
 495  499                  if (cl)
 496  500                          clist_free(cl);
 497  501                  goto xdr_err;
 498  502          }
 499  503          crdp->cl_wlist = wcl;
 500  504  
 501  505          crdp->cl_reply = NULL;
 502  506          (void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);
 503  507  
 504  508          /*
 505  509           * A chunk at 0 offset indicates that the RPC call message
 506  510           * is in a chunk. Get the RPC call message chunk.
 507  511           */
 508  512          if (cl != NULL && op == RDMA_NOMSG) {
 509  513  
 510  514                  /* Remove RPC call message chunk from chunklist */
 511  515                  cllong = cl;
 512  516                  cl = cl->c_next;
 513  517                  cllong->c_next = NULL;
 514  518  
 515  519  
 516  520                  /* Allocate and register memory for the RPC call msg chunk */
 517  521                  cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
 518  522                  cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
 519  523                      cllong->c_len : LONG_REPLY_LEN;
 520  524  
 521  525                  if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
 522  526                          clist_free(cllong);
 523  527                          goto cll_malloc_err;
 524  528                  }
 525  529  
 526  530                  cllong->u.c_daddr3 = cllong->rb_longbuf.addr;
 527  531  
 528  532                  if (cllong->u.c_daddr == NULL) {
 529  533                          DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
 530  534                          rdma_buf_free(conn, &cllong->rb_longbuf);
 531  535                          clist_free(cllong);
 532  536                          goto cll_malloc_err;
 533  537                  }
 534  538  
 535  539                  status = clist_register(conn, cllong, CLIST_REG_DST);
 536  540                  if (status) {
 537  541                          DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
 538  542                          rdma_buf_free(conn, &cllong->rb_longbuf);
 539  543                          clist_free(cllong);
 540  544                          goto cll_malloc_err;
 541  545                  }
 542  546  
 543  547                  /*
 544  548                   * Now read the RPC call message in
 545  549                   */
 546  550                  status = RDMA_READ(conn, cllong, WAIT);
 547  551                  if (status) {
 548  552                          DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
 549  553                          (void) clist_deregister(conn, cllong);
 550  554                          rdma_buf_free(conn, &cllong->rb_longbuf);
 551  555                          clist_free(cllong);
 552  556                          goto cll_malloc_err;
 553  557                  }
 554  558  
 555  559                  status = clist_syncmem(conn, cllong, CLIST_REG_DST);
 556  560                  (void) clist_deregister(conn, cllong);
 557  561  
 558  562                  xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
 559  563                      cllong->c_len, 0, cl, XDR_DECODE, conn);
 560  564  
 561  565                  crdp->rpcbuf = cllong->rb_longbuf;
 562  566                  crdp->rpcbuf.len = cllong->c_len;
 563  567                  clist_free(cllong);
 564  568                  RDMA_BUF_FREE(conn, &rdp->rpcmsg);
 565  569          } else {
 566  570                  pos = XDR_GETPOS(xdrs);
 567  571                  xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
 568  572                      rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
 569  573                  crdp->rpcbuf = rdp->rpcmsg;
 570  574  
 571  575                  /* Use xdrrdmablk_ops to indicate there is a read chunk list */
 572  576                  if (cl != NULL) {
 573  577                          int32_t flg = XDR_RDMA_RLIST_REG;
 574  578  
 575  579                          XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
 576  580                          xdrs->x_ops = &xdrrdmablk_ops;
 577  581                  }
 578  582          }
 579  583  
 580  584          if (crdp->cl_wlist) {
 581  585                  int32_t flg = XDR_RDMA_WLIST_REG;
 582  586  
 583  587                  XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
 584  588                  XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
 585  589          }
 586  590  
 587  591          if (! xdr_callmsg(xdrs, msg)) {
 588  592                  DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
 589  593                  RSSTAT_INCR(rsxdrcall);
 590  594                  goto callmsg_err;
 591  595          }
 592  596  
 593  597          /*
 594  598           * Point the remote transport address in the service_transport
 595  599           * handle at the address in the request.
 596  600           */
 597  601          clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
 598  602          clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
 599  603          clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;
 600  604  
 601  605          clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
 602  606          clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
 603  607          clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;
 604  608  
 605  609          /*
 606  610           * In case of RDMA, connection management is
 607  611           * entirely done in rpcib module and netid in the
 608  612           * SVCMASTERXPRT is NULL. Initialize the clone netid
 609  613           * from the connection.
 610  614           */
 611  615  
 612  616          clone_xprt->xp_netid = conn->c_netid;
 613  617  
 614  618          clone_xprt->xp_xid = xid;
 615  619          crdp->conn = conn;
 616  620  
 617  621          freeb(mp);
 618  622  
 619  623          return (TRUE);
 620  624  
 621  625  callmsg_err:
 622  626          rdma_buf_free(conn, &crdp->rpcbuf);
 623  627  
 624  628  cll_malloc_err:
 625  629          if (cl)
 626  630                  clist_free(cl);
 627  631  xdr_err:
 628  632          XDR_DESTROY(xdrs);
 629  633  
 630  634  badrpc_call:
 631  635          RDMA_BUF_FREE(conn, &rdp->rpcmsg);
 632  636          RDMA_REL_CONN(conn);
 633  637          freeb(mp);
 634  638          RSSTAT_INCR(rsbadcalls);
 635  639          return (FALSE);
 636  640  }
 637  641  
 638  642  static int
 639  643  svc_process_long_reply(SVCXPRT * clone_xprt,
 640  644      xdrproc_t xdr_results, caddr_t xdr_location,
 641  645      struct rpc_msg *msg, bool_t has_args, int *msglen,
 642  646      int *freelen, int *numchunks, unsigned int *final_len)
 643  647  {
 644  648          int status;
 645  649          XDR xdrslong;
 646  650          struct clist *wcl = NULL;
 647  651          int count = 0;
 648  652          int alloc_len;
 649  653          char  *memp;
 650  654          rdma_buf_t long_rpc = {0};
 651  655          struct clone_rdma_data *crdp;
 652  656  
 653  657          crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 654  658  
 655  659          bzero(&xdrslong, sizeof (xdrslong));
 656  660  
 657  661          /* Choose a size for the long rpc response */
 658  662          if (MSG_IS_RPCSEC_GSS(msg)) {
 659  663                  alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
 660  664          } else {
 661  665                  alloc_len = RNDUP(*msglen);
 662  666          }
 663  667  
 664  668          if (alloc_len <= 64 * 1024) {
 665  669                  if (alloc_len > 32 * 1024) {
 666  670                          alloc_len = 64 * 1024;
 667  671                  } else {
 668  672                          if (alloc_len > 16 * 1024) {
 669  673                                  alloc_len = 32 * 1024;
 670  674                          } else {
 671  675                                  alloc_len = 16 * 1024;
 672  676                          }
 673  677                  }
 674  678          }
 675  679  
 676  680          long_rpc.type = RDMA_LONG_BUFFER;
 677  681          long_rpc.len = alloc_len;
 678  682          if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
 679  683                  return (SVC_RDMA_FAIL);
 680  684          }
 681  685  
 682  686          memp = long_rpc.addr;
 683  687          xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);
 684  688  
 685  689          msg->rm_xid = clone_xprt->xp_xid;
 686  690  
 687  691          if (!(xdr_replymsg(&xdrslong, msg) &&
 688  692              (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
 689  693              xdr_results, xdr_location)))) {
 690  694                  rdma_buf_free(crdp->conn, &long_rpc);
 691  695                  DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
 692  696                  return (SVC_RDMA_FAIL);
 693  697          }
 694  698  
 695  699          *final_len = XDR_GETPOS(&xdrslong);
 696  700  
 697  701          DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
 698  702          *numchunks = 0;
 699  703          *freelen = 0;
 700  704  
 701  705          wcl = crdp->cl_reply;
 702  706          wcl->rb_longbuf = long_rpc;
 703  707  
 704  708          count = *final_len;
 705  709          while ((wcl != NULL) && (count > 0)) {
 706  710  
 707  711                  if (wcl->c_dmemhandle.mrc_rmr == 0)
 708  712                          break;
 709  713  
 710  714                  DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
 711  715                      uint32_t, wcl->c_len);
 712  716  
 713  717                  if (wcl->c_len > count) {
 714  718                          wcl->c_len = count;
 715  719                  }
 716  720                  wcl->w.c_saddr3 = (caddr_t)memp;
 717  721  
 718  722                  count -= wcl->c_len;
 719  723                  *numchunks +=  1;
 720  724                  memp += wcl->c_len;
 721  725                  wcl = wcl->c_next;
 722  726          }
 723  727  
 724  728          /*
  725  729           * Make the rest of the chunks 0-len
 726  730           */
 727  731          while (wcl != NULL) {
 728  732                  if (wcl->c_dmemhandle.mrc_rmr == 0)
 729  733                          break;
 730  734                  wcl->c_len = 0;
 731  735                  wcl = wcl->c_next;
 732  736          }
 733  737  
 734  738          wcl = crdp->cl_reply;
 735  739  
 736  740          /*
  737  741           * MUST fail if there is still more data
 738  742           */
 739  743          if (count > 0) {
 740  744                  rdma_buf_free(crdp->conn, &long_rpc);
 741  745                  DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
 742  746                  return (SVC_RDMA_FAIL);
 743  747          }
 744  748  
 745  749          if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
 746  750                  rdma_buf_free(crdp->conn, &long_rpc);
 747  751                  DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
 748  752                  return (SVC_RDMA_FAIL);
 749  753          }
 750  754  
 751  755          status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);
 752  756  
 753  757          if (status) {
 754  758                  (void) clist_deregister(crdp->conn, wcl);
 755  759                  rdma_buf_free(crdp->conn, &long_rpc);
 756  760                  DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
 757  761                  return (SVC_RDMA_FAIL);
 758  762          }
 759  763  
 760  764          status = RDMA_WRITE(crdp->conn, wcl, WAIT);
 761  765  
 762  766          (void) clist_deregister(crdp->conn, wcl);
 763  767          rdma_buf_free(crdp->conn, &wcl->rb_longbuf);
 764  768  
 765  769          if (status != RDMA_SUCCESS) {
 766  770                  DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
 767  771                  return (SVC_RDMA_FAIL);
 768  772          }
 769  773  
 770  774          return (SVC_RDMA_SUCCESS);
 771  775  }
 772  776  
 773  777  
 774  778  static int
 775  779  svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
 776  780      caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
 777  781      struct rpc_msg *msg, bool_t has_args, uint_t *len)
 778  782  {
 779  783          /*
 780  784           * Get a pre-allocated buffer for rpc reply
 781  785           */
 782  786          rpcreply->type = SEND_BUFFER;
 783  787          if (rdma_buf_alloc(conn, rpcreply)) {
 784  788                  DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
 785  789                  return (SVC_RDMA_FAIL);
 786  790          }
 787  791  
 788  792          xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
 789  793              0, NULL, XDR_ENCODE, conn);
 790  794  
 791  795          msg->rm_xid = clone_xprt->xp_xid;
 792  796  
 793  797          if (has_args) {
 794  798                  if (!(xdr_replymsg(*xdrs, msg) &&
 795  799                      (!has_args ||
 796  800                      SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
 797  801                      xdr_results, xdr_location)))) {
 798  802                          rdma_buf_free(conn, rpcreply);
 799  803                          DTRACE_PROBE(
 800  804                              krpc__e__svcrdma__rpcmsg__reply__authwrap1);
 801  805                          return (SVC_RDMA_FAIL);
 802  806                  }
 803  807          } else {
 804  808                  if (!xdr_replymsg(*xdrs, msg)) {
 805  809                          rdma_buf_free(conn, rpcreply);
 806  810                          DTRACE_PROBE(
 807  811                              krpc__e__svcrdma__rpcmsg__reply__authwrap2);
 808  812                          return (SVC_RDMA_FAIL);
 809  813                  }
 810  814          }
 811  815  
 812  816          *len = XDR_GETPOS(*xdrs);
 813  817  
 814  818          return (SVC_RDMA_SUCCESS);
 815  819  }
 816  820  
 817  821  /*
 818  822   * Send rpc reply.
 819  823   */
 820  824  static bool_t
 821  825  svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
 822  826  {
 823  827          XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
 824  828          XDR xdrs_rhdr;
 825  829          CONN *conn = NULL;
 826  830          rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};
 827  831  
 828  832          struct clone_rdma_data *crdp;
 829  833          struct clist *cl_read = NULL;
 830  834          struct clist *cl_send = NULL;
 831  835          struct clist *cl_write = NULL;
 832  836          xdrproc_t xdr_results;          /* results XDR encoding function */
 833  837          caddr_t xdr_location;           /* response results pointer */
 834  838  
 835  839          int retval = FALSE;
 836  840          int status, msglen, num_wreply_segments = 0;
 837  841          uint32_t rdma_credit = 0;
 838  842          int freelen = 0;
 839  843          bool_t has_args;
 840  844          uint_t  final_resp_len, rdma_response_op, vers;
 841  845  
 842  846          bzero(&xdrs_rhdr, sizeof (XDR));
 843  847          crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 844  848          conn = crdp->conn;
 845  849  
 846  850          /*
 847  851           * If there is a result procedure specified in the reply message,
 848  852           * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
 849  853           * We need to make sure it won't be processed twice, so we null
 850  854           * it for xdr_replymsg here.
 851  855           */
 852  856          has_args = FALSE;
 853  857          if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
 854  858              msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
 855  859                  if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
 856  860                          has_args = TRUE;
 857  861                          xdr_location = msg->acpted_rply.ar_results.where;
 858  862                          msg->acpted_rply.ar_results.proc = xdr_void;
 859  863                          msg->acpted_rply.ar_results.where = NULL;
 860  864                  }
 861  865          }
 862  866  
 863  867          /*
 864  868           * Given the limit on the inline response size (RPC_MSG_SZ),
 865  869           * there is a need to make a guess as to the overall size of
 866  870           * the response.  If the resultant size is beyond the inline
 867  871           * size, then the server needs to use the "reply chunk list"
 868  872           * provided by the client (if the client provided one).  An
 869  873           * example of this type of response would be a READDIR
 870  874           * response (e.g. a small directory read would fit in RPC_MSG_SZ
 871  875           * and that is the preference but it may not fit)
 872  876           *
 873  877           * Combine the encoded size and the size of the true results
 874  878           * and then make the decision about where to encode and send results.
 875  879           *
 876  880           * One important note, this calculation is ignoring the size
 877  881           * of the encoding of the authentication overhead.  The reason
 878  882           * for this is rooted in the complexities of access to the
  879  883           * encoded size of RPCSEC_GSS related authentication,
 880  884           * integrity, and privacy.
 881  885           *
 882  886           * If it turns out that the encoded authentication bumps the
 883  887           * response over the RPC_MSG_SZ limit, then it may need to
 884  888           * attempt to encode for the reply chunk list.
 885  889           */
 886  890  
 887  891          /*
 888  892           * Calculating the "sizeof" the RPC response header and the
 889  893           * encoded results.
 890  894           */
 891  895          msglen = xdr_sizeof(xdr_replymsg, msg);
 892  896  
 893  897          if (msglen > 0) {
 894  898                  RSSTAT_INCR(rstotalreplies);
 895  899          }
 896  900          if (has_args)
 897  901                  msglen += xdrrdma_sizeof(xdr_results, xdr_location,
 898  902                      rdma_minchunk, NULL, NULL);
 899  903  
 900  904          DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);
 901  905  
 902  906          status = SVC_RDMA_SUCCESS;
 903  907  
 904  908          if (msglen < RPC_MSG_SZ) {
 905  909                  /*
 906  910                   * Looks like the response will fit in the inline
 907  911                   * response; let's try
 908  912                   */
 909  913                  RSSTAT_INCR(rstotalinlinereplies);
 910  914  
 911  915                  rdma_response_op = RDMA_MSG;
 912  916  
 913  917                  status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
 914  918                      xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
 915  919                      has_args, &final_resp_len);
 916  920  
 917  921                  DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
 918  922                      int, status);
 919  923                  DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
 920  924                      int, final_resp_len);
 921  925  
 922  926                  if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
 923  927                          clist_free(crdp->cl_reply);
 924  928                          crdp->cl_reply = NULL;
 925  929                  }
 926  930          }
 927  931  
 928  932          /*
 929  933           * If the encode failed (size?) or the message really is
 930  934           * larger than what is allowed, try the response chunk list.
 931  935           */
 932  936          if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
 933  937                  /*
 934  938                   * attempting to use a reply chunk list when there
 935  939                   * isn't one won't get very far...
 936  940                   */
 937  941                  if (crdp->cl_reply == NULL) {
 938  942                          DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
 939  943                          goto out;
 940  944                  }
 941  945  
 942  946                  RSSTAT_INCR(rstotallongreplies);
 943  947  
 944  948                  msglen = xdr_sizeof(xdr_replymsg, msg);
 945  949                  msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
 946  950                      NULL, NULL);
 947  951  
 948  952                  status = svc_process_long_reply(clone_xprt, xdr_results,
 949  953                      xdr_location, msg, has_args, &msglen, &freelen,
 950  954                      &num_wreply_segments, &final_resp_len);
 951  955  
 952  956                  DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
 953  957                      int, final_resp_len);
 954  958  
 955  959                  if (status != SVC_RDMA_SUCCESS) {
 956  960                          DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
 957  961                          goto out;
 958  962                  }
 959  963  
 960  964                  rdma_response_op = RDMA_NOMSG;
 961  965          }
 962  966  
 963  967          DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
 964  968              int, final_resp_len);
 965  969  
 966  970          rbuf_resp.type = SEND_BUFFER;
 967  971          if (rdma_buf_alloc(conn, &rbuf_resp)) {
 968  972                  rdma_buf_free(conn, &rbuf_rpc_resp);
 969  973                  DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
 970  974                  goto out;
 971  975          }
 972  976  
 973  977          rdma_credit = rdma_bufs_granted;
 974  978  
 975  979          vers = RPCRDMA_VERS;
 976  980          xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
 977  981          (*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
 978  982          /* Skip xid and set the xdr position accordingly. */
 979  983          XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
 980  984          if (!xdr_u_int(&xdrs_rhdr, &vers) ||
 981  985              !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
 982  986              !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
 983  987                  rdma_buf_free(conn, &rbuf_rpc_resp);
 984  988                  rdma_buf_free(conn, &rbuf_resp);
 985  989                  DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
 986  990                  goto out;
 987  991          }
 988  992  
 989  993          /*
 990  994           * Now XDR the read chunk list, actually always NULL
 991  995           */
 992  996          (void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);
 993  997  
 994  998          /*
 995  999           * encode write list -- we already drove RDMA_WRITEs
 996 1000           */
 997 1001          cl_write = crdp->cl_wlist;
 998 1002          if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
 999 1003                  DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
1000 1004                  rdma_buf_free(conn, &rbuf_rpc_resp);
1001 1005                  rdma_buf_free(conn, &rbuf_resp);
1002 1006                  goto out;
1003 1007          }
1004 1008  
1005 1009          /*
1006 1010           * XDR encode the RDMA_REPLY write chunk
1007 1011           */
1008 1012          if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
1009 1013              num_wreply_segments)) {
1010 1014                  rdma_buf_free(conn, &rbuf_rpc_resp);
1011 1015                  rdma_buf_free(conn, &rbuf_resp);
1012 1016                  goto out;
1013 1017          }
1014 1018  
1015 1019          clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
1016 1020              rbuf_resp.addr, NULL, NULL);
1017 1021  
1018 1022          if (rdma_response_op == RDMA_MSG) {
1019 1023                  clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
1020 1024                      rbuf_rpc_resp.addr, NULL, NULL);
1021 1025          }
1022 1026  
1023 1027          status = RDMA_SEND(conn, cl_send, msg->rm_xid);
1024 1028  
1025 1029          if (status == RDMA_SUCCESS) {
1026 1030                  retval = TRUE;
1027 1031          }
1028 1032  
1029 1033  out:
1030 1034          /*
1031 1035           * Free up sendlist chunks
1032 1036           */
1033 1037          if (cl_send != NULL)
1034 1038                  clist_free(cl_send);
1035 1039  
1036 1040          /*
1037 1041           * Destroy private data for xdr rdma
1038 1042           */
1039 1043          if (clone_xprt->xp_xdrout.x_ops != NULL) {
1040 1044                  XDR_DESTROY(&(clone_xprt->xp_xdrout));
1041 1045          }
1042 1046  
1043 1047          if (crdp->cl_reply) {
1044 1048                  clist_free(crdp->cl_reply);
1045 1049                  crdp->cl_reply = NULL;
1046 1050          }
1047 1051  
1048 1052          /*
1049 1053           * This is completely disgusting.  If public is set it is
1050 1054           * a pointer to a structure whose first field is the address
1051 1055           * of the function to free that structure and any related
1052 1056           * stuff.  (see rrokfree in nfs_xdr.c).
1053 1057           */
1054 1058          if (xdrs_rpc->x_public) {
1055 1059                  /* LINTED pointer alignment */
1056 1060                  (**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
1057 1061          }
1058 1062  
1059 1063          if (xdrs_rhdr.x_ops != NULL) {
1060 1064                  XDR_DESTROY(&xdrs_rhdr);
1061 1065          }
1062 1066  
1063 1067          return (retval);
1064 1068  }
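
The x_public cleanup above relies on a convention worth spelling out: when set, x_public points at a structure whose first member is the function that frees the structure and anything hanging off it (rrokfree in nfs_xdr.c is the example the comment cites). Rewritten with a named type (hypothetical, for illustration only), the dispatch is:

        struct xdr_self_free {
                void (*xsf_free)(void *);       /* must be the first member */
                /* ... structure-specific payload follows ... */
        };

        struct xdr_self_free *xsf = (struct xdr_self_free *)xdrs_rpc->x_public;

        if (xsf != NULL)
                xsf->xsf_free(xsf);
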
1065 1069  
1066 1070  /*
1067 1071   * Deserialize arguments.
1068 1072   */
1069 1073  static bool_t
1070 1074  svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
1071 1075  {
1072 1076          if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
1073 1077              xdr_args, args_ptr)) != TRUE)
1074 1078                  return (FALSE);
1075 1079          return (TRUE);
1076 1080  }
1077 1081  
1078 1082  static bool_t
1079 1083  svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
1080 1084      caddr_t args_ptr)
1081 1085  {
1082 1086          struct clone_rdma_data *crdp;
1083 1087          bool_t retval;
1084 1088  
1085 1089          /*
1086 1090           * If the cloned bit is true, then this transport specific
 1087 1091           * rdma data has been duplicated into another cloned xprt. Do
1088 1092           * not free, or release the connection, it is still in use.  The
1089 1093           * buffers will be freed and the connection released later by
1090 1094           * SVC_CLONE_DESTROY().
1091 1095           */
1092 1096          crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
1093 1097          if (crdp->cloned == TRUE) {
1094 1098                  crdp->cloned = 0;
1095 1099                  return (TRUE);
1096 1100          }
1097 1101  
1098 1102          /*
1099 1103           * Free the args if needed then XDR_DESTROY
1100 1104           */
1101 1105          if (args_ptr) {
1102 1106                  XDR     *xdrs = &clone_xprt->xp_xdrin;
1103 1107  
1104 1108                  xdrs->x_op = XDR_FREE;
1105 1109                  retval = (*xdr_args)(xdrs, args_ptr);
1106 1110          }
1107 1111  
1108 1112          XDR_DESTROY(&(clone_xprt->xp_xdrin));
1109 1113          rdma_buf_free(crdp->conn, &crdp->rpcbuf);
1110 1114          if (crdp->cl_reply) {
1111 1115                  clist_free(crdp->cl_reply);
1112 1116                  crdp->cl_reply = NULL;
1113 1117          }
1114 1118          RDMA_REL_CONN(crdp->conn);
1115 1119  
1116 1120          return (retval);
1117 1121  }
1118 1122  
1119 1123  /* ARGSUSED */
1120 1124  static int32_t *
1121 1125  svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
1122 1126  {
1123 1127          return (NULL);
1124 1128  }
1125 1129  
1126 1130  /* ARGSUSED */
1127 1131  static void
1128 1132  svc_rdma_kfreeres(SVCXPRT *clone_xprt)
1129 1133  {
1130 1134  }
1131 1135  
1132 1136  /*
 1133 1137   * the dup caching routines below provide a cache of non-failure
 1134 1138   * transaction ids.  rpc service routines can use this to detect
1135 1139   * retransmissions and re-send a non-failure response.
1136 1140   */
1137 1141  
1138 1142  /*
1139 1143   * MAXDUPREQS is the number of cached items.  It should be adjusted
1140 1144   * to the service load so that there is likely to be a response entry
1141 1145   * when the first retransmission comes in.
1142 1146   */
1143 1147  #define MAXDUPREQS      8192
1144 1148  
1145 1149  /*
 1146 1150   * This should be appropriately scaled to MAXDUPREQS.  To keep collisions
 1147 1151   * as low as possible, it is suggested to set this to a prime.
1148 1152   */
1149 1153  #define DRHASHSZ        2053
1150 1154  
1151 1155  #define XIDHASH(xid)    ((xid) % DRHASHSZ)
1152 1156  #define DRHASH(dr)      XIDHASH((dr)->dr_xid)
1153 1157  #define REQTOXID(req)   ((req)->rq_xprt->xp_xid)
1154 1158  
1155 1159  static int      rdmandupreqs = 0;
1156 1160  int     rdmamaxdupreqs = MAXDUPREQS;
1157 1161  static kmutex_t rdmadupreq_lock;
1158 1162  static struct dupreq *rdmadrhashtbl[DRHASHSZ];
1159 1163  static int      rdmadrhashstat[DRHASHSZ];
1160 1164  
1161 1165  static void unhash(struct dupreq *);
1162 1166  
1163 1167  /*
1164 1168   * rdmadrmru points to the head of a circular linked list in lru order.
1165 1169   * rdmadrmru->dr_next == drlru
1166 1170   */
1167 1171  struct dupreq *rdmadrmru;
1168 1172  
1169 1173  /*
1170 1174   * svc_rdma_kdup searches the request cache and returns 0 if the
1171 1175   * request is not found in the cache.  If it is found, then it
1172 1176   * returns the state of the request (in progress or done) and
1173 1177   * the status or attributes that were part of the original reply.
1174 1178   */
1175 1179  static int
1176 1180  svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
1177 1181      bool_t *dupcachedp)
1178 1182  {
1179 1183          struct dupreq *dr;
1180 1184          uint32_t xid;
1181 1185          uint32_t drhash;
1182 1186          int status;
1183 1187  
1184 1188          xid = REQTOXID(req);
1185 1189          mutex_enter(&rdmadupreq_lock);
1186 1190          RSSTAT_INCR(rsdupchecks);
1187 1191          /*
1188 1192           * Check to see whether an entry already exists in the cache.
1189 1193           */
1190 1194          dr = rdmadrhashtbl[XIDHASH(xid)];
1191 1195          while (dr != NULL) {
1192 1196                  if (dr->dr_xid == xid &&
1193 1197                      dr->dr_proc == req->rq_proc &&
1194 1198                      dr->dr_prog == req->rq_prog &&
1195 1199                      dr->dr_vers == req->rq_vers &&
1196 1200                      dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
1197 1201                      bcmp((caddr_t)dr->dr_addr.buf,
1198 1202                      (caddr_t)req->rq_xprt->xp_rtaddr.buf,
1199 1203                      dr->dr_addr.len) == 0) {
1200 1204                          status = dr->dr_status;
1201 1205                          if (status == DUP_DONE) {
1202 1206                                  bcopy(dr->dr_resp.buf, res, size);
1203 1207                                  if (dupcachedp != NULL)
1204 1208                                          *dupcachedp = (dr->dr_resfree != NULL);
1205 1209                          } else {
1206 1210                                  dr->dr_status = DUP_INPROGRESS;
1207 1211                                  *drpp = dr;
1208 1212                          }
1209 1213                          RSSTAT_INCR(rsdupreqs);
1210 1214                          mutex_exit(&rdmadupreq_lock);
1211 1215                          return (status);
1212 1216                  }
1213 1217                  dr = dr->dr_chain;
1214 1218          }
1215 1219  
1216 1220          /*
1217 1221           * There wasn't an entry, either allocate a new one or recycle
1218 1222           * an old one.
1219 1223           */
1220 1224          if (rdmandupreqs < rdmamaxdupreqs) {
1221 1225                  dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
1222 1226                  if (dr == NULL) {
1223 1227                          mutex_exit(&rdmadupreq_lock);
1224 1228                          return (DUP_ERROR);
1225 1229                  }
1226 1230                  dr->dr_resp.buf = NULL;
1227 1231                  dr->dr_resp.maxlen = 0;
1228 1232                  dr->dr_addr.buf = NULL;
1229 1233                  dr->dr_addr.maxlen = 0;
1230 1234                  if (rdmadrmru) {
1231 1235                          dr->dr_next = rdmadrmru->dr_next;
1232 1236                          rdmadrmru->dr_next = dr;
1233 1237                  } else {
1234 1238                          dr->dr_next = dr;
1235 1239                  }
1236 1240                  rdmandupreqs++;
1237 1241          } else {
1238 1242                  dr = rdmadrmru->dr_next;
1239 1243                  while (dr->dr_status == DUP_INPROGRESS) {
1240 1244                          dr = dr->dr_next;
1241 1245                          if (dr == rdmadrmru->dr_next) {
1242 1246                                  mutex_exit(&rdmadupreq_lock);
1243 1247                                  return (DUP_ERROR);
1244 1248                          }
1245 1249                  }
1246 1250                  unhash(dr);
1247 1251                  if (dr->dr_resfree) {
1248 1252                          (*dr->dr_resfree)(dr->dr_resp.buf);
1249 1253                  }
1250 1254          }
1251 1255          dr->dr_resfree = NULL;
1252 1256          rdmadrmru = dr;
1253 1257  
1254 1258          dr->dr_xid = REQTOXID(req);
1255 1259          dr->dr_prog = req->rq_prog;
1256 1260          dr->dr_vers = req->rq_vers;
1257 1261          dr->dr_proc = req->rq_proc;
1258 1262          if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
1259 1263                  if (dr->dr_addr.buf != NULL)
1260 1264                          kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
1261 1265                  dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
1262 1266                  dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
1263 1267                  if (dr->dr_addr.buf == NULL) {
1264 1268                          dr->dr_addr.maxlen = 0;
1265 1269                          dr->dr_status = DUP_DROP;
1266 1270                          mutex_exit(&rdmadupreq_lock);
1267 1271                          return (DUP_ERROR);
1268 1272                  }
1269 1273          }
1270 1274          dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
1271 1275          bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
1272 1276          if (dr->dr_resp.maxlen < size) {
1273 1277                  if (dr->dr_resp.buf != NULL)
1274 1278                          kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
1275 1279                  dr->dr_resp.maxlen = (unsigned int)size;
1276 1280                  dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
1277 1281                  if (dr->dr_resp.buf == NULL) {
1278 1282                          dr->dr_resp.maxlen = 0;
1279 1283                          dr->dr_status = DUP_DROP;
1280 1284                          mutex_exit(&rdmadupreq_lock);
1281 1285                          return (DUP_ERROR);
1282 1286                  }
1283 1287          }
1284 1288          dr->dr_status = DUP_INPROGRESS;
1285 1289  
1286 1290          drhash = (uint32_t)DRHASH(dr);
1287 1291          dr->dr_chain = rdmadrhashtbl[drhash];
1288 1292          rdmadrhashtbl[drhash] = dr;
1289 1293          rdmadrhashstat[drhash]++;
1290 1294          mutex_exit(&rdmadupreq_lock);
1291 1295          *drpp = dr;
1292 1296          return (DUP_NEW);
1293 1297  }
1294 1298  
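For orientation, the cache above keeps a fixed pool of dupreq entries threaded two ways: a circular MRU list (rdmadrmru) used for recycling, and per-bucket hash chains (rdmadrhashtbl, keyed on the XID via DRHASH) used for lookup. Below is a minimal user-land sketch of that hash-chain pattern, assuming simplified, hypothetical types; dup_lookup() and dup_insert() are illustrative names only, and the real lookup also matches prog, vers, proc, and the caller's transport address.

        #include <stdint.h>
        #include <stddef.h>

        #define DRHASHSZ        32              /* assumed bucket count */
        #define XIDHASH(xid)    ((xid) % DRHASHSZ)

        struct dupreq_s {                       /* hypothetical pared-down entry */
                uint32_t        dr_xid;
                int             dr_status;      /* DUP_NEW/INPROGRESS/DONE/... */
                struct dupreq_s *dr_chain;      /* hash-bucket chain */
                struct dupreq_s *dr_next;       /* circular MRU recycle list */
        };

        static struct dupreq_s *drhashtbl[DRHASHSZ];

        /*
         * Walk the bucket chain for the request's XID; the kernel version
         * also compares prog, vers, proc, and the caller's transport
         * address before declaring a hit.
         */
        static struct dupreq_s *
        dup_lookup(uint32_t xid)
        {
                struct dupreq_s *dr;

                for (dr = drhashtbl[XIDHASH(xid)]; dr != NULL;
                    dr = dr->dr_chain) {
                        if (dr->dr_xid == xid)
                                return (dr);
                }
                return (NULL);
        }

        /*
         * Push a (new or recycled) entry onto the head of its bucket, as
         * the DRHASH insert at the end of the function above does.
         */
        static void
        dup_insert(struct dupreq_s *dr)
        {
                uint32_t h = XIDHASH(dr->dr_xid);

                dr->dr_chain = drhashtbl[h];
                drhashtbl[h] = dr;
        }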
1295 1299  /*
1296 1300   * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
1297 1301   * and stores the response.
1298 1302   */
1299 1303  static void
1300 1304  svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
1301 1305      int size, int status)
1302 1306  {
1303 1307          ASSERT(dr->dr_resfree == NULL);
1304 1308          if (status == DUP_DONE) {
1305 1309                  bcopy(res, dr->dr_resp.buf, size);
1306 1310                  dr->dr_resfree = dis_resfree;
1307 1311          }
1308 1312          dr->dr_status = status;
1309 1313  }
1310 1314  
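Taken together, svc_rdma_kdupcheck() and svc_rdma_kdupdone() implement the familiar at-most-once reply-cache protocol: check before executing a non-idempotent procedure, then mark the entry done (stashing the reply) afterwards. A hedged sketch of that calling pattern follows; dupcheck() and dupdone() are stand-in stubs, not the real entry points, and the DUP_* values are declared locally for illustration.

        #include <stdio.h>

        enum { DUP_NEW, DUP_INPROGRESS, DUP_DONE, DUP_DROP, DUP_ERROR };

        /* Stubs standing in for the cache routines above. */
        static int  dupcheck(void) { return (DUP_NEW); }
        static void dupdone(int status) { (void) status; }

        int
        main(void)
        {
                switch (dupcheck()) {
                case DUP_NEW:
                        /* First sighting: execute the request ... */
                        dupdone(DUP_DONE);      /* ... then cache the reply */
                        break;
                case DUP_INPROGRESS:
                        /* Retransmission while still running: drop it. */
                        break;
                case DUP_DONE:
                        /* Finished request: resend the cached reply,
                         * do not re-execute. */
                        break;
                case DUP_ERROR:
                        /* Cache exhausted by in-flight entries or an
                         * allocation failed: drop the request. */
                        break;
                }
                return (0);
        }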
1311 1315  /*
1312 1316   * This routine expects that the mutex, rdmadupreq_lock, is already held.
1313 1317   */
1314 1318  static void
1315 1319  unhash(struct dupreq *dr)
1316 1320  {
1317 1321          struct dupreq *drt;
1318 1322          struct dupreq *drtprev = NULL;
1319 1323          uint32_t drhash;
1320 1324  
1321 1325          ASSERT(MUTEX_HELD(&rdmadupreq_lock));
1322 1326  
1323 1327          drhash = (uint32_t)DRHASH(dr);
1324 1328          drt = rdmadrhashtbl[drhash];
1325 1329          while (drt != NULL) {
1326 1330                  if (drt == dr) {
1327 1331                          rdmadrhashstat[drhash]--;
1328 1332                          if (drtprev == NULL) {
1329 1333                                  rdmadrhashtbl[drhash] = drt->dr_chain;
1330 1334                          } else {
1331 1335                                  drtprev->dr_chain = drt->dr_chain;
1332 1336                          }
1333 1337                          return;
1334 1338                  }
1335 1339                  drtprev = drt;
1336 1340                  drt = drt->dr_chain;
1337 1341          }
1338 1342  }
1339 1343  
1340 1344  bool_t
1341 1345  rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
1342 1346  {
1343 1347          struct clist    *clist;
1344 1348          uint32_t        tlen;
1345 1349  
1346 1350          if (req->rq_xprt->xp_type != T_RDMA) {
1347 1351                  return (FALSE);
1348 1352          }
1349 1353  
1350 1354          tlen = 0;
1351 1355          clist = wlist;
1352 1356          while (clist) {
1353 1357                  tlen += clist->c_len;
1354 1358                  clist = clist->c_next;
1355 1359          }
1356 1360  
1357 1361          /*
1358 1362           * Set iov to the address of the first segment of the client's
1359 1363           * first wchunk and to the total wlist length.  krecv() already
1360 1364           * allocated a large enough buffer, but registration is deferred
1361 1365           * until we RDMA_WRITE the buffer back to the (NFS) client.
1362 1366           */
1363 1367          iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
1364 1368          iov->iov_len = tlen;
1365 1369  
1366 1370          return (TRUE);
1367 1371  }
1368 1372  
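The length computation in rdma_get_wchunk() is just a walk of the client's write list, summing segment lengths so a single iovec can cover the whole reply. A small sketch under assumed simplified types is given below; struct chunk and chunk_total_len() are hypothetical, and the real struct clist also carries RDMA memory handles and addresses.

        #include <stdint.h>
        #include <stddef.h>

        struct chunk {                  /* hypothetical pared-down clist */
                uint32_t        c_len;
                struct chunk    *c_next;
        };

        /*
         * Total the byte length of a chunk list, as the while loop in
         * rdma_get_wchunk() does before aiming the iovec at the first
         * segment.
         */
        static uint32_t
        chunk_total_len(const struct chunk *cl)
        {
                uint32_t tlen = 0;

                for (; cl != NULL; cl = cl->c_next)
                        tlen += cl->c_len;
                return (tlen);
        }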
1369 1373  /*
1370 1374   * Routine to set up the read chunk lists.
1371 1375   */
1372 1376  
1373 1377  int
1374 1378  rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
1375 1379  {
1376 1380          int             data_len, avail_len;
1377 1381          uint_t          round_len;
1378 1382  
1379 1383          data_len = avail_len = 0;
1380 1384  
1381 1385          while (wcl != NULL && count > 0) {
1382 1386                  if (wcl->c_dmemhandle.mrc_rmr == 0)
1383 1387                          break;
1384 1388  
1385 1389                  if (wcl->c_len < count) {
1386 1390                          data_len += wcl->c_len;
1387 1391                          avail_len = 0;
1388 1392                  } else {
1389 1393                          data_len += count;
1390 1394                          avail_len = wcl->c_len - count;
1391 1395                          wcl->c_len = count;
1392 1396                  }
1393 1397                  count -= wcl->c_len;
1394 1398  
1395 1399                  if (count == 0)
1396 1400                          break;
1397 1401  
1398 1402                  wcl = wcl->c_next;
1399 1403          }
1400 1404  
1401 1405          /*
1402 1406           * MUST fail if there is still more data.
1403 1407           */
1404 1408          if (count > 0) {
1405 1409                  DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
1406 1410                      int, data_len, int, count);
1407 1411                  return (FALSE);
1408 1412          }
1409 1413  
1410 1414          /*
1411 1415           * Round up the last chunk to a 4-byte boundary.
1412 1416           */
1413 1417          *wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
1414 1418          round_len = *wcl_len - data_len;
1415 1419  
1416 1420          if (round_len) {
1417 1421  
1418 1422                  /*
1419 1423                   * If there is space in the current chunk,
1420 1424                   * add the roundup to the chunk.
1421 1425                   */
1422 1426                  if (avail_len >= round_len) {
1423 1427                          wcl->c_len += round_len;
1424 1428                  } else  {
1425 1429                          /*
1426 1430                           * try the next one.
1427 1431                           */
1428 1432                          wcl = wcl->c_next;
1429 1433                          if ((wcl == NULL) || (wcl->c_len < round_len)) {
1430 1434                                  DTRACE_PROBE1(
1431 1435                                      krpc__e__rdma_setup_read_chunks_rndup,
1432 1436                                      int, round_len);
1433 1437                                  return (FALSE);
1434 1438                          }
1435 1439                          wcl->c_len = round_len;
1436 1440                  }
1437 1441          }
1438 1442  
1439 1443          wcl = wcl->c_next;
1440 1444  
1441 1445          /*
1442 1446           * Make the rest of the chunks zero-length.
1443 1447           */
1444 1448  
1445 1449          clist_zero_len(wcl);
1446 1450  
1447 1451          return (TRUE);
1448 1452  }
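The roundup step exists because XDR encodes in 4-byte units (BYTES_PER_XDR_UNIT), so the reply length written back must be padded to a 4-byte boundary; for example, a data_len of 1021 rounds to 1024, leaving round_len = 3 to be absorbed by slack in the current chunk or by the next one. A tiny standalone sketch of that arithmetic, using the usual roundup() definition as found in sys/sysmacros.h:

        #include <stdio.h>

        #define BYTES_PER_XDR_UNIT 4
        /* The classic roundup idiom: pad x up to a multiple of y. */
        #define roundup(x, y)   ((((x) + ((y) - 1)) / (y)) * (y))

        int
        main(void)
        {
                int data_len = 1021;
                int wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);

                /* Prints "1021 -> 1024 (pad 3)". */
                printf("%d -> %d (pad %d)\n", data_len, wcl_len,
                    wcl_len - data_len);
                return (0);
        }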
  
    | 1287 lines elided |