1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  */
  24 
  25 #include <sys/zfs_context.h>
  26 #include <sys/txg_impl.h>
  27 #include <sys/dmu_impl.h>
  28 #include <sys/dmu_tx.h>
  29 #include <sys/dsl_pool.h>
  30 #include <sys/dsl_scan.h>
  31 #include <sys/callb.h>
  32 
  33 /*
  34  * Pool-wide transaction groups.
  35  */
  36 
/*
 * Entry points of the two per-pool worker threads created by
 * txg_sync_start(); both are defined later in this file.
 */
static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

/*
 * Global tunable.  txg_sync_thread() multiplies this value by hz to obtain
 * the longest time (in ticks) it will idle before syncing a txg.
 */
int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
  41 
  42 /*
  43  * Prepare the txg subsystem.
  44  */
  45 void
  46 txg_init(dsl_pool_t *dp, uint64_t txg)
  47 {
  48         tx_state_t *tx = &dp->dp_tx;
  49         int c;
  50         bzero(tx, sizeof (tx_state_t));
  51 
  52         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
  53 
  54         for (c = 0; c < max_ncpus; c++) {
  55                 int i;
  56 
  57                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
  58                 for (i = 0; i < TXG_SIZE; i++) {
  59                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
  60                             NULL);
  61                         list_create(&tx->tx_cpu[c].tc_callbacks[i],
  62                             sizeof (dmu_tx_callback_t),
  63                             offsetof(dmu_tx_callback_t, dcb_node));
  64                 }
  65         }
  66 
  67         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
  68 
  69         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
  70         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
  71         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
  72         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
  73         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
  74 
  75         tx->tx_open_txg = txg;
  76 }
  77 
  78 /*
  79  * Close down the txg subsystem.
  80  */
  81 void
  82 txg_fini(dsl_pool_t *dp)
  83 {
  84         tx_state_t *tx = &dp->dp_tx;
  85         int c;
  86 
  87         ASSERT(tx->tx_threads == 0);
  88 
  89         mutex_destroy(&tx->tx_sync_lock);
  90 
  91         cv_destroy(&tx->tx_sync_more_cv);
  92         cv_destroy(&tx->tx_sync_done_cv);
  93         cv_destroy(&tx->tx_quiesce_more_cv);
  94         cv_destroy(&tx->tx_quiesce_done_cv);
  95         cv_destroy(&tx->tx_exit_cv);
  96 
  97         for (c = 0; c < max_ncpus; c++) {
  98                 int i;
  99 
 100                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
 101                 for (i = 0; i < TXG_SIZE; i++) {
 102                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
 103                         list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
 104                 }
 105         }
 106 
 107         if (tx->tx_commit_cb_taskq != NULL)
 108                 taskq_destroy(tx->tx_commit_cb_taskq);
 109 
 110         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
 111 
 112         bzero(tx, sizeof (tx_state_t));
 113 }
 114 
 115 /*
 116  * Start syncing transaction groups.
 117  */
 118 void
 119 txg_sync_start(dsl_pool_t *dp)
 120 {
 121         tx_state_t *tx = &dp->dp_tx;
 122 
 123         mutex_enter(&tx->tx_sync_lock);
 124 
 125         dprintf("pool %p\n", dp);
 126 
 127         ASSERT(tx->tx_threads == 0);
 128 
 129         tx->tx_threads = 2;
 130 
 131         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
 132             dp, 0, &p0, TS_RUN, minclsyspri);
 133 
 134         /*
 135          * The sync thread can need a larger-than-default stack size on
 136          * 32-bit x86.  This is due in part to nested pools and
 137          * scrub_visitbp() recursion.
 138          */
 139         tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
 140             dp, 0, &p0, TS_RUN, minclsyspri);
 141 
 142         mutex_exit(&tx->tx_sync_lock);
 143 }
 144 
/*
 * Common prologue for the txg worker threads.
 *
 * Initializes the caller's CPR bookkeeping ('cpr') with tx_sync_lock as
 * its associated lock, then acquires tx_sync_lock.  Returns with
 * tx_sync_lock held.
 */
static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
        CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
        mutex_enter(&tx->tx_sync_lock);
}
 151 
/*
 * Common epilogue for the txg worker threads.
 *
 * Clears the thread pointer '*tpp', decrements the count of live txg
 * threads and wakes anyone waiting on tx_exit_cv (txg_sync_stop() waits
 * there for the count to reach zero).  The caller holds tx_sync_lock on
 * entry; it is dropped by CALLB_CPR_EXIT() before the thread exits.
 * NOTE(review): thread_exit() is presumably non-returning -- callers in
 * this file rely on control not continuing past this call.
 */
static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
        ASSERT(*tpp != NULL);
        *tpp = NULL;
        tx->tx_threads--;
        cv_broadcast(&tx->tx_exit_cv);
        CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
        thread_exit();
}
 162 
 163 static void
 164 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, uint64_t time)
 165 {
 166         CALLB_CPR_SAFE_BEGIN(cpr);
 167 
 168         if (time)
 169                 (void) cv_timedwait(cv, &tx->tx_sync_lock,
 170                     ddi_get_lbolt() + time);
 171         else
 172                 cv_wait(cv, &tx->tx_sync_lock);
 173 
 174         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
 175 }
 176 
/*
 * Stop syncing transaction groups.
 *
 * Waits for outstanding work to be synced, then asks both txg worker
 * threads to exit and blocks until they have done so.  On return
 * tx_threads is 0 and tx_exiting has been reset to 0.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        dprintf("pool %p\n", dp);
        /*
         * Finish off any work in progress.
         */
        ASSERT(tx->tx_threads == 2);

        /*
         * We need to ensure that we've vacated the deferred space_maps.
         */
        txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

        /*
         * Wake all sync threads and wait for them to die.
         */
        mutex_enter(&tx->tx_sync_lock);

        ASSERT(tx->tx_threads == 2);

        /* Both worker loops test this flag after every wakeup. */
        tx->tx_exiting = 1;

        /* Wake every cv a worker thread might be sleeping on. */
        cv_broadcast(&tx->tx_quiesce_more_cv);
        cv_broadcast(&tx->tx_quiesce_done_cv);
        cv_broadcast(&tx->tx_sync_more_cv);

        /* txg_thread_exit() decrements tx_threads and signals tx_exit_cv. */
        while (tx->tx_threads != 0)
                cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

        tx->tx_exiting = 0;

        mutex_exit(&tx->tx_sync_lock);
}
 216 
 217 uint64_t
 218 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
 219 {
 220         tx_state_t *tx = &dp->dp_tx;
 221         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
 222         uint64_t txg;
 223 
 224         mutex_enter(&tc->tc_lock);
 225 
 226         txg = tx->tx_open_txg;
 227         tc->tc_count[txg & TXG_MASK]++;
 228 
 229         th->th_cpu = tc;
 230         th->th_txg = txg;
 231 
 232         return (txg);
 233 }
 234 
 235 void
 236 txg_rele_to_quiesce(txg_handle_t *th)
 237 {
 238         tx_cpu_t *tc = th->th_cpu;
 239 
 240         mutex_exit(&tc->tc_lock);
 241 }
 242 
 243 void
 244 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
 245 {
 246         tx_cpu_t *tc = th->th_cpu;
 247         int g = th->th_txg & TXG_MASK;
 248 
 249         mutex_enter(&tc->tc_lock);
 250         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
 251         mutex_exit(&tc->tc_lock);
 252 }
 253 
 254 void
 255 txg_rele_to_sync(txg_handle_t *th)
 256 {
 257         tx_cpu_t *tc = th->th_cpu;
 258         int g = th->th_txg & TXG_MASK;
 259 
 260         mutex_enter(&tc->tc_lock);
 261         ASSERT(tc->tc_count[g] != 0);
 262         if (--tc->tc_count[g] == 0)
 263                 cv_broadcast(&tc->tc_cv[g]);
 264         mutex_exit(&tc->tc_lock);
 265 
 266         th->th_cpu = NULL;   /* defensive */
 267 }
 268 
 269 static void
 270 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
 271 {
 272         tx_state_t *tx = &dp->dp_tx;
 273         int g = txg & TXG_MASK;
 274         int c;
 275 
 276         /*
 277          * Grab all tx_cpu locks so nobody else can get into this txg.
 278          */
 279         for (c = 0; c < max_ncpus; c++)
 280                 mutex_enter(&tx->tx_cpu[c].tc_lock);
 281 
 282         ASSERT(txg == tx->tx_open_txg);
 283         tx->tx_open_txg++;
 284 
 285         /*
 286          * Now that we've incremented tx_open_txg, we can let threads
 287          * enter the next transaction group.
 288          */
 289         for (c = 0; c < max_ncpus; c++)
 290                 mutex_exit(&tx->tx_cpu[c].tc_lock);
 291 
 292         /*
 293          * Quiesce the transaction group by waiting for everyone to txg_exit().
 294          */
 295         for (c = 0; c < max_ncpus; c++) {
 296                 tx_cpu_t *tc = &tx->tx_cpu[c];
 297                 mutex_enter(&tc->tc_lock);
 298                 while (tc->tc_count[g] != 0)
 299                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
 300                 mutex_exit(&tc->tc_lock);
 301         }
 302 }
 303 
/*
 * Taskq worker dispatched by txg_dispatch_callbacks().
 *
 * Hands 'cb_list' to dmu_tx_do_callbacks() with an error argument of 0,
 * then destroys the list and frees the list_t itself.  Ownership of
 * 'cb_list' therefore passes to this function; the caller must not touch
 * it after dispatch.
 */
static void
txg_do_callbacks(list_t *cb_list)
{
        dmu_tx_do_callbacks(cb_list, 0);

        list_destroy(cb_list);

        kmem_free(cb_list, sizeof (list_t));
}
 313 
 314 /*
 315  * Dispatch the commit callbacks registered on this txg to worker threads.
 316  */
 317 static void
 318 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
 319 {
 320         int c;
 321         tx_state_t *tx = &dp->dp_tx;
 322         list_t *cb_list;
 323 
 324         for (c = 0; c < max_ncpus; c++) {
 325                 tx_cpu_t *tc = &tx->tx_cpu[c];
 326                 /* No need to lock tx_cpu_t at this point */
 327 
 328                 int g = txg & TXG_MASK;
 329 
 330                 if (list_is_empty(&tc->tc_callbacks[g]))
 331                         continue;
 332 
 333                 if (tx->tx_commit_cb_taskq == NULL) {
 334                         /*
 335                          * Commit callback taskq hasn't been created yet.
 336                          */
 337                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
 338                             max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
 339                             TASKQ_PREPOPULATE);
 340                 }
 341 
 342                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
 343                 list_create(cb_list, sizeof (dmu_tx_callback_t),
 344                     offsetof(dmu_tx_callback_t, dcb_node));
 345 
 346                 list_move_tail(&tc->tc_callbacks[g], cb_list);
 347 
 348                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
 349                     txg_do_callbacks, cb_list, TQ_SLEEP);
 350         }
 351 }
 352 
/*
 * Body of the per-pool sync thread.
 *
 * Runs with tx_sync_lock held except while spa_sync() is executing.
 * Each iteration: idle until there is a reason to sync (or the timeout
 * expires), obtain a quiesced txg from the quiesce thread, sync it with
 * spa_sync(), publish it as tx_synced_txg, and dispatch its commit
 * callbacks.  The thread exits via txg_thread_exit() once tx_exiting is
 * set.
 */
static void
txg_sync_thread(dsl_pool_t *dp)
{
        spa_t *spa = dp->dp_spa;
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;
        uint64_t start, delta;

        txg_thread_enter(tx, &cpr);

        /*
         * 'start' is the lbolt value at which the last spa_sync() began;
         * 'delta' is the number of ticks elapsed since then.
         */
        start = delta = 0;
        for (;;) {
                uint64_t timer, timeout = zfs_txg_timeout * hz;
                uint64_t txg;

                /*
                 * We sync when we're scanning, there's someone waiting
                 * on us, or the quiesce thread has handed off a txg to
                 * us, or we have reached our timeout.
                 */
                timer = (delta >= timeout ? 0 : timeout - delta);
                while (!dsl_scan_active(dp->dp_scan) &&
                    !tx->tx_exiting && timer > 0 &&
                    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
                    tx->tx_quiesced_txg == 0) {
                        dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
                            tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
                        txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
                        /* Recompute the time remaining after each wakeup. */
                        delta = ddi_get_lbolt() - start;
                        timer = (delta > timeout ? 0 : timeout - delta);
                }

                /*
                 * Wait until the quiesce thread hands off a txg to us,
                 * prompting it to do so if necessary.
                 */
                while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
                        if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
                                tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
                        cv_broadcast(&tx->tx_quiesce_more_cv);
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
                }

                /*
                 * NOTE(review): txg_thread_exit() ends in thread_exit(),
                 * so control is presumably not expected to continue past
                 * this call.
                 */
                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

                /*
                 * Consume the quiesced txg which has been handed off to
                 * us.  This may cause the quiescing thread to now be
                 * able to quiesce another txg, so we must signal it.
                 */
                txg = tx->tx_quiesced_txg;
                tx->tx_quiesced_txg = 0;
                tx->tx_syncing_txg = txg;
                cv_broadcast(&tx->tx_quiesce_more_cv);

                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
                /* tx_sync_lock is not held across spa_sync(). */
                mutex_exit(&tx->tx_sync_lock);

                start = ddi_get_lbolt();
                spa_sync(spa, txg);
                delta = ddi_get_lbolt() - start;

                mutex_enter(&tx->tx_sync_lock);
                /* Publish completion and wake txg_wait_synced() callers. */
                tx->tx_synced_txg = txg;
                tx->tx_syncing_txg = 0;
                cv_broadcast(&tx->tx_sync_done_cv);

                /*
                 * Dispatch commit callbacks to worker threads.
                 */
                txg_dispatch_callbacks(dp, txg);
        }
}
 428 
/*
 * Body of the per-pool quiesce thread.
 *
 * Runs with tx_sync_lock held except while txg_quiesce() is executing.
 * Each iteration waits until a txg beyond the open one has been
 * requested (tx_quiesce_txg_waiting) and the previously quiesced txg has
 * been consumed, quiesces the open txg, and hands it to the sync thread
 * through tx_quiesced_txg.  The thread exits via txg_thread_exit() once
 * tx_exiting is set.
 */
static void
txg_quiesce_thread(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;

        txg_thread_enter(tx, &cpr);

        for (;;) {
                uint64_t txg;

                /*
                 * We quiesce when there's someone waiting on us.
                 * However, we can only have one txg in "quiescing" or
                 * "quiesced, waiting to sync" state.  So we wait until
                 * the "quiesced, waiting to sync" txg has been consumed
                 * by the sync thread.
                 */
                while (!tx->tx_exiting &&
                    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
                    tx->tx_quiesced_txg != 0))
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

                /*
                 * NOTE(review): txg_thread_exit() ends in thread_exit(),
                 * so control is presumably not expected to continue past
                 * this call.
                 */
                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

                txg = tx->tx_open_txg;
                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting,
                    tx->tx_sync_txg_waiting);
                /* tx_sync_lock is not held while waiting for holders. */
                mutex_exit(&tx->tx_sync_lock);
                txg_quiesce(dp, txg);
                mutex_enter(&tx->tx_sync_lock);

                /*
                 * Hand this txg off to the sync thread.
                 */
                dprintf("quiesce done, handing off txg %llu\n", txg);
                tx->tx_quiesced_txg = txg;
                /*
                 * Wake the sync thread (tx_sync_more_cv) as well as
                 * anyone waiting for the open txg to advance
                 * (tx_quiesce_done_cv, e.g. txg_wait_open()).
                 */
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_broadcast(&tx->tx_quiesce_done_cv);
        }
}
 472 
 473 /*
 474  * Delay this thread by 'ticks' if we are still in the open transaction
 475  * group and there is already a waiting txg quiesing or quiesced.  Abort
 476  * the delay if this txg stalls or enters the quiesing state.
 477  */
 478 void
 479 txg_delay(dsl_pool_t *dp, uint64_t txg, int ticks)
 480 {
 481         tx_state_t *tx = &dp->dp_tx;
 482         int timeout = ddi_get_lbolt() + ticks;
 483 
 484         /* don't delay if this txg could transition to quiesing immediately */
 485         if (tx->tx_open_txg > txg ||
 486             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
 487                 return;
 488 
 489         mutex_enter(&tx->tx_sync_lock);
 490         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
 491                 mutex_exit(&tx->tx_sync_lock);
 492                 return;
 493         }
 494 
 495         while (ddi_get_lbolt() < timeout &&
 496             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp))
 497                 (void) cv_timedwait(&tx->tx_quiesce_more_cv, &tx->tx_sync_lock,
 498                     timeout);
 499 
 500         mutex_exit(&tx->tx_sync_lock);
 501 }
 502 
 503 void
 504 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
 505 {
 506         tx_state_t *tx = &dp->dp_tx;
 507 
 508         mutex_enter(&tx->tx_sync_lock);
 509         ASSERT(tx->tx_threads == 2);
 510         if (txg == 0)
 511                 txg = tx->tx_open_txg + TXG_DEFER_SIZE;
 512         if (tx->tx_sync_txg_waiting < txg)
 513                 tx->tx_sync_txg_waiting = txg;
 514         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 515             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 516         while (tx->tx_synced_txg < txg) {
 517                 dprintf("broadcasting sync more "
 518                     "tx_synced=%llu waiting=%llu dp=%p\n",
 519                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 520                 cv_broadcast(&tx->tx_sync_more_cv);
 521                 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
 522         }
 523         mutex_exit(&tx->tx_sync_lock);
 524 }
 525 
 526 void
 527 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
 528 {
 529         tx_state_t *tx = &dp->dp_tx;
 530 
 531         mutex_enter(&tx->tx_sync_lock);
 532         ASSERT(tx->tx_threads == 2);
 533         if (txg == 0)
 534                 txg = tx->tx_open_txg + 1;
 535         if (tx->tx_quiesce_txg_waiting < txg)
 536                 tx->tx_quiesce_txg_waiting = txg;
 537         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 538             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 539         while (tx->tx_open_txg < txg) {
 540                 cv_broadcast(&tx->tx_quiesce_more_cv);
 541                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
 542         }
 543         mutex_exit(&tx->tx_sync_lock);
 544 }
 545 
 546 boolean_t
 547 txg_stalled(dsl_pool_t *dp)
 548 {
 549         tx_state_t *tx = &dp->dp_tx;
 550         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
 551 }
 552 
 553 boolean_t
 554 txg_sync_waiting(dsl_pool_t *dp)
 555 {
 556         tx_state_t *tx = &dp->dp_tx;
 557 
 558         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
 559             tx->tx_quiesced_txg != 0);
 560 }
 561 
 562 /*
 563  * Per-txg object lists.
 564  */
 565 void
 566 txg_list_create(txg_list_t *tl, size_t offset)
 567 {
 568         int t;
 569 
 570         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
 571 
 572         tl->tl_offset = offset;
 573 
 574         for (t = 0; t < TXG_SIZE; t++)
 575                 tl->tl_head[t] = NULL;
 576 }
 577 
 578 void
 579 txg_list_destroy(txg_list_t *tl)
 580 {
 581         int t;
 582 
 583         for (t = 0; t < TXG_SIZE; t++)
 584                 ASSERT(txg_list_empty(tl, t));
 585 
 586         mutex_destroy(&tl->tl_lock);
 587 }
 588 
 589 int
 590 txg_list_empty(txg_list_t *tl, uint64_t txg)
 591 {
 592         return (tl->tl_head[txg & TXG_MASK] == NULL);
 593 }
 594 
 595 /*
 596  * Add an entry to the list.
 597  * Returns 0 if it's a new entry, 1 if it's already there.
 598  */
 599 int
 600 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
 601 {
 602         int t = txg & TXG_MASK;
 603         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 604         int already_on_list;
 605 
 606         mutex_enter(&tl->tl_lock);
 607         already_on_list = tn->tn_member[t];
 608         if (!already_on_list) {
 609                 tn->tn_member[t] = 1;
 610                 tn->tn_next[t] = tl->tl_head[t];
 611                 tl->tl_head[t] = tn;
 612         }
 613         mutex_exit(&tl->tl_lock);
 614 
 615         return (already_on_list);
 616 }
 617 
 618 /*
 619  * Add an entry to the end of the list (walks list to find end).
 620  * Returns 0 if it's a new entry, 1 if it's already there.
 621  */
 622 int
 623 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
 624 {
 625         int t = txg & TXG_MASK;
 626         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 627         int already_on_list;
 628 
 629         mutex_enter(&tl->tl_lock);
 630         already_on_list = tn->tn_member[t];
 631         if (!already_on_list) {
 632                 txg_node_t **tp;
 633 
 634                 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
 635                         continue;
 636 
 637                 tn->tn_member[t] = 1;
 638                 tn->tn_next[t] = NULL;
 639                 *tp = tn;
 640         }
 641         mutex_exit(&tl->tl_lock);
 642 
 643         return (already_on_list);
 644 }
 645 
 646 /*
 647  * Remove the head of the list and return it.
 648  */
 649 void *
 650 txg_list_remove(txg_list_t *tl, uint64_t txg)
 651 {
 652         int t = txg & TXG_MASK;
 653         txg_node_t *tn;
 654         void *p = NULL;
 655 
 656         mutex_enter(&tl->tl_lock);
 657         if ((tn = tl->tl_head[t]) != NULL) {
 658                 p = (char *)tn - tl->tl_offset;
 659                 tl->tl_head[t] = tn->tn_next[t];
 660                 tn->tn_next[t] = NULL;
 661                 tn->tn_member[t] = 0;
 662         }
 663         mutex_exit(&tl->tl_lock);
 664 
 665         return (p);
 666 }
 667 
 668 /*
 669  * Remove a specific item from the list and return it.
 670  */
 671 void *
 672 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
 673 {
 674         int t = txg & TXG_MASK;
 675         txg_node_t *tn, **tp;
 676 
 677         mutex_enter(&tl->tl_lock);
 678 
 679         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
 680                 if ((char *)tn - tl->tl_offset == p) {
 681                         *tp = tn->tn_next[t];
 682                         tn->tn_next[t] = NULL;
 683                         tn->tn_member[t] = 0;
 684                         mutex_exit(&tl->tl_lock);
 685                         return (p);
 686                 }
 687         }
 688 
 689         mutex_exit(&tl->tl_lock);
 690 
 691         return (NULL);
 692 }
 693 
 694 int
 695 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
 696 {
 697         int t = txg & TXG_MASK;
 698         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 699 
 700         return (tn->tn_member[t]);
 701 }
 702 
 703 /*
 704  * Walk a txg list -- only safe if you know it's not changing.
 705  */
 706 void *
 707 txg_list_head(txg_list_t *tl, uint64_t txg)
 708 {
 709         int t = txg & TXG_MASK;
 710         txg_node_t *tn = tl->tl_head[t];
 711 
 712         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 713 }
 714 
 715 void *
 716 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
 717 {
 718         int t = txg & TXG_MASK;
 719         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 720 
 721         tn = tn->tn_next[t];
 722 
 723         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 724 }