Print this page
    
7651 default maximum nfs server threads is insufficient
    
      
        | Split | 
	Close | 
      
      | Expand all | 
      | Collapse all | 
    
    
          --- old/usr/src/uts/common/rpc/svc_rdma.c
          +++ new/usr/src/uts/common/rpc/svc_rdma.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  
    | 
      ↓ open down ↓ | 
    12 lines elided | 
    
      ↑ open up ↑ | 
  
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright (c) 1983, 2010, Oracle and/or its affiliates. All rights reserved.
       23 + * Copyright (c) 2012 by Delphix. All rights reserved.
  23   24   * Copyright 2013 Nexenta Systems, Inc.  All rights reserved.
  24   25   */
  25   26  /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
  26   27  /* All Rights Reserved */
  27   28  /*
  28   29   * Portions of this source code were derived from Berkeley
  29   30   * 4.3 BSD under license from the Regents of the University of
  30   31   * California.
  31   32   */
  32   33  
  33   34  /*
  34   35   * Server side of RPC over RDMA in the kernel.
  35   36   */
  36   37  
  37   38  #include <sys/param.h>
  38   39  #include <sys/types.h>
  39   40  #include <sys/user.h>
  40   41  #include <sys/sysmacros.h>
  41   42  #include <sys/proc.h>
  42   43  #include <sys/file.h>
  43   44  #include <sys/errno.h>
  44   45  #include <sys/kmem.h>
  45   46  #include <sys/debug.h>
  46   47  #include <sys/systm.h>
  47   48  #include <sys/cmn_err.h>
  48   49  #include <sys/kstat.h>
  49   50  #include <sys/vtrace.h>
  50   51  #include <sys/debug.h>
  51   52  
  52   53  #include <rpc/types.h>
  53   54  #include <rpc/xdr.h>
  54   55  #include <rpc/auth.h>
  55   56  #include <rpc/clnt.h>
  56   57  #include <rpc/rpc_msg.h>
  57   58  #include <rpc/svc.h>
  58   59  #include <rpc/rpc_rdma.h>
  59   60  #include <sys/ddi.h>
  60   61  #include <sys/sunddi.h>
  61   62  
  62   63  #include <inet/common.h>
  63   64  #include <inet/ip.h>
  64   65  #include <inet/ip6.h>
  65   66  
  66   67  #include <nfs/nfs.h>
  67   68  #include <sys/sdt.h>
  68   69  
/* Internal return codes for the server-side RDMA send/reply helpers. */
#define SVC_RDMA_SUCCESS 0
#define SVC_RDMA_FAIL -1

/*
 * Credit scaling factor.
 * NOTE(review): not referenced within this chunk; presumably used by the
 * RDMA credit/flow-control code elsewhere in this file — confirm.
 */
#define SVC_CREDIT_FACTOR (0.5)

/* TRUE when the reply's accepted verifier flavor is RPCSEC_GSS. */
#define MSG_IS_RPCSEC_GSS(msg)          \
        ((msg)->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS)


/*
 * Number of RDMA receive buffers granted to peers; initialized from
 * RDMA_BUFS_GRANT.  Global (non-static), so it is tunable/visible to
 * other modules.
 */
uint32_t rdma_bufs_granted = RDMA_BUFS_GRANT;
  79   80  
/*
 * RDMA transport specific data associated with SVCMASTERXPRT.
 * One of these is allocated per master xprt (i.e. per RDMA plugin) by
 * svc_rdma_kcreate() and hung off xprt->xp_p2; freed by svc_rdma_kdestroy().
 */
struct rdma_data {
        SVCMASTERXPRT   *rd_xprt;       /* back ptr to SVCMASTERXPRT */
        struct rdma_svc_data rd_data;   /* rdma data */
        rdma_mod_t      *r_mod;         /* RDMA module containing ops ptr */
};
  88   89  
/*
 * Plugin connection specific data stashed away in clone SVCXPRT.
 * Lives in the clone's xp_p2buf (svc_rdma_kcreate() asserts it fits in
 * SVC_P2LEN).  Filled in by svc_rdma_krecv()/svc_rdma_kclone_xprt() and
 * torn down by svc_rdma_kclone_destroy() when 'cloned' is set.
 */
struct clone_rdma_data {
        bool_t          cloned;         /* xprt cloned for thread processing */
        CONN            *conn;          /* RDMA connection */
        rdma_buf_t      rpcbuf;         /* RPC req/resp buffer */
        struct clist    *cl_reply;      /* reply chunk buffer info */
        struct clist    *cl_wlist;              /* write list clist */
};
  99  100  
 100  101  
 101  102  #define MAXADDRLEN      128     /* max length for address mask */
 102  103  
 103  104  /*
 104  105   * Routines exported through ops vector.
 105  106   */
 106  107  static bool_t           svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
 107  108  static bool_t           svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
 108  109  static bool_t           svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
 109  110  static bool_t           svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
 110  111  void                    svc_rdma_kdestroy(SVCMASTERXPRT *);
 111  112  static int              svc_rdma_kdup(struct svc_req *, caddr_t, int,
 112  113                                  struct dupreq **, bool_t *);
 113  114  static void             svc_rdma_kdupdone(struct dupreq *, caddr_t,
 114  115                                  void (*)(), int, int);
 115  116  static int32_t          *svc_rdma_kgetres(SVCXPRT *, int);
 116  117  static void             svc_rdma_kfreeres(SVCXPRT *);
 117  118  static void             svc_rdma_kclone_destroy(SVCXPRT *);
 118  119  static void             svc_rdma_kstart(SVCMASTERXPRT *);
 119  120  void                    svc_rdma_kstop(SVCMASTERXPRT *);
 120  121  static void             svc_rdma_kclone_xprt(SVCXPRT *, SVCXPRT *);
 121  122  static void             svc_rdma_ktattrs(SVCXPRT *, int, void **);
 122  123  
 123  124  static int      svc_process_long_reply(SVCXPRT *, xdrproc_t,
 124  125                          caddr_t, struct rpc_msg *, bool_t, int *,
 125  126                          int *, int *, unsigned int *);
 126  127  
 127  128  static int      svc_compose_rpcmsg(SVCXPRT *, CONN *, xdrproc_t,
 128  129                          caddr_t, rdma_buf_t *, XDR **, struct rpc_msg *,
 129  130                          bool_t, uint_t *);
 130  131  static bool_t rpcmsg_length(xdrproc_t,
 131  132                  caddr_t,
 132  133                  struct rpc_msg *, bool_t, int);
 133  134  
/*
 * Server transport operations vector.
 * Installed on each master xprt in svc_rdma_kcreate(); entries map the
 * generic kRPC SVC_*() operations onto the RDMA implementations below.
 */
struct svc_ops rdma_svc_ops = {
        svc_rdma_krecv,         /* Get requests */
        svc_rdma_kgetargs,      /* Deserialize arguments */
        svc_rdma_ksend,         /* Send reply */
        svc_rdma_kfreeargs,     /* Free argument data space */
        svc_rdma_kdestroy,      /* Destroy transport handle */
        svc_rdma_kdup,          /* Check entry in dup req cache */
        svc_rdma_kdupdone,      /* Mark entry in dup req cache as done */
        svc_rdma_kgetres,       /* Get pointer to response buffer */
        svc_rdma_kfreeres,      /* Destroy pre-serialized response header */
        svc_rdma_kclone_destroy,        /* Destroy a clone xprt */
        svc_rdma_kstart,        /* Tell `ready-to-receive' to rpcmod */
        svc_rdma_kclone_xprt,   /* Transport specific clone xprt */
        svc_rdma_ktattrs        /* Get Transport Attributes */
};
 152  153  
/*
 * Server statistics
 * NOTE: This structure type is duplicated in the NFS fast path.
 * Each member is a kstat counter; the parallel initializer below names
 * them for kstat consumers.  Updated via RSSTAT_INCR() (atomic).
 */
struct {
        kstat_named_t   rscalls;
        kstat_named_t   rsbadcalls;
        kstat_named_t   rsnullrecv;
        kstat_named_t   rsbadlen;
        kstat_named_t   rsxdrcall;
        kstat_named_t   rsdupchecks;
        kstat_named_t   rsdupreqs;
        kstat_named_t   rslongrpcs;
        kstat_named_t   rstotalreplies;
        kstat_named_t   rstotallongreplies;
        kstat_named_t   rstotalinlinereplies;
} rdmarsstat = {
        { "calls",      KSTAT_DATA_UINT64 },
        { "badcalls",   KSTAT_DATA_UINT64 },
        { "nullrecv",   KSTAT_DATA_UINT64 },
        { "badlen",     KSTAT_DATA_UINT64 },
        { "xdrcall",    KSTAT_DATA_UINT64 },
        { "dupchecks",  KSTAT_DATA_UINT64 },
        { "dupreqs",    KSTAT_DATA_UINT64 },
        { "longrpcs",   KSTAT_DATA_UINT64 },
        { "totalreplies",       KSTAT_DATA_UINT64 },
        { "totallongreplies",   KSTAT_DATA_UINT64 },
        { "totalinlinereplies", KSTAT_DATA_UINT64 },
};

/* Exported view of the stats block for kstat registration elsewhere. */
kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);

/* Atomically bump one of the rdmarsstat counters above. */
#define RSSTAT_INCR(x)  atomic_inc_64(&rdmarsstat.x.value.ui64)
 187  188  /*
 188  189   * Create a transport record.
 189  190   * The transport record, output buffer, and private data structure
 190  191   * are allocated.  The output buffer is serialized into using xdrmem.
 191  192   * There is one transport record per user process which implements a
 192  193   * set of services.
 193  194   */
 194  195  /* ARGSUSED */
 195  196  int
 196  197  svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
 197  198      rdma_xprt_group_t *started_xprts)
 198  199  {
 199  200          int error;
 200  201          SVCMASTERXPRT *xprt;
 201  202          struct rdma_data *rd;
 202  203          rdma_registry_t *rmod;
 203  204          rdma_xprt_record_t *xprt_rec;
 204  205          queue_t *q;
 205  206          /*
 206  207           * modload the RDMA plugins is not already done.
 207  208           */
 208  209          if (!rdma_modloaded) {
 209  210                  /*CONSTANTCONDITION*/
 210  211                  ASSERT(sizeof (struct clone_rdma_data) <= SVC_P2LEN);
 211  212  
 212  213                  mutex_enter(&rdma_modload_lock);
 213  214                  if (!rdma_modloaded) {
 214  215                          error = rdma_modload();
 215  216                  }
 216  217                  mutex_exit(&rdma_modload_lock);
 217  218  
 218  219                  if (error)
 219  220                          return (error);
 220  221          }
 221  222  
 222  223          /*
 223  224           * master_xprt_count is the count of master transport handles
 224  225           * that were successfully created and are ready to recieve for
 225  226           * RDMA based access.
 226  227           */
 227  228          error = 0;
 228  229          xprt_rec = NULL;
 229  230          rw_enter(&rdma_lock, RW_READER);
 230  231          if (rdma_mod_head == NULL) {
 231  232                  started_xprts->rtg_count = 0;
 232  233                  rw_exit(&rdma_lock);
 233  234                  if (rdma_dev_available)
 234  235                          return (EPROTONOSUPPORT);
 235  236                  else
 236  237                          return (ENODEV);
 237  238          }
 238  239  
 239  240          /*
 240  241           * If we have reached here, then atleast one RDMA plugin has loaded.
 241  242           * Create a master_xprt, make it start listenining on the device,
 242  243           * if an error is generated, record it, we might need to shut
 243  244           * the master_xprt.
 244  245           * SVC_START() calls svc_rdma_kstart which calls plugin binding
 245  246           * routines.
 246  247           */
 247  248          for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
 248  249  
 249  250                  /*
 250  251                   * One SVCMASTERXPRT per RDMA plugin.
 251  252                   */
 252  253                  xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
 253  254                  xprt->xp_ops = &rdma_svc_ops;
 254  255                  xprt->xp_sct = sct;
 255  256                  xprt->xp_type = T_RDMA;
 256  257                  mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
 257  258                  mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
 258  259                  xprt->xp_req_head = (mblk_t *)0;
 259  260                  xprt->xp_req_tail = (mblk_t *)0;
 260  261                  xprt->xp_full = FALSE;
 261  262                  xprt->xp_enable = FALSE;
 262  263                  xprt->xp_reqs = 0;
 263  264                  xprt->xp_size = 0;
 264  265                  xprt->xp_threads = 0;
 265  266                  xprt->xp_detached_threads = 0;
 266  267  
 267  268                  rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
 268  269                  xprt->xp_p2 = (caddr_t)rd;
 269  270                  rd->rd_xprt = xprt;
 270  271                  rd->r_mod = rmod->r_mod;
 271  272  
 272  273                  q = &rd->rd_data.q;
 273  274                  xprt->xp_wq = q;
 274  275                  q->q_ptr = &rd->rd_xprt;
 275  276                  xprt->xp_netid = NULL;
 276  277  
 277  278                  /*
 278  279                   * Each of the plugins will have their own Service ID
 279  280                   * to listener specific mapping, like port number for VI
 280  281                   * and service name for IB.
 281  282                   */
 282  283                  rd->rd_data.svcid = id;
 283  284                  error = svc_xprt_register(xprt, id);
 284  285                  if (error) {
 285  286                          DTRACE_PROBE(krpc__e__svcrdma__xprt__reg);
 286  287                          goto cleanup;
 287  288                  }
 288  289  
 289  290                  SVC_START(xprt);
 290  291                  if (!rd->rd_data.active) {
 291  292                          svc_xprt_unregister(xprt);
 292  293                          error = rd->rd_data.err_code;
 293  294                          goto cleanup;
 294  295                  }
 295  296  
 296  297                  /*
 297  298                   * This is set only when there is atleast one or more
 298  299                   * transports successfully created. We insert the pointer
 299  300                   * to the created RDMA master xprt into a separately maintained
 300  301                   * list. This way we can easily reference it later to cleanup,
 301  302                   * when NFS kRPC service pool is going away/unregistered.
 302  303                   */
 303  304                  started_xprts->rtg_count ++;
 304  305                  xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
 305  306                  xprt_rec->rtr_xprt_ptr = xprt;
 306  307                  xprt_rec->rtr_next = started_xprts->rtg_listhead;
 307  308                  started_xprts->rtg_listhead = xprt_rec;
 308  309                  continue;
 309  310  cleanup:
 310  311                  SVC_DESTROY(xprt);
 311  312                  if (error == RDMA_FAILED)
 312  313                          error = EPROTONOSUPPORT;
 313  314          }
 314  315  
 315  316          rw_exit(&rdma_lock);
 316  317  
 317  318          /*
 318  319           * Don't return any error even if a single plugin was started
 319  320           * successfully.
 320  321           */
 321  322          if (started_xprts->rtg_count == 0)
 322  323                  return (error);
 323  324          return (0);
 324  325  }
 325  326  
 326  327  /*
 327  328   * Cleanup routine for freeing up memory allocated by
 328  329   * svc_rdma_kcreate()
 329  330   */
 330  331  void
 331  332  svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
 332  333  {
 333  334          struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
 334  335  
 335  336  
 336  337          mutex_destroy(&xprt->xp_req_lock);
 337  338          mutex_destroy(&xprt->xp_thread_lock);
 338  339          kmem_free(rd, sizeof (*rd));
 339  340          kmem_free(xprt, sizeof (*xprt));
 340  341  }
 341  342  
 342  343  
 343  344  static void
 344  345  svc_rdma_kstart(SVCMASTERXPRT *xprt)
 345  346  {
 346  347          struct rdma_svc_data *svcdata;
 347  348          rdma_mod_t *rmod;
 348  349  
 349  350          svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 350  351          rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 351  352  
 352  353          /*
 353  354           * Create a listener for  module at this port
 354  355           */
 355  356  
 356  357          if (rmod->rdma_count != 0)
 357  358                  (*rmod->rdma_ops->rdma_svc_listen)(svcdata);
 358  359          else
 359  360                  svcdata->err_code = RDMA_FAILED;
 360  361  }
 361  362  
 362  363  void
 363  364  svc_rdma_kstop(SVCMASTERXPRT *xprt)
 364  365  {
 365  366          struct rdma_svc_data *svcdata;
 366  367          rdma_mod_t *rmod;
 367  368  
 368  369          svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
 369  370          rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
 370  371  
 371  372          /*
 372  373           * Call the stop listener routine for each plugin. If rdma_count is
 373  374           * already zero set active to zero.
 374  375           */
 375  376          if (rmod->rdma_count != 0)
 376  377                  (*rmod->rdma_ops->rdma_svc_stop)(svcdata);
 377  378          else
 378  379                  svcdata->active = 0;
 379  380          if (svcdata->active)
 380  381                  DTRACE_PROBE(krpc__e__svcrdma__kstop);
 381  382  }
 382  383  
 383  384  /* ARGSUSED */
 384  385  static void
 385  386  svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
 386  387  {
 387  388  
 388  389          struct clone_rdma_data *cdrp;
 389  390          cdrp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 390  391  
 391  392          /*
 392  393           * Only free buffers and release connection when cloned is set.
 393  394           */
 394  395          if (cdrp->cloned != TRUE)
 395  396                  return;
 396  397  
 397  398          rdma_buf_free(cdrp->conn, &cdrp->rpcbuf);
 398  399          if (cdrp->cl_reply) {
 399  400                  clist_free(cdrp->cl_reply);
 400  401                  cdrp->cl_reply = NULL;
 401  402          }
 402  403          RDMA_REL_CONN(cdrp->conn);
 403  404  
 404  405          cdrp->cloned = 0;
 405  406  }
 406  407  
 407  408  /*
 408  409   * Clone the xprt specific information.  It will be freed by
 409  410   * SVC_CLONE_DESTROY.
 410  411   */
 411  412  static void
 412  413  svc_rdma_kclone_xprt(SVCXPRT *src_xprt, SVCXPRT *dst_xprt)
 413  414  {
 414  415          struct clone_rdma_data *srcp2;
 415  416          struct clone_rdma_data *dstp2;
 416  417  
 417  418          srcp2 = (struct clone_rdma_data *)src_xprt->xp_p2buf;
 418  419          dstp2 = (struct clone_rdma_data *)dst_xprt->xp_p2buf;
 419  420  
 420  421          if (srcp2->conn != NULL) {
 421  422                  srcp2->cloned = TRUE;
 422  423                  *dstp2 = *srcp2;
 423  424          }
 424  425  }
 425  426  
 426  427  static void
 427  428  svc_rdma_ktattrs(SVCXPRT *clone_xprt, int attrflag, void **tattr)
 428  429  {
 429  430          CONN    *conn;
 430  431          *tattr = NULL;
 431  432  
 432  433          switch (attrflag) {
 433  434          case SVC_TATTR_ADDRMASK:
 434  435                  conn = ((struct clone_rdma_data *)clone_xprt->xp_p2buf)->conn;
 435  436                  ASSERT(conn != NULL);
 436  437                  if (conn)
 437  438                          *tattr = (void *)&conn->c_addrmask;
 438  439          }
 439  440  }
 440  441  
/*
 * svc_rdma_krecv -- 'get requests' entry point of rdma_svc_ops.
 *
 * Decode the RPC-over-RDMA transport header carried in 'mp' (an
 * rdma_recv_data_t), obtain the RPC call message -- either inline in the
 * receive buffer, or via an RDMA READ when the peer sent it as a chunk
 * (op == RDMA_NOMSG) -- and XDR-decode the call into 'msg'.
 *
 * On success: the clone xprt's XDR stream and remote/local addresses are
 * set up for argument decoding, the connection is stashed in the clone's
 * clone_rdma_data, 'mp' is freed, and TRUE is returned.
 * On failure: all partially acquired resources (chunk lists, long buffer,
 * receive buffer, connection reference) are released through the error
 * labels below, 'mp' is freed, rsbadcalls is bumped, and FALSE returned.
 */
static bool_t
svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
{
	XDR	*xdrs;
	CONN	*conn;
	rdma_recv_data_t	*rdp = (rdma_recv_data_t *)mp->b_rptr;
	struct clone_rdma_data *crdp;
	struct clist	*cl = NULL;	/* read chunk list from the header */
	struct clist	*wcl = NULL;	/* write chunk list from the header */
	struct clist	*cllong = NULL;	/* chunk holding a long call msg */

	rdma_stat	status;
	uint32_t vers, op, pos, xid;
	uint32_t rdma_credit;
	uint32_t wcl_total_length = 0;
	bool_t	wwl = FALSE;

	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	RSSTAT_INCR(rscalls);
	conn = rdp->conn;

	/* Replenish the receive queue before consuming this buffer. */
	status = rdma_svc_postrecv(conn);
	if (status != RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__postrecv);
		goto badrpc_call;
	}

	/*
	 * Decode the RPC/RDMA transport header.  The XID is the first
	 * 32-bit word of the message; skip past it and pull out the
	 * version, credit count and operation.
	 */
	xdrs = &clone_xprt->xp_xdrin;
	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
	xid = *(uint32_t *)rdp->rpcmsg.addr;
	XDR_SETPOS(xdrs, sizeof (uint32_t));

	if (! xdr_u_int(xdrs, &vers) ||
	    ! xdr_u_int(xdrs, &rdma_credit) ||
	    ! xdr_u_int(xdrs, &op)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__uint);
		goto xdr_err;
	}

	/* Checking if the status of the recv operation was normal */
	if (rdp->status != 0) {
		DTRACE_PROBE1(krpc__e__svcrdma__krecv__invalid__status,
		    int, rdp->status);
		goto badrpc_call;
	}

	/* Read chunk list (source buffers we may RDMA READ from). */
	if (! xdr_do_clist(xdrs, &cl)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__do__clist);
		goto xdr_err;
	}

	/* Write chunk list (destination buffers for our reply data). */
	if (!xdr_decode_wlist_svc(xdrs, &wcl, &wwl, &wcl_total_length, conn)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__decode__wlist);
		if (cl)
			clist_free(cl);
		goto xdr_err;
	}
	crdp->cl_wlist = wcl;

	/* Optional reply chunk, used later for long replies. */
	crdp->cl_reply = NULL;
	(void) xdr_decode_reply_wchunk(xdrs, &crdp->cl_reply);

	/*
	 * A chunk at 0 offset indicates that the RPC call message
	 * is in a chunk. Get the RPC call message chunk.
	 */
	if (cl != NULL && op == RDMA_NOMSG) {

		/* Remove RPC call message chunk from chunklist */
		cllong = cl;
		cl = cl->c_next;
		cllong->c_next = NULL;


		/* Allocate and register memory for the RPC call msg chunk */
		cllong->rb_longbuf.type = RDMA_LONG_BUFFER;
		cllong->rb_longbuf.len = cllong->c_len > LONG_REPLY_LEN ?
		    cllong->c_len : LONG_REPLY_LEN;

		if (rdma_buf_alloc(conn, &cllong->rb_longbuf)) {
			clist_free(cllong);
			goto cll_malloc_err;
		}

		cllong->u.c_daddr3 = cllong->rb_longbuf.addr;

		if (cllong->u.c_daddr == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__nomem);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		status = clist_register(conn, cllong, CLIST_REG_DST);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__clist__reg);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/*
		 * Now read the RPC call message in
		 */
		status = RDMA_READ(conn, cllong, WAIT);
		if (status) {
			DTRACE_PROBE(krpc__e__svcrdma__krecv__read);
			(void) clist_deregister(conn, cllong);
			rdma_buf_free(conn, &cllong->rb_longbuf);
			clist_free(cllong);
			goto cll_malloc_err;
		}

		/* Sync the DMA'd data into CPU view, then drop registration. */
		status = clist_syncmem(conn, cllong, CLIST_REG_DST);
		(void) clist_deregister(conn, cllong);

		/* Decode the call from the long buffer just read in. */
		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->u.c_daddr3,
		    cllong->c_len, 0, cl, XDR_DECODE, conn);

		/*
		 * The long buffer now backs the RPC message; remember it in
		 * the clone data so it is freed at kfreeargs/clone-destroy
		 * time, and release the original receive buffer.
		 */
		crdp->rpcbuf = cllong->rb_longbuf;
		crdp->rpcbuf.len = cllong->c_len;
		clist_free(cllong);
		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	} else {
		/* Inline call message: decode in place past the header. */
		pos = XDR_GETPOS(xdrs);
		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
		    rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
		crdp->rpcbuf = rdp->rpcmsg;

		/* Use xdrrdmablk_ops to indicate there is a read chunk list */
		if (cl != NULL) {
			int32_t flg = XDR_RDMA_RLIST_REG;

			XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
			xdrs->x_ops = &xdrrdmablk_ops;
		}
	}

	/* Attach the write list to the stream for reply encoding. */
	if (crdp->cl_wlist) {
		int32_t flg = XDR_RDMA_WLIST_REG;

		XDR_CONTROL(xdrs, XDR_RDMA_SET_WLIST, crdp->cl_wlist);
		XDR_CONTROL(xdrs, XDR_RDMA_SET_FLAGS, &flg);
	}

	/* Decode the RPC call header itself into 'msg'. */
	if (! xdr_callmsg(xdrs, msg)) {
		DTRACE_PROBE(krpc__e__svcrdma__krecv__callmsg);
		RSSTAT_INCR(rsxdrcall);
		goto callmsg_err;
	}

	/*
	 * Point the remote transport address in the service_transport
	 * handle at the address in the request.
	 */
	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;

	clone_xprt->xp_lcladdr.buf = conn->c_laddr.buf;
	clone_xprt->xp_lcladdr.len = conn->c_laddr.len;
	clone_xprt->xp_lcladdr.maxlen = conn->c_laddr.len;

	/*
	 * In case of RDMA, connection management is
	 * entirely done in rpcib module and netid in the
	 * SVCMASTERXPRT is NULL. Initialize the clone netid
	 * from the connection.
	 */

	clone_xprt->xp_netid = conn->c_netid;

	clone_xprt->xp_xid = xid;
	crdp->conn = conn;

	freeb(mp);

	return (TRUE);

/*
 * Error unwinding.  The labels nest: each releases what was acquired
 * after the previous one, then falls through to the cheaper cleanups.
 */
callmsg_err:
	rdma_buf_free(conn, &crdp->rpcbuf);

cll_malloc_err:
	if (cl)
		clist_free(cl);
xdr_err:
	XDR_DESTROY(xdrs);

badrpc_call:
	RDMA_BUF_FREE(conn, &rdp->rpcmsg);
	RDMA_REL_CONN(conn);
	freeb(mp);
	RSSTAT_INCR(rsbadcalls);
	return (FALSE);
}
 636  637  
 637  638  static int
 638  639  svc_process_long_reply(SVCXPRT * clone_xprt,
 639  640      xdrproc_t xdr_results, caddr_t xdr_location,
 640  641      struct rpc_msg *msg, bool_t has_args, int *msglen,
 641  642      int *freelen, int *numchunks, unsigned int *final_len)
 642  643  {
 643  644          int status;
 644  645          XDR xdrslong;
 645  646          struct clist *wcl = NULL;
 646  647          int count = 0;
 647  648          int alloc_len;
 648  649          char  *memp;
 649  650          rdma_buf_t long_rpc = {0};
 650  651          struct clone_rdma_data *crdp;
 651  652  
 652  653          crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
 653  654  
 654  655          bzero(&xdrslong, sizeof (xdrslong));
 655  656  
 656  657          /* Choose a size for the long rpc response */
 657  658          if (MSG_IS_RPCSEC_GSS(msg)) {
 658  659                  alloc_len = RNDUP(MAX_AUTH_BYTES + *msglen);
 659  660          } else {
 660  661                  alloc_len = RNDUP(*msglen);
 661  662          }
 662  663  
 663  664          if (alloc_len <= 64 * 1024) {
 664  665                  if (alloc_len > 32 * 1024) {
 665  666                          alloc_len = 64 * 1024;
 666  667                  } else {
 667  668                          if (alloc_len > 16 * 1024) {
 668  669                                  alloc_len = 32 * 1024;
 669  670                          } else {
 670  671                                  alloc_len = 16 * 1024;
 671  672                          }
 672  673                  }
 673  674          }
 674  675  
 675  676          long_rpc.type = RDMA_LONG_BUFFER;
 676  677          long_rpc.len = alloc_len;
 677  678          if (rdma_buf_alloc(crdp->conn, &long_rpc)) {
 678  679                  return (SVC_RDMA_FAIL);
 679  680          }
 680  681  
 681  682          memp = long_rpc.addr;
 682  683          xdrmem_create(&xdrslong, memp, alloc_len, XDR_ENCODE);
 683  684  
 684  685          msg->rm_xid = clone_xprt->xp_xid;
 685  686  
 686  687          if (!(xdr_replymsg(&xdrslong, msg) &&
 687  688              (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, &xdrslong,
 688  689              xdr_results, xdr_location)))) {
 689  690                  rdma_buf_free(crdp->conn, &long_rpc);
 690  691                  DTRACE_PROBE(krpc__e__svcrdma__longrep__authwrap);
 691  692                  return (SVC_RDMA_FAIL);
 692  693          }
 693  694  
 694  695          *final_len = XDR_GETPOS(&xdrslong);
 695  696  
 696  697          DTRACE_PROBE1(krpc__i__replylen, uint_t, *final_len);
 697  698          *numchunks = 0;
 698  699          *freelen = 0;
 699  700  
 700  701          wcl = crdp->cl_reply;
 701  702          wcl->rb_longbuf = long_rpc;
 702  703  
 703  704          count = *final_len;
 704  705          while ((wcl != NULL) && (count > 0)) {
 705  706  
 706  707                  if (wcl->c_dmemhandle.mrc_rmr == 0)
 707  708                          break;
 708  709  
 709  710                  DTRACE_PROBE2(krpc__i__write__chunks, uint32_t, count,
 710  711                      uint32_t, wcl->c_len);
 711  712  
 712  713                  if (wcl->c_len > count) {
 713  714                          wcl->c_len = count;
 714  715                  }
 715  716                  wcl->w.c_saddr3 = (caddr_t)memp;
 716  717  
 717  718                  count -= wcl->c_len;
 718  719                  *numchunks +=  1;
 719  720                  memp += wcl->c_len;
 720  721                  wcl = wcl->c_next;
 721  722          }
 722  723  
 723  724          /*
 724  725           * Make rest of the chunks 0-len
 725  726           */
 726  727          while (wcl != NULL) {
 727  728                  if (wcl->c_dmemhandle.mrc_rmr == 0)
 728  729                          break;
 729  730                  wcl->c_len = 0;
 730  731                  wcl = wcl->c_next;
 731  732          }
 732  733  
 733  734          wcl = crdp->cl_reply;
 734  735  
 735  736          /*
 736  737           * MUST fail if there are still more data
 737  738           */
 738  739          if (count > 0) {
 739  740                  rdma_buf_free(crdp->conn, &long_rpc);
 740  741                  DTRACE_PROBE(krpc__e__svcrdma__longrep__dlen__clist);
 741  742                  return (SVC_RDMA_FAIL);
 742  743          }
 743  744  
 744  745          if (clist_register(crdp->conn, wcl, CLIST_REG_SOURCE) != RDMA_SUCCESS) {
 745  746                  rdma_buf_free(crdp->conn, &long_rpc);
 746  747                  DTRACE_PROBE(krpc__e__svcrdma__longrep__clistreg);
 747  748                  return (SVC_RDMA_FAIL);
 748  749          }
 749  750  
 750  751          status = clist_syncmem(crdp->conn, wcl, CLIST_REG_SOURCE);
 751  752  
 752  753          if (status) {
 753  754                  (void) clist_deregister(crdp->conn, wcl);
 754  755                  rdma_buf_free(crdp->conn, &long_rpc);
 755  756                  DTRACE_PROBE(krpc__e__svcrdma__longrep__syncmem);
 756  757                  return (SVC_RDMA_FAIL);
 757  758          }
 758  759  
 759  760          status = RDMA_WRITE(crdp->conn, wcl, WAIT);
 760  761  
 761  762          (void) clist_deregister(crdp->conn, wcl);
 762  763          rdma_buf_free(crdp->conn, &wcl->rb_longbuf);
 763  764  
 764  765          if (status != RDMA_SUCCESS) {
 765  766                  DTRACE_PROBE(krpc__e__svcrdma__longrep__write);
 766  767                  return (SVC_RDMA_FAIL);
 767  768          }
 768  769  
 769  770          return (SVC_RDMA_SUCCESS);
 770  771  }
 771  772  
 772  773  
 773  774  static int
 774  775  svc_compose_rpcmsg(SVCXPRT * clone_xprt, CONN * conn, xdrproc_t xdr_results,
 775  776      caddr_t xdr_location, rdma_buf_t *rpcreply, XDR ** xdrs,
 776  777      struct rpc_msg *msg, bool_t has_args, uint_t *len)
 777  778  {
 778  779          /*
 779  780           * Get a pre-allocated buffer for rpc reply
 780  781           */
 781  782          rpcreply->type = SEND_BUFFER;
 782  783          if (rdma_buf_alloc(conn, rpcreply)) {
 783  784                  DTRACE_PROBE(krpc__e__svcrdma__rpcmsg__reply__nofreebufs);
 784  785                  return (SVC_RDMA_FAIL);
 785  786          }
 786  787  
 787  788          xdrrdma_create(*xdrs, rpcreply->addr, rpcreply->len,
 788  789              0, NULL, XDR_ENCODE, conn);
 789  790  
 790  791          msg->rm_xid = clone_xprt->xp_xid;
 791  792  
 792  793          if (has_args) {
 793  794                  if (!(xdr_replymsg(*xdrs, msg) &&
 794  795                      (!has_args ||
 795  796                      SVCAUTH_WRAP(&clone_xprt->xp_auth, *xdrs,
 796  797                      xdr_results, xdr_location)))) {
 797  798                          rdma_buf_free(conn, rpcreply);
 798  799                          DTRACE_PROBE(
 799  800                              krpc__e__svcrdma__rpcmsg__reply__authwrap1);
 800  801                          return (SVC_RDMA_FAIL);
 801  802                  }
 802  803          } else {
 803  804                  if (!xdr_replymsg(*xdrs, msg)) {
 804  805                          rdma_buf_free(conn, rpcreply);
 805  806                          DTRACE_PROBE(
 806  807                              krpc__e__svcrdma__rpcmsg__reply__authwrap2);
 807  808                          return (SVC_RDMA_FAIL);
 808  809                  }
 809  810          }
 810  811  
 811  812          *len = XDR_GETPOS(*xdrs);
 812  813  
 813  814          return (SVC_RDMA_SUCCESS);
 814  815  }
 815  816  
/*
 * Send rpc reply.
 *
 * Transmits the reply in 'msg' for the request received on 'clone_xprt'.
 * The reply is encoded inline (RDMA_MSG) when its estimated size fits in
 * RPC_MSG_SZ; otherwise the results are pushed with RDMA_WRITE through the
 * client-provided reply chunk list and only the transport header is sent
 * (RDMA_NOMSG).  Returns TRUE if the final RDMA_SEND succeeded, FALSE on
 * any failure; all buffers and chunk lists are released on every path via
 * the 'out' label.
 */
static bool_t
svc_rdma_ksend(SVCXPRT * clone_xprt, struct rpc_msg *msg)
{
	XDR *xdrs_rpc = &(clone_xprt->xp_xdrout);
	XDR xdrs_rhdr;
	CONN *conn = NULL;
	rdma_buf_t rbuf_resp = {0}, rbuf_rpc_resp = {0};

	struct clone_rdma_data *crdp;
	struct clist *cl_read = NULL;
	struct clist *cl_send = NULL;
	struct clist *cl_write = NULL;
	xdrproc_t xdr_results;		/* results XDR encoding function */
	caddr_t xdr_location;		/* response results pointer */

	int retval = FALSE;
	int status, msglen, num_wreply_segments = 0;
	uint32_t rdma_credit = 0;
	int freelen = 0;
	bool_t has_args;
	uint_t	final_resp_len, rdma_response_op, vers;

	bzero(&xdrs_rhdr, sizeof (XDR));
	crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
	conn = crdp->conn;

	/*
	 * If there is a result procedure specified in the reply message,
	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
	 * We need to make sure it won't be processed twice, so we null
	 * it for xdr_replymsg here.
	 */
	has_args = FALSE;
	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
			has_args = TRUE;
			xdr_location = msg->acpted_rply.ar_results.where;
			msg->acpted_rply.ar_results.proc = xdr_void;
			msg->acpted_rply.ar_results.where = NULL;
		}
	}
	/*
	 * NOTE(review): xdr_results/xdr_location are only assigned when
	 * has_args becomes TRUE above; the calls below pass them
	 * unconditionally and rely on callees not using them otherwise —
	 * confirm against svc_compose_rpcmsg()/xdrrdma_sizeof().
	 */

	/*
	 * Given the limit on the inline response size (RPC_MSG_SZ),
	 * there is a need to make a guess as to the overall size of
	 * the response.  If the resultant size is beyond the inline
	 * size, then the server needs to use the "reply chunk list"
	 * provided by the client (if the client provided one).  An
	 * example of this type of response would be a READDIR
	 * response (e.g. a small directory read would fit in RPC_MSG_SZ
	 * and that is the preference but it may not fit)
	 *
	 * Combine the encoded size and the size of the true results
	 * and then make the decision about where to encode and send results.
	 *
	 * One important note, this calculation is ignoring the size
	 * of the encoding of the authentication overhead.  The reason
	 * for this is rooted in the complexities of access to the
	 * encoded size of RPCSEC_GSS related authentiation,
	 * integrity, and privacy.
	 *
	 * If it turns out that the encoded authentication bumps the
	 * response over the RPC_MSG_SZ limit, then it may need to
	 * attempt to encode for the reply chunk list.
	 */

	/*
	 * Calculating the "sizeof" the RPC response header and the
	 * encoded results.
	 */
	msglen = xdr_sizeof(xdr_replymsg, msg);

	if (msglen > 0) {
		RSSTAT_INCR(rstotalreplies);
	}
	if (has_args)
		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
		    rdma_minchunk, NULL, NULL);

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__msglen, int, msglen);

	status = SVC_RDMA_SUCCESS;

	if (msglen < RPC_MSG_SZ) {
		/*
		 * Looks like the response will fit in the inline
		 * response; let's try
		 */
		RSSTAT_INCR(rstotalinlinereplies);

		rdma_response_op = RDMA_MSG;

		status = svc_compose_rpcmsg(clone_xprt, conn, xdr_results,
		    xdr_location, &rbuf_rpc_resp, &xdrs_rpc, msg,
		    has_args, &final_resp_len);

		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_status,
		    int, status);
		DTRACE_PROBE1(krpc__i__srdma__ksend__compose_len,
		    int, final_resp_len);

		/*
		 * Inline encode worked; any client-provided reply chunk
		 * list is unneeded, so drop it now.
		 */
		if (status == SVC_RDMA_SUCCESS && crdp->cl_reply) {
			clist_free(crdp->cl_reply);
			crdp->cl_reply = NULL;
		}
	}

	/*
	 * If the encode failed (size?) or the message really is
	 * larger than what is allowed, try the response chunk list.
	 */
	if (status != SVC_RDMA_SUCCESS || msglen >= RPC_MSG_SZ) {
		/*
		 * attempting to use a reply chunk list when there
		 * isn't one won't get very far...
		 */
		if (crdp->cl_reply == NULL) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__noreplycl);
			goto out;
		}

		RSSTAT_INCR(rstotallongreplies);

		/* Re-estimate with no minimum-chunk threshold. */
		msglen = xdr_sizeof(xdr_replymsg, msg);
		msglen += xdrrdma_sizeof(xdr_results, xdr_location, 0,
		    NULL, NULL);

		status = svc_process_long_reply(clone_xprt, xdr_results,
		    xdr_location, msg, has_args, &msglen, &freelen,
		    &num_wreply_segments, &final_resp_len);

		DTRACE_PROBE1(krpc__i__svcrdma__ksend__longreplen,
		    int, final_resp_len);

		if (status != SVC_RDMA_SUCCESS) {
			DTRACE_PROBE(krpc__e__svcrdma__ksend__compose__failed);
			goto out;
		}

		/* Results already written via RDMA; header-only send. */
		rdma_response_op = RDMA_NOMSG;
	}

	DTRACE_PROBE1(krpc__i__svcrdma__ksend__rdmamsg__len,
	    int, final_resp_len);

	/*
	 * Build the RPC-over-RDMA transport header (xid, version,
	 * credits, op, chunk lists) in its own send buffer.
	 */
	rbuf_resp.type = SEND_BUFFER;
	if (rdma_buf_alloc(conn, &rbuf_resp)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__nofreebufs);
		goto out;
	}

	rdma_credit = rdma_bufs_granted;

	vers = RPCRDMA_VERS;
	xdrmem_create(&xdrs_rhdr, rbuf_resp.addr, rbuf_resp.len, XDR_ENCODE);
	(*(uint32_t *)rbuf_resp.addr) = msg->rm_xid;
	/* Skip xid and set the xdr position accordingly. */
	XDR_SETPOS(&xdrs_rhdr, sizeof (uint32_t));
	if (!xdr_u_int(&xdrs_rhdr, &vers) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_credit) ||
	    !xdr_u_int(&xdrs_rhdr, &rdma_response_op)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		DTRACE_PROBE(krpc__e__svcrdma__ksend__uint);
		goto out;
	}

	/*
	 * Now XDR the read chunk list, actually always NULL
	 */
	(void) xdr_encode_rlist_svc(&xdrs_rhdr, cl_read);

	/*
	 * encode write list -- we already drove RDMA_WRITEs
	 */
	cl_write = crdp->cl_wlist;
	if (!xdr_encode_wlist(&xdrs_rhdr, cl_write)) {
		DTRACE_PROBE(krpc__e__svcrdma__ksend__enc__wlist);
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	if (!xdr_encode_reply_wchunk(&xdrs_rhdr, crdp->cl_reply,
	    num_wreply_segments)) {
		rdma_buf_free(conn, &rbuf_rpc_resp);
		rdma_buf_free(conn, &rbuf_resp);
		goto out;
	}

	/* First send element: the transport header. */
	clist_add(&cl_send, 0, XDR_GETPOS(&xdrs_rhdr), &rbuf_resp.handle,
	    rbuf_resp.addr, NULL, NULL);

	/* Second element (inline replies only): the RPC message itself. */
	if (rdma_response_op == RDMA_MSG) {
		clist_add(&cl_send, 0, final_resp_len, &rbuf_rpc_resp.handle,
		    rbuf_rpc_resp.addr, NULL, NULL);
	}

	status = RDMA_SEND(conn, cl_send, msg->rm_xid);

	if (status == RDMA_SUCCESS) {
		retval = TRUE;
	}

out:
	/*
	 * Free up sendlist chunks
	 */
	if (cl_send != NULL)
		clist_free(cl_send);

	/*
	 * Destroy private data for xdr rdma
	 */
	if (clone_xprt->xp_xdrout.x_ops != NULL) {
		XDR_DESTROY(&(clone_xprt->xp_xdrout));
	}

	if (crdp->cl_reply) {
		clist_free(crdp->cl_reply);
		crdp->cl_reply = NULL;
	}

	/*
	 * This is completely disgusting.  If public is set it is
	 * a pointer to a structure whose first field is the address
	 * of the function to free that structure and any related
	 * stuff.  (see rrokfree in nfs_xdr.c).
	 */
	if (xdrs_rpc->x_public) {
		/* LINTED pointer alignment */
		(**((int (**)()) xdrs_rpc->x_public)) (xdrs_rpc->x_public);
	}

	if (xdrs_rhdr.x_ops != NULL) {
		XDR_DESTROY(&xdrs_rhdr);
	}

	return (retval);
}
1064 1065  
1065 1066  /*
1066 1067   * Deserialize arguments.
1067 1068   */
1068 1069  static bool_t
1069 1070  svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
1070 1071  {
1071 1072          if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
1072 1073              xdr_args, args_ptr)) != TRUE)
1073 1074                  return (FALSE);
1074 1075          return (TRUE);
1075 1076  }
1076 1077  
1077 1078  static bool_t
1078 1079  svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
1079 1080      caddr_t args_ptr)
1080 1081  {
1081 1082          struct clone_rdma_data *crdp;
1082 1083          bool_t retval;
1083 1084  
1084 1085          /*
1085 1086           * If the cloned bit is true, then this transport specific
1086 1087           * rmda data has been duplicated into another cloned xprt. Do
1087 1088           * not free, or release the connection, it is still in use.  The
1088 1089           * buffers will be freed and the connection released later by
1089 1090           * SVC_CLONE_DESTROY().
1090 1091           */
1091 1092          crdp = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
1092 1093          if (crdp->cloned == TRUE) {
1093 1094                  crdp->cloned = 0;
1094 1095                  return (TRUE);
1095 1096          }
1096 1097  
1097 1098          /*
1098 1099           * Free the args if needed then XDR_DESTROY
1099 1100           */
1100 1101          if (args_ptr) {
1101 1102                  XDR     *xdrs = &clone_xprt->xp_xdrin;
1102 1103  
1103 1104                  xdrs->x_op = XDR_FREE;
1104 1105                  retval = (*xdr_args)(xdrs, args_ptr);
1105 1106          }
1106 1107  
1107 1108          XDR_DESTROY(&(clone_xprt->xp_xdrin));
1108 1109          rdma_buf_free(crdp->conn, &crdp->rpcbuf);
1109 1110          if (crdp->cl_reply) {
1110 1111                  clist_free(crdp->cl_reply);
1111 1112                  crdp->cl_reply = NULL;
1112 1113          }
1113 1114          RDMA_REL_CONN(crdp->conn);
1114 1115  
1115 1116          return (retval);
1116 1117  }
1117 1118  
/* ARGSUSED */
static int32_t *
svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
{
	/*
	 * No pre-allocated result buffers on the RDMA transport; the
	 * reply is encoded at send time (see svc_rdma_ksend()), so this
	 * SVC_GETRES entry point always reports "not available".
	 */
	return (NULL);
}
1124 1125  
/* ARGSUSED */
static void
svc_rdma_kfreeres(SVCXPRT *clone_xprt)
{
	/*
	 * Intentionally empty: svc_rdma_kgetres() never hands out a
	 * result buffer, so there is nothing for this entry point to
	 * release.
	 */
}
1130 1131  
1131 1132  /*
  
    | 
      ↓ open down ↓ | 
    1099 lines elided | 
    
      ↑ open up ↑ | 
  
1132 1133   * the dup cacheing routines below provide a cache of non-failure
1133 1134   * transaction id's.  rpc service routines can use this to detect
1134 1135   * retransmissions and re-send a non-failure response.
1135 1136   */
1136 1137  
1137 1138  /*
1138 1139   * MAXDUPREQS is the number of cached items.  It should be adjusted
1139 1140   * to the service load so that there is likely to be a response entry
1140 1141   * when the first retransmission comes in.
1141 1142   */
1142      -#define MAXDUPREQS      1024
     1143 +#define MAXDUPREQS      8192
1143 1144  
1144 1145  /*
1145 1146   * This should be appropriately scaled to MAXDUPREQS.
1146 1147   */
1147      -#define DRHASHSZ        257
     1148 +#define DRHASHSZ        2053
1148 1149  
/*
 * XIDHASH maps an RPC transaction id to a hash bucket: a mask when
 * DRHASHSZ is a power of two, modulo otherwise (the prime sizes used
 * here take the modulo path).
 */
#if ((DRHASHSZ & (DRHASHSZ - 1)) == 0)
#define	XIDHASH(xid)	((xid) & (DRHASHSZ - 1))
#else
#define	XIDHASH(xid)	((xid) % DRHASHSZ)
#endif
#define	DRHASH(dr)	XIDHASH((dr)->dr_xid)	/* bucket for cache entry */
#define	REQTOXID(req)	((req)->rq_xprt->xp_xid)	/* xid of a request */

static int	rdmandupreqs = 0;	/* entries currently allocated */
int	rdmamaxdupreqs = MAXDUPREQS;	/* tunable cap on cache entries */
static kmutex_t	rdmadupreq_lock;	/* guards all dup-cache state below */
static struct dupreq *rdmadrhashtbl[DRHASHSZ];	/* xid hash chains */
static int	rdmadrhashstat[DRHASHSZ];	/* per-bucket entry counts */

static void unhash(struct dupreq *);

/*
 * rdmadrmru points to the head of a circular linked list in lru order.
 * rdmadrmru->dr_next == drlru
 */
struct dupreq *rdmadrmru;
1170 1171  
/*
 * svc_rdma_kdup searches the request cache and returns 0 if the
 * request is not found in the cache.  If it is found, then it
 * returns the state of the request (in progress or done) and
 * the status or attributes that were part of the original reply.
 *
 * On DUP_NEW the entry (new or recycled from the LRU ring) is marked
 * DUP_INPROGRESS and returned through drpp; DUP_ERROR is returned when
 * no entry can be allocated or recycled.  All cache state is protected
 * by rdmadupreq_lock.
 */
static int
svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
	bool_t *dupcachedp)
{
	struct dupreq *dr;
	uint32_t xid;
	uint32_t drhash;
	int status;

	xid = REQTOXID(req);
	mutex_enter(&rdmadupreq_lock);
	RSSTAT_INCR(rsdupchecks);
	/*
	 * Check to see whether an entry already exists in the cache.
	 * A match requires the same xid, proc, prog, vers, and caller
	 * address -- xid alone is not unique across clients.
	 */
	dr = rdmadrhashtbl[XIDHASH(xid)];
	while (dr != NULL) {
		if (dr->dr_xid == xid &&
		    dr->dr_proc == req->rq_proc &&
		    dr->dr_prog == req->rq_prog &&
		    dr->dr_vers == req->rq_vers &&
		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
		    bcmp((caddr_t)dr->dr_addr.buf,
		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
		    dr->dr_addr.len) == 0) {
			status = dr->dr_status;
			if (status == DUP_DONE) {
				/* Replay the cached response into res. */
				bcopy(dr->dr_resp.buf, res, size);
				if (dupcachedp != NULL)
					*dupcachedp = (dr->dr_resfree != NULL);
			} else {
				dr->dr_status = DUP_INPROGRESS;
				*drpp = dr;
			}
			RSSTAT_INCR(rsdupreqs);
			mutex_exit(&rdmadupreq_lock);
			return (status);
		}
		dr = dr->dr_chain;
	}

	/*
	 * There wasn't an entry, either allocate a new one or recycle
	 * an old one.
	 */
	if (rdmandupreqs < rdmamaxdupreqs) {
		/* KM_NOSLEEP: fail with DUP_ERROR rather than block here. */
		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
		if (dr == NULL) {
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
		dr->dr_resp.buf = NULL;
		dr->dr_resp.maxlen = 0;
		dr->dr_addr.buf = NULL;
		dr->dr_addr.maxlen = 0;
		/* Insert into the circular LRU ring (create it if empty). */
		if (rdmadrmru) {
			dr->dr_next = rdmadrmru->dr_next;
			rdmadrmru->dr_next = dr;
		} else {
			dr->dr_next = dr;
		}
		rdmandupreqs++;
	} else {
		/*
		 * Recycle the least-recently-used entry that is not
		 * still in progress; DUP_ERROR if every entry is busy.
		 */
		dr = rdmadrmru->dr_next;
		while (dr->dr_status == DUP_INPROGRESS) {
			dr = dr->dr_next;
			if (dr == rdmadrmru->dr_next) {
				mutex_exit(&rdmadupreq_lock);
				return (DUP_ERROR);
			}
		}
		unhash(dr);
		if (dr->dr_resfree) {
			(*dr->dr_resfree)(dr->dr_resp.buf);
		}
	}
	dr->dr_resfree = NULL;
	rdmadrmru = dr;

	dr->dr_xid = REQTOXID(req);
	dr->dr_prog = req->rq_prog;
	dr->dr_vers = req->rq_vers;
	dr->dr_proc = req->rq_proc;
	/* Grow the saved caller-address buffer if it is too small. */
	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
		if (dr->dr_addr.buf != NULL)
			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
		if (dr->dr_addr.buf == NULL) {
			dr->dr_addr.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
	/* Likewise grow the response buffer to hold 'size' bytes. */
	if (dr->dr_resp.maxlen < size) {
		if (dr->dr_resp.buf != NULL)
			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
		dr->dr_resp.maxlen = (unsigned int)size;
		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
		if (dr->dr_resp.buf == NULL) {
			dr->dr_resp.maxlen = 0;
			dr->dr_status = DUP_DROP;
			mutex_exit(&rdmadupreq_lock);
			return (DUP_ERROR);
		}
	}
	dr->dr_status = DUP_INPROGRESS;

	/* Link the entry into its hash chain and hand it back. */
	drhash = (uint32_t)DRHASH(dr);
	dr->dr_chain = rdmadrhashtbl[drhash];
	rdmadrhashtbl[drhash] = dr;
	rdmadrhashstat[drhash]++;
	mutex_exit(&rdmadupreq_lock);
	*drpp = dr;
	return (DUP_NEW);
}
1296 1297  
1297 1298  /*
1298 1299   * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
1299 1300   * and stores the response.
1300 1301   */
1301 1302  static void
1302 1303  svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
1303 1304          int size, int status)
1304 1305  {
1305 1306          ASSERT(dr->dr_resfree == NULL);
1306 1307          if (status == DUP_DONE) {
1307 1308                  bcopy(res, dr->dr_resp.buf, size);
1308 1309                  dr->dr_resfree = dis_resfree;
1309 1310          }
1310 1311          dr->dr_status = status;
1311 1312  }
1312 1313  
1313 1314  /*
1314 1315   * This routine expects that the mutex, rdmadupreq_lock, is already held.
1315 1316   */
1316 1317  static void
1317 1318  unhash(struct dupreq *dr)
1318 1319  {
1319 1320          struct dupreq *drt;
1320 1321          struct dupreq *drtprev = NULL;
1321 1322          uint32_t drhash;
1322 1323  
1323 1324          ASSERT(MUTEX_HELD(&rdmadupreq_lock));
1324 1325  
1325 1326          drhash = (uint32_t)DRHASH(dr);
1326 1327          drt = rdmadrhashtbl[drhash];
1327 1328          while (drt != NULL) {
1328 1329                  if (drt == dr) {
1329 1330                          rdmadrhashstat[drhash]--;
1330 1331                          if (drtprev == NULL) {
1331 1332                                  rdmadrhashtbl[drhash] = drt->dr_chain;
1332 1333                          } else {
1333 1334                                  drtprev->dr_chain = drt->dr_chain;
1334 1335                          }
1335 1336                          return;
1336 1337                  }
1337 1338                  drtprev = drt;
1338 1339                  drt = drt->dr_chain;
1339 1340          }
1340 1341  }
1341 1342  
1342 1343  bool_t
1343 1344  rdma_get_wchunk(struct svc_req *req, iovec_t *iov, struct clist *wlist)
1344 1345  {
1345 1346          struct clist    *clist;
1346 1347          uint32_t        tlen;
1347 1348  
1348 1349          if (req->rq_xprt->xp_type != T_RDMA) {
1349 1350                  return (FALSE);
1350 1351          }
1351 1352  
1352 1353          tlen = 0;
1353 1354          clist = wlist;
1354 1355          while (clist) {
1355 1356                  tlen += clist->c_len;
1356 1357                  clist = clist->c_next;
1357 1358          }
1358 1359  
1359 1360          /*
1360 1361           * set iov to addr+len of first segment of first wchunk of
1361 1362           * wlist sent by client.  krecv() already malloc'd a buffer
1362 1363           * large enough, but registration is deferred until we write
1363 1364           * the buffer back to (NFS) client using RDMA_WRITE.
1364 1365           */
1365 1366          iov->iov_base = (caddr_t)(uintptr_t)wlist->w.c_saddr;
1366 1367          iov->iov_len = tlen;
1367 1368  
1368 1369          return (TRUE);
1369 1370  }
1370 1371  
/*
 * routine to setup the read chunk lists
 *
 * Trims the chunk list 'wcl' so that exactly 'count' bytes of data are
 * described, rounds the total up to a 4-byte XDR boundary, zeroes the
 * length of any remaining chunks, and returns the rounded length via
 * wcl_len.  Returns TRUE on success, FALSE if the list cannot hold
 * 'count' bytes or the roundup cannot be placed.
 *
 * NOTE(review): despite the name, the list walked here is the 'wcl'
 * chunk list passed by the caller -- confirm against callers whether
 * this is the read or write chunk list in RPC-over-RDMA terms.
 */

int
rdma_setup_read_chunks(struct clist *wcl, uint32_t count, int *wcl_len)
{
	int		data_len, avail_len;
	uint_t		round_len;

	data_len = avail_len = 0;

	while (wcl != NULL && count > 0) {
		/* An unregistered chunk (rmr == 0) ends the usable list. */
		if (wcl->c_dmemhandle.mrc_rmr == 0)
			break;

		if (wcl->c_len < count) {
			/* Chunk fully consumed; nothing left over in it. */
			data_len += wcl->c_len;
			avail_len = 0;
		} else {
			/*
			 * Last data chunk: clip it to 'count' and remember
			 * how much slack it had for the roundup below.
			 */
			data_len += count;
			avail_len = wcl->c_len - count;
			wcl->c_len = count;
		}
		count -= wcl->c_len;

		/* Leave wcl on the clipped chunk for the roundup step. */
		if (count == 0)
			break;

		wcl = wcl->c_next;
	}

	/*
	 * MUST fail if there are still more data
	 */
	if (count > 0) {
		DTRACE_PROBE2(krpc__e__rdma_setup_read_chunks_clist_len,
		    int, data_len, int, count);
		return (FALSE);
	}

	/*
	 * Round up the last chunk to 4-byte boundary
	 */
	*wcl_len = roundup(data_len, BYTES_PER_XDR_UNIT);
	round_len = *wcl_len - data_len;

	if (round_len) {

		/*
		 * If there is space in the current chunk,
		 * add the roundup to the chunk.
		 */
		if (avail_len >= round_len) {
			wcl->c_len += round_len;
		} else  {
			/*
			 * try the next one.
			 */
			wcl = wcl->c_next;
			if ((wcl == NULL) || (wcl->c_len < round_len)) {
				DTRACE_PROBE1(
				    krpc__e__rdma_setup_read_chunks_rndup,
				    int, round_len);
				return (FALSE);
			}
			wcl->c_len = round_len;
		}
	}

	wcl = wcl->c_next;

	/*
	 * Make rest of the chunks 0-len
	 */

	clist_zero_len(wcl);

	return (TRUE);
}
  
    | 
      ↓ open down ↓ | 
    293 lines elided | 
    
      ↑ open up ↑ | 
  
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX