Print this page
    
Support route deletion entries in SVP_R_LOG_ACK.
    
      
        | Split | 
	Close | 
      
      | Expand all | 
      | Collapse all | 
    
    
          --- old/usr/src/lib/varpd/svp/common/libvarpd_svp_shootdown.c
          +++ new/usr/src/lib/varpd/svp/common/libvarpd_svp_shootdown.c
   1    1  /*
   2    2   * This file and its contents are supplied under the terms of the
   3    3   * Common Development and Distribution License ("CDDL"), version 1.0.
   4    4   * You may only use this file in accordance with the terms of version
   5    5   * 1.0 of the CDDL.
   6    6   *
   7    7   * A full copy of the text of the CDDL should have accompanied this
   8    8   * source.  A copy of the CDDL is also available via the Internet at
   9    9   * http://www.illumos.org/license/CDDL.
  10   10   */
  11   11  
  12   12  /*
  13   13   * Copyright 2015 Joyent, Inc.
  14   14   */
  15   15  
  16   16  /*
  17   17   * Shootdown processing logic.
  18   18   *
  19   19   * For more information, see the big theory statement in
  20   20   * lib/varpd/svp/common/libvarpd_svp.c.
  21   21   */
  22   22  
  23   23  #include <umem.h>
  24   24  #include <sys/uuid.h>
  25   25  #include <assert.h>
  26   26  #include <strings.h>
  27   27  #include <errno.h>
  28   28  #include <sys/debug.h>
  29   29  
  30   30  #include <libvarpd_provider.h>
  31   31  #include <libvarpd_svp.h>
  32   32  
  33   33  /*
  34   34   * When we've determined that there's nothing left for us to do, then we go
  35   35   * ahead and wait svp_shootdown_base seconds + up to an additional
  36   36   * svp_shootdown_base seconds before asking again. However, if there is actually
  37   37   * some work going on, just use the svp_shootdown_cont time.
  38   38   */
  39   39  static int svp_shootdown_base = 5;
  40   40  static int svp_shootdown_cont = 1;
  41   41  
  42   42  /*
  43   43   * These are sizes for our logack and logrm buffers. The sizing of the shootdown
  44   44   * buffer would give us approximately 18 or so VL3 entries and 32 VL2 entries
  45   45   * or some combination thereof. While it's a bit of overkill, we just use the
  46   46   * same sized buffer for the list of uuids that we pass to remove log entries
  47   47   * that we've acted upon.
  48   48   */
  49   49  static int svp_shootdown_buf = 1024;
  50   50  
  51   51  static void
  52   52  svp_shootdown_schedule(svp_sdlog_t *sdl, boolean_t cont)
  53   53  {
  54   54          assert(MUTEX_HELD(&sdl->sdl_lock));
  55   55  
  56   56          if (cont == B_TRUE) {
  57   57                  sdl->sdl_timer.st_value = svp_shootdown_cont;
  58   58          } else {
  59   59                  sdl->sdl_timer.st_value = svp_shootdown_base +
  60   60                      arc4random_uniform(svp_shootdown_base + 1);
  61   61          }
  62   62          svp_timer_add(&sdl->sdl_timer);
  63   63  }
  64   64  
  65   65  void
  66   66  svp_shootdown_lrm_cb(svp_remote_t *srp, svp_status_t status)
  67   67  {
  68   68          svp_sdlog_t *sdl = &srp->sr_shoot;
  69   69  
  70   70          mutex_enter(&sdl->sdl_lock);
  71   71          sdl->sdl_flags &= ~SVP_SD_RUNNING;
  72   72          svp_shootdown_schedule(sdl, B_TRUE);
  73   73          mutex_exit(&sdl->sdl_lock);
  74   74  
  75   75          if (status != SVP_S_OK) {
  76   76                  (void) bunyan_warn(svp_bunyan, "SVP_R_LOG_RM failed",
  77   77                      BUNYAN_T_STRING, "remote_host", srp->sr_hostname,
  78   78                      BUNYAN_T_INT32, "remote_port", srp->sr_rport,
  79   79                      BUNYAN_T_INT32, "status", status,
  80   80                      BUNYAN_T_END);
  81   81          }
  82   82  }
  83   83  
  84   84  static void
  85   85  svp_shootdown_ref(svp_sdlog_t *sdl)
  86   86  {
  87   87          mutex_enter(&sdl->sdl_lock);
  88   88          sdl->sdl_ref++;
  89   89          mutex_exit(&sdl->sdl_lock);
  90   90  }
  91   91  
  92   92  static void
  93   93  svp_shootdown_rele(svp_sdlog_t *sdl)
  94   94  {
  95   95          svp_lrm_req_t *svrr = sdl->sdl_logrm;
  96   96          boolean_t next;
  97   97  
  98   98          mutex_enter(&sdl->sdl_lock);
  99   99          VERIFY(sdl->sdl_ref > 0);
 100  100          sdl->sdl_ref--;
 101  101          if (sdl->sdl_ref > 0) {
 102  102                  mutex_exit(&sdl->sdl_lock);
 103  103                  return;
 104  104          }
 105  105  
 106  106          /*
 107  107           * At this point we know that we hold the last reference, therefore it's
 108  108           * safe for us to go ahead and clean up and move on and attempt to
 109  109           * deliver the reply. We always deliver the reply by going through the
 110  110           * timer. This can be rather important as the final reference may be
 111  111           * coming through a failed query and it's not always safe for us to
 112  112           * callback into the remote routines from this context.
 113  113           *
 114  114           * We should only do this if we have a non-zero number of entries to
 115  115           * take down.
 116  116           */
 117  117          sdl->sdl_flags &= ~SVP_SD_RUNNING;
 118  118          if (svrr->svrr_count > 0) {
 119  119                  sdl->sdl_flags |= SVP_SD_DORM;
 120  120                  next = B_TRUE;
 121  121          } else {
 122  122                  next = B_FALSE;
 123  123          }
 124  124          svp_shootdown_schedule(sdl, next);
 125  125          mutex_exit(&sdl->sdl_lock);
 126  126  }
 127  127  
 128  128  /*
 129  129   * This is a callback used to indicate that the VL3 lookup has completed and an
 130  130   * entry, if any, has been injected. If the command succeeded, e.g. we got that
 131  131   * the status was OK or that it was not found, then we will add it to the list to
 132  132   * shoot down. Otherwise, there's nothing else for us to really do here.
 133  133   */
 134  134  void
 135  135  svp_shootdown_vl3_cb(svp_status_t status, svp_log_vl3_t *vl3, svp_sdlog_t *sdl)
 136  136  {
 137  137          svp_lrm_req_t *svrr = sdl->sdl_logrm;
 138  138  
 139  139          mutex_enter(&sdl->sdl_lock);
 140  140          if (status == SVP_S_OK || status == SVP_S_NOTFOUND) {
 141  141                  bcopy(vl3->svl3_id, &svrr->svrr_ids[svrr->svrr_count * 16],
 142  142                      UUID_LEN);
 143  143                  svrr->svrr_count++;
 144  144          }
 145  145          mutex_exit(&sdl->sdl_lock);
 146  146  
  
    | 
      ↓ open down ↓ | 
    146 lines elided | 
    
      ↑ open up ↑ | 
  
 147  147          svp_shootdown_rele(sdl);
 148  148  }
 149  149  
 150  150  static int
 151  151  svp_shootdown_logr_shoot(void *data, svp_log_type_t type, void *arg)
 152  152  {
 153  153          svp_sdlog_t *sdl = arg;
 154  154          svp_remote_t *srp = sdl->sdl_remote;
 155  155          svp_lrm_req_t *svrr = sdl->sdl_logrm;
 156  156  
 157      -        if (type != SVP_LOG_VL2 && type != SVP_LOG_VL3)
      157 +        if (type != SVP_LOG_VL2 && type != SVP_LOG_VL3 && type != SVP_LOG_ROUTE)
 158  158                  libvarpd_panic("encountered unknown type: %d\n", type);
 159  159  
 160  160          if (type == SVP_LOG_VL2) {
 161  161                  svp_log_vl2_t *svl2 = data;
 162  162                  svp_remote_shootdown_vl2(srp, svl2);
 163  163                  mutex_enter(&sdl->sdl_lock);
 164  164                  bcopy(svl2->svl2_id, &svrr->svrr_ids[svrr->svrr_count * 16],
 165  165                      UUID_LEN);
 166  166                  svrr->svrr_count++;
 167  167                  mutex_exit(&sdl->sdl_lock);
 168      -        } else {
      168 +        } else if (type == SVP_LOG_VL3) {
 169  169                  svp_log_vl3_t *svl3 = data;
 170  170  
 171  171                  /* Take a hold for the duration of this request */
 172  172                  svp_shootdown_ref(sdl);
 173  173                  svp_remote_shootdown_vl3(srp, svl3, sdl);
      174 +        } else {
      175 +                svp_log_route_t *svlr = data;
      176 +
      177 +                svp_remote_shootdown_route(srp, svlr);
      178 +                mutex_enter(&sdl->sdl_lock);
      179 +                bcopy(svlr->svlr_id, &svrr->svrr_ids[svrr->svrr_count * 16],
      180 +                    UUID_LEN);
      181 +                svrr->svrr_count++;
      182 +                mutex_exit(&sdl->sdl_lock);
 174  183          }
 175  184  
 176  185          return (0);
 177  186  }
 178  187  
 179  188  static int
 180  189  svp_shootdown_logr_count(void *data, svp_log_type_t type, void *arg)
 181  190  {
 182  191          uint_t *u = arg;
 183  192          *u = *u + 1;
 184  193          return (0);
 185  194  }
 186  195  
 187  196  
 188  197  static int
 189  198  svp_shootdown_logr_iter(svp_remote_t *srp, void *buf, size_t len,
 190      -    int (*cb)(void *, svp_log_type_t, void *), void *arg)
      199 +    int (*cb)(void *, svp_log_type_t, void *), void *arg, uint16_t version)
 191  200  {
 192  201          int ret;
 193  202          off_t cboff = 0;
 194  203          uint32_t *typep, type;
 195  204          svp_log_vl2_t *svl2;
 196  205          svp_log_vl3_t *svl3;
 197  206  
 198  207          /* Adjust for initial status word */
 199  208          assert(len >= sizeof (uint32_t));
 200  209          len -= sizeof (uint32_t);
 201  210          cboff += sizeof (uint32_t);
 202  211  
 203  212          while (len > 0) {
 204  213                  size_t opsz;
      214 +                char *typestring;
 205  215  
 206  216                  if (len < sizeof (uint32_t)) {
 207  217                          (void) bunyan_warn(svp_bunyan,
 208  218                              "failed to get initial shootdown tag",
 209  219                              BUNYAN_T_STRING, "remote_host", srp->sr_hostname,
 210  220                              BUNYAN_T_INT32, "remote_port", srp->sr_rport,
 211  221                              BUNYAN_T_INT32, "response_size", cboff + len,
 212  222                              BUNYAN_T_INT32, "response_offset", cboff,
 213  223                              BUNYAN_T_END);
 214  224                          return (-1);
 215  225                  }
 216  226  
 217  227                  typep = buf + cboff;
 218  228                  type = ntohl(*typep);
 219      -                if (type == SVP_LOG_VL2) {
      229 +                switch (type) {
      230 +                case SVP_LOG_VL2:
 220  231                          opsz = sizeof (svp_log_vl2_t);
 221      -                        if (len < opsz) {
 222      -                                (void) bunyan_warn(svp_bunyan,
 223      -                                    "not enough data for svp_log_vl2_t",
 224      -                                    BUNYAN_T_STRING, "remote_host",
 225      -                                    srp->sr_hostname,
 226      -                                    BUNYAN_T_INT32, "remote_port",
 227      -                                    srp->sr_rport,
 228      -                                    BUNYAN_T_INT32, "response_size",
 229      -                                    cboff + len,
 230      -                                    BUNYAN_T_INT32, "response_offset", cboff,
 231      -                                    BUNYAN_T_END);
 232      -                                return (-1);
 233      -                        }
 234      -                        svl2 = (void *)typep;
 235      -                        if ((ret = cb(svl2, type, arg)) != 0)
 236      -                                return (ret);
 237      -                } else if (type == SVP_LOG_VL3) {
 238      -
      232 +                        typestring = "svp_log_vl2_t";
      233 +                        break;
      234 +                case SVP_LOG_VL3:
 239  235                          opsz = sizeof (svp_log_vl3_t);
 240      -                        if (len < opsz) {
      236 +                        typestring = "svp_log_vl3_t";
      237 +                        break;
      238 +                case SVP_LOG_ROUTE:
      239 +                        if (version < SVP_VERSION_TWO) {
 241  240                                  (void) bunyan_warn(svp_bunyan,
 242      -                                    "not enough data for svp_log_vl3_t",
      241 +                                    "insufficient version for SVP_LOG_ROUTE",
      242 +                                    BUNYAN_T_UINT32, "version", version,
 243  243                                      BUNYAN_T_STRING, "remote_host",
 244  244                                      srp->sr_hostname,
 245  245                                      BUNYAN_T_INT32, "remote_port",
 246  246                                      srp->sr_rport,
 247  247                                      BUNYAN_T_INT32, "response_size",
 248  248                                      cboff + len,
 249  249                                      BUNYAN_T_INT32, "response_offset", cboff,
 250  250                                      BUNYAN_T_END);
 251  251                                  return (-1);
 252  252                          }
 253      -                        svl3 = (void *)typep;
 254      -                        if ((ret = cb(svl3, type, arg)) != 0)
 255      -                                return (ret);
 256      -                } else {
      253 +                        opsz = sizeof (svp_log_route_t);
      254 +                        typestring = "svp_log_route_t";
      255 +                        break;
      256 +                default:
 257  257                          (void) bunyan_warn(svp_bunyan,
 258  258                              "unknown log structure type",
 259  259                              BUNYAN_T_STRING, "remote_host",
 260  260                              srp->sr_hostname,
 261  261                              BUNYAN_T_INT32, "remote_port", srp->sr_rport,
 262  262                              BUNYAN_T_INT32, "response_size", cboff + len,
 263  263                              BUNYAN_T_INT32, "response_offset", cboff,
 264  264                              BUNYAN_T_INT32, "structure_type", type,
 265  265                              BUNYAN_T_END);
 266  266                          return (-1);
 267  267                  }
      268 +                if (len < opsz) {
      269 +                        (void) bunyan_warn(svp_bunyan,
      270 +                            "not enough data for",
      271 +                            BUNYAN_T_STRING, "", typestring,
      272 +                            BUNYAN_T_STRING, "remote_host", srp->sr_hostname,
      273 +                            BUNYAN_T_INT32, "remote_port", srp->sr_rport,
      274 +                            BUNYAN_T_INT32, "response_size", cboff + len,
      275 +                            BUNYAN_T_INT32, "response_offset", cboff,
      276 +                            BUNYAN_T_END);
      277 +                        return (-1);
      278 +                }
      279 +                if ((ret = cb((void *)typep, type, arg)) != 0)
      280 +                        return (ret);
      281 +
 268  282                  len -= opsz;
 269  283                  cboff += opsz;
 270  284          }
 271  285  
 272  286          return (0);
 273  287  }
 274  288  
 275  289  void
 276  290  svp_shootdown_logr_cb(svp_remote_t *srp, svp_status_t status, void *cbdata,
 277      -    size_t cbsize)
      291 +    size_t cbsize, uint16_t version)
 278  292  {
 279  293          uint_t count;
 280  294          svp_sdlog_t *sdl = &srp->sr_shoot;
 281  295  
 282  296          if (status != SVP_S_OK) {
 283  297                  (void) bunyan_warn(svp_bunyan,
 284  298                      "log request not OK",
 285  299                      BUNYAN_T_STRING, "remote_host", srp->sr_hostname,
 286  300                      BUNYAN_T_INT32, "remote_port", srp->sr_rport,
 287  301                      BUNYAN_T_INT32, "response_size", cbsize,
 288  302                      BUNYAN_T_INT32, "status", status,
 289  303                      BUNYAN_T_END);
 290  304                  mutex_enter(&sdl->sdl_lock);
 291  305                  sdl->sdl_flags &= ~SVP_SD_RUNNING;
 292  306                  svp_shootdown_schedule(sdl, B_FALSE);
 293  307                  mutex_exit(&sdl->sdl_lock);
  
    | 
      ↓ open down ↓ | 
    6 lines elided | 
    
      ↑ open up ↑ | 
  
 294  308                  return;
 295  309          }
 296  310  
 297  311          /*
 298  312           * First go ahead and count the number of entries. This effectively
 299  313           * allows us to validate that all the data is valid, if this fails, then
 300  314           * we fail the request.
 301  315           */
 302  316          count = 0;
 303  317          if ((svp_shootdown_logr_iter(srp, cbdata, cbsize,
 304      -            svp_shootdown_logr_count, &count)) != 0) {
      318 +                svp_shootdown_logr_count, &count, version)) != 0) {
 305  319                  mutex_enter(&sdl->sdl_lock);
 306  320                  sdl->sdl_flags &= ~SVP_SD_RUNNING;
 307  321                  svp_shootdown_schedule(sdl, B_FALSE);
 308  322                  mutex_exit(&sdl->sdl_lock);
 309  323                  return;
 310  324          }
 311  325  
 312  326          /*
 313  327           * If we have no entries, then we're also done.
 314  328           */
 315  329          if (count == 0) {
 316  330                  mutex_enter(&sdl->sdl_lock);
 317  331                  sdl->sdl_flags &= ~SVP_SD_RUNNING;
 318  332                  svp_shootdown_schedule(sdl, B_FALSE);
 319  333                  mutex_exit(&sdl->sdl_lock);
 320  334                  return;
 321  335          }
 322  336  
 323  337          /*
 324  338           * We have work to do. Because we may have asynchronous VL3 tasks, we're
 325  339           * going to first grab a reference before we do the iteration. Then, for
 326  340           * each asynchronous VL3 request we make, that'll also grab a hold. Once
 327  341           * we're done with the iteration, we'll drop our hold. If that's the
 328  342           * last one, it'll move on accordingly.
 329  343           */
  
    | 
      ↓ open down ↓ | 
    15 lines elided | 
    
      ↑ open up ↑ | 
  
 330  344          svp_shootdown_ref(sdl);
 331  345          bzero(sdl->sdl_logrm, svp_shootdown_buf);
 332  346  
 333  347          /*
 334  348           * If this fails, we're going to determine what to do next based on the
 335  349           * number of entries that were entered into the log removal. At this
 336  350           * point success or failure don't really look different, all it changes
 337  351           * is how many entries we have to remove.
 338  352           */
 339  353          (void) svp_shootdown_logr_iter(srp, cbdata, cbsize,
 340      -            svp_shootdown_logr_shoot, sdl);
      354 +            svp_shootdown_logr_shoot, sdl, version);
 341  355  
 342  356          /*
 343  357           * Now that we're done with our work, release the hold. If we don't have
 344  358           * any vl3 tasks outstanding, this'll trigger the next phase of the log
 345  359           * removals.
 346  360           */
 347  361          svp_shootdown_rele(sdl);
 348  362  }
 349  363  
 350  364  static void
 351  365  svp_shootdown_timer(void *arg)
 352  366  {
 353  367          svp_sdlog_t *sdl = arg;
 354  368          svp_remote_t *srp = sdl->sdl_remote;
 355  369          boolean_t init = B_TRUE;
 356  370  
 357  371          mutex_enter(&sdl->sdl_lock);
 358  372  
 359  373          /*
 360  374           * If we've been asked to quiesce, we're done.
 361  375           */
 362  376          if ((sdl->sdl_flags & SVP_SD_QUIESCE) != 0) {
 363  377                  mutex_exit(&sdl->sdl_lock);
 364  378                  return;
 365  379          }
 366  380  
 367  381          /*
 368  382           * We shouldn't be able to have ourselves currently be running and reach
 369  383           * here. If that's the case, we should immediately panic.
 370  384           */
 371  385          if ((sdl->sdl_flags & SVP_SD_RUNNING) != 0) {
 372  386                  libvarpd_panic("remote %p shootdown timer fired while still "
 373  387                      "running", srp);
 374  388          }
 375  389  
 376  390          if ((sdl->sdl_flags & SVP_SD_DORM) != 0) {
 377  391                  sdl->sdl_flags &= ~SVP_SD_DORM;
 378  392                  init = B_FALSE;
 379  393          }
 380  394  
 381  395          sdl->sdl_flags |= SVP_SD_RUNNING;
 382  396          mutex_exit(&sdl->sdl_lock);
 383  397  
 384  398          if (init == B_FALSE) {
 385  399                  svp_lrm_req_t *svrr = sdl->sdl_logrm;
 386  400  
 387  401                  bzero(&sdl->sdl_query, sizeof (svp_query_t));
 388  402                  svp_remote_lrm_request(sdl->sdl_remote, &sdl->sdl_query, svrr,
 389  403                      sizeof (*svrr) + 16 * svrr->svrr_count);
 390  404          } else {
 391  405                  bzero(&sdl->sdl_query, sizeof (svp_query_t));
 392  406                  svp_remote_log_request(srp, &sdl->sdl_query, sdl->sdl_logack,
 393  407                      svp_shootdown_buf);
 394  408          }
 395  409  }
 396  410  
 397  411  void
 398  412  svp_shootdown_fini(svp_remote_t *srp)
 399  413  {
 400  414          svp_sdlog_t *sdl = &srp->sr_shoot;
 401  415  
 402  416          mutex_enter(&sdl->sdl_lock);
 403  417          sdl->sdl_flags |= SVP_SD_QUIESCE;
 404  418          mutex_exit(&sdl->sdl_lock);
 405  419  
 406  420          svp_timer_remove(&sdl->sdl_timer);
 407  421  
 408  422          mutex_enter(&sdl->sdl_lock);
 409  423  
 410  424          /*
 411  425           * Normally svp_timer_remove would be enough. However, the query could
 412  426           * have been put out again outside of the svp_timer interface. Therefore
 413  427           * we still need to check for SVP_SD_RUNNING.
 414  428           */
 415  429          while (sdl->sdl_flags & SVP_SD_RUNNING)
 416  430                  (void) cond_wait(&sdl->sdl_cond, &sdl->sdl_lock);
 417  431          mutex_exit(&sdl->sdl_lock);
 418  432  
 419  433          umem_free(sdl->sdl_logack, svp_shootdown_buf);
 420  434          umem_free(sdl->sdl_logrm, svp_shootdown_buf);
 421  435          sdl->sdl_logack = NULL;
 422  436          sdl->sdl_logrm = NULL;
 423  437          (void) cond_destroy(&sdl->sdl_cond);
 424  438          (void) mutex_destroy(&sdl->sdl_lock);
 425  439  }
 426  440  
 427  441  void
 428  442  svp_shootdown_start(svp_remote_t *srp)
 429  443  {
 430  444          svp_sdlog_t *sdl = &srp->sr_shoot;
 431  445  
 432  446          mutex_enter(&sdl->sdl_lock);
 433  447          svp_shootdown_schedule(sdl, B_FALSE);
 434  448          mutex_exit(&sdl->sdl_lock);
 435  449  }
 436  450  
 437  451  int
 438  452  svp_shootdown_init(svp_remote_t *srp)
 439  453  {
 440  454          int ret;
 441  455          svp_sdlog_t *sdl = &srp->sr_shoot;
 442  456          if ((ret = mutex_init(&sdl->sdl_lock, USYNC_THREAD | LOCK_ERRORCHECK,
 443  457              NULL)) != 0)
 444  458                  return (ret);
 445  459  
 446  460          if ((ret = cond_init(&sdl->sdl_cond, USYNC_THREAD, NULL)) != 0) {
 447  461                  (void) mutex_destroy(&sdl->sdl_lock);
 448  462                  return (ret);
 449  463          }
 450  464  
 451  465          if ((sdl->sdl_logack = umem_alloc(svp_shootdown_buf, UMEM_DEFAULT)) ==
 452  466              NULL) {
 453  467                  ret = errno;
 454  468                  (void) cond_destroy(&sdl->sdl_cond);
 455  469                  (void) mutex_destroy(&sdl->sdl_lock);
 456  470                  return (ret);
 457  471          }
 458  472  
 459  473          if ((sdl->sdl_logrm = umem_alloc(svp_shootdown_buf, UMEM_DEFAULT)) ==
 460  474              NULL) {
 461  475                  ret = errno;
 462  476                  umem_free(sdl->sdl_logack, svp_shootdown_buf);
 463  477                  (void) cond_destroy(&sdl->sdl_cond);
 464  478                  (void) mutex_destroy(&sdl->sdl_lock);
 465  479                  return (ret);
 466  480          }
 467  481  
 468  482          sdl->sdl_remote = srp;
 469  483          sdl->sdl_timer.st_oneshot = B_TRUE;
 470  484          sdl->sdl_timer.st_func = svp_shootdown_timer;
 471  485          sdl->sdl_timer.st_arg = sdl;
 472  486  
 473  487          return (0);
 474  488  }
  
    | 
      ↓ open down ↓ | 
    124 lines elided | 
    
      ↑ open up ↑ | 
  
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX