NEX-20218 Backport Illumos #9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
MFV illumos-gate@fa41d87de9ec9000964c605eb01d6dc19e4a1abe
9464 txg_kick() fails to see that we are quiescing, forcing transactions to their next stages without leaving them accumulate changes
Reviewed by: Matt Ahrens <matt@delphix.com>
Reviewed by: Brad Lewis <brad.lewis@delphix.com>
Reviewed by: Andriy Gapon <avg@FreeBSD.org>
Approved by: Dan McDonald <danmcd@joyent.com>
NEX-6859 TX-commit callback that is registered in sync-ctx causes system panic
Reviewed by: Sanjay Nadkarni <sanjay.nadkarni@nexenta.com>
Reviewed by: Saso Kiselkov <saso.kiselkov@nexenta.com>
--- old/usr/src/uts/common/fs/zfs/txg.c
+++ new/usr/src/uts/common/fs/zfs/txg.c
1 1 /*
2 2 * CDDL HEADER START
3 3 *
4 4 * The contents of this file are subject to the terms of the
5 5 * Common Development and Distribution License (the "License").
6 6 * You may not use this file except in compliance with the License.
7 7 *
8 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 9 * or http://www.opensolaris.org/os/licensing.
10 10 * See the License for the specific language governing permissions
11 11 * and limitations under the License.
12 12 *
13 13 * When distributing Covered Code, include this CDDL HEADER in each
14 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 15 * If applicable, add the following below this CDDL HEADER, with the
16 16 * fields enclosed by brackets "[]" replaced with your own identifying
17 17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 18 *
19 19 * CDDL HEADER END
20 20 */
21 21 /*
22 22 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23 23 * Portions Copyright 2011 Martin Matuska
24 24 * Copyright (c) 2012, 2017 by Delphix. All rights reserved.
25 + * Copyright 2016 Nexenta Systems, Inc. All rights reserved.
25 26 */
26 27
27 28 #include <sys/zfs_context.h>
28 29 #include <sys/txg_impl.h>
29 30 #include <sys/dmu_impl.h>
30 31 #include <sys/dmu_tx.h>
31 32 #include <sys/dsl_pool.h>
32 33 #include <sys/dsl_scan.h>
33 34 #include <sys/zil.h>
34 35 #include <sys/callb.h>
35 36
36 37 /*
37 38 * ZFS Transaction Groups
38 39 * ----------------------
39 40 *
40 41 * ZFS transaction groups are, as the name implies, groups of transactions
41 42 * that act on persistent state. ZFS asserts consistency at the granularity of
42 43 * these transaction groups. Each successive transaction group (txg) is
43 44 * assigned a 64-bit consecutive identifier. There are three active
44 45 * transaction group states: open, quiescing, or syncing. At any given time,
45 46 * there may be an active txg associated with each state; each active txg may
46 47 * either be processing, or blocked waiting to enter the next state. There may
47 48 * be up to three active txgs, and there is always a txg in the open state
48 49 * (though it may be blocked waiting to enter the quiescing state). In broad
49 50 * strokes, transactions -- operations that change in-memory structures -- are
50 51 * accepted into the txg in the open state, and are completed while the txg is
51 52 * in the open or quiescing states. The accumulated changes are written to
52 53 * disk in the syncing state.
53 54 *
54 55 * Open
55 56 *
56 57 * When a new txg becomes active, it first enters the open state. New
57 58 * transactions -- updates to in-memory structures -- are assigned to the
58 59 * currently open txg. There is always a txg in the open state so that ZFS can
59 60 * accept new changes (though the txg may refuse new changes if it has hit
60 61 * some limit). ZFS advances the open txg to the next state for a variety of
61 62 * reasons such as it hitting a time or size threshold, or the execution of an
62 63 * administrative action that must be completed in the syncing state.
63 64 *
64 65 * Quiescing
65 66 *
66 67 * After a txg exits the open state, it enters the quiescing state. The
67 68 * quiescing state is intended to provide a buffer between accepting new
68 69 * transactions in the open state and writing them out to stable storage in
69 70 * the syncing state. While quiescing, transactions can continue their
70 71 * operation without delaying either of the other states. Typically, a txg is
71 72 * in the quiescing state very briefly since the operations are bounded by
72 73 * software latencies rather than, say, slower I/O latencies. After all
73 74 * transactions complete, the txg is ready to enter the next state.
74 75 *
75 76 * Syncing
76 77 *
77 78 * In the syncing state, the in-memory state built up during the open and (to
78 79 * a lesser degree) the quiescing states is written to stable storage. The
79 80 * process of writing out modified data can, in turn modify more data. For
80 81 * example when we write new blocks, we need to allocate space for them; those
81 82 * allocations modify metadata (space maps)... which themselves must be
82 83 * written to stable storage. During the sync state, ZFS iterates, writing out
83 84 * data until it converges and all in-memory changes have been written out.
84 85 * The first such pass is the largest as it encompasses all the modified user
85 86 * data (as opposed to filesystem metadata). Subsequent passes typically have
86 87 * far less data to write as they consist exclusively of filesystem metadata.
87 88 *
88 89 * To ensure convergence, after a certain number of passes ZFS begins
89 90 * overwriting locations on stable storage that had been allocated earlier in
90 91 * the syncing state (and subsequently freed). ZFS usually allocates new
91 92 * blocks to optimize for large, continuous, writes. For the syncing state to
92 93 * converge however it must complete a pass where no new blocks are allocated
93 94 * since each allocation requires a modification of persistent metadata.
94 95 * Further, to hasten convergence, after a prescribed number of passes, ZFS
95 96 * also defers frees, and stops compressing.
96 97 *
97 98 * In addition to writing out user data, we must also execute synctasks during
98 99 * the syncing context. A synctask is the mechanism by which some
99 100 * administrative activities work such as creating and destroying snapshots or
100 101 * datasets. Note that when a synctask is initiated it enters the open txg,
101 102 * and ZFS then pushes that txg as quickly as possible to completion of the
102 103 * syncing state in order to reduce the latency of the administrative
103 104 * activity. To complete the syncing state, ZFS writes out a new uberblock,
104 105 * the root of the tree of blocks that comprise all state stored on the ZFS
105 106 * pool. Finally, if there is a quiesced txg waiting, we signal that it can
106 107 * now transition to the syncing state.
108 + *
109 + * It is possible to register a callback for a TX, so that the callback is
110 + * invoked after the corresponding TX group has been synced to disk.
111 + * The callback and its optional argument are registered using
112 + * dmu_tx_callback_register().
113 + * All callbacks are executed asynchronously via a taskq
114 + * (see txg_dispatch_callbacks). A registered callback is invoked in two cases:
115 + * 1) the corresponding TX is committed to disk (the first arg is 0)
116 + * 2) the corresponding TX is aborted (the first arg is ECANCELED)
107 117 */
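
As an illustration of the callback interface described in the comment above (not part of txg.c itself), here is a minimal consumer-side sketch; my_commit_cb(), my_update() and the "my-update" argument are hypothetical, while dmu_tx_callback_register() and the other DMU calls are the existing interfaces the comment refers to:

/* Hypothetical consumer code, shown only to illustrate the comment above. */
static void
my_commit_cb(void *arg, int error)
{
	/* error is 0 when the txg committed, ECANCELED when it was aborted */
	if (error == 0)
		cmn_err(CE_NOTE, "update \"%s\" is on stable storage", (char *)arg);
}

static int
my_update(objset_t *os, uint64_t object, uint64_t off, int len, void *buf)
{
	dmu_tx_t *tx = dmu_tx_create(os);
	int err;

	dmu_tx_hold_write(tx, object, off, len);
	if ((err = dmu_tx_assign(tx, TXG_WAIT)) != 0) {
		dmu_tx_abort(tx);
		return (err);
	}
	dmu_write(os, object, off, len, buf, tx);
	/* runs later on the tx_commit_cb taskq, after the txg syncs or aborts */
	dmu_tx_callback_register(tx, my_commit_cb, "my-update");
	dmu_tx_commit(tx);
	return (0);
}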
108 118
109 119 static void txg_sync_thread(void *arg);
110 120 static void txg_quiesce_thread(void *arg);
111 121
112 122 int zfs_txg_timeout = 5; /* max seconds worth of delta per txg */
113 123
114 124 /*
115 125 * Prepare the txg subsystem.
116 126 */
117 127 void
118 128 txg_init(dsl_pool_t *dp, uint64_t txg)
119 129 {
120 130 tx_state_t *tx = &dp->dp_tx;
121 131 int c;
122 132 bzero(tx, sizeof (tx_state_t));
123 133
124 134 tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
125 135
126 136 for (c = 0; c < max_ncpus; c++) {
127 137 int i;
128 138
129 139 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
130 140 mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
131 141 NULL);
132 142 for (i = 0; i < TXG_SIZE; i++) {
133 143 cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
134 144 NULL);
135 145 list_create(&tx->tx_cpu[c].tc_callbacks[i],
136 146 sizeof (dmu_tx_callback_t),
137 147 offsetof(dmu_tx_callback_t, dcb_node));
138 148 }
139 149 }
140 150
141 151 mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
142 152
143 153 cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
144 154 cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
145 155 cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
146 156 cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
147 157 cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
148 158
149 159 tx->tx_open_txg = txg;
150 160 }
151 161
152 162 /*
153 163 * Close down the txg subsystem.
154 164 */
155 165 void
156 166 txg_fini(dsl_pool_t *dp)
157 167 {
158 168 tx_state_t *tx = &dp->dp_tx;
159 169 int c;
160 170
161 171 ASSERT0(tx->tx_threads);
162 172
163 173 mutex_destroy(&tx->tx_sync_lock);
164 174
165 175 cv_destroy(&tx->tx_sync_more_cv);
166 176 cv_destroy(&tx->tx_sync_done_cv);
167 177 cv_destroy(&tx->tx_quiesce_more_cv);
168 178 cv_destroy(&tx->tx_quiesce_done_cv);
169 179 cv_destroy(&tx->tx_exit_cv);
170 180
171 181 for (c = 0; c < max_ncpus; c++) {
172 182 int i;
173 183
174 184 mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
175 185 mutex_destroy(&tx->tx_cpu[c].tc_lock);
176 186 for (i = 0; i < TXG_SIZE; i++) {
177 187 cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
178 188 list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
179 189 }
180 190 }
181 191
182 192 if (tx->tx_commit_cb_taskq != NULL)
183 193 taskq_destroy(tx->tx_commit_cb_taskq);
184 194
185 195 kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
186 196
187 197 bzero(tx, sizeof (tx_state_t));
188 198 }
189 199
190 200 /*
191 201 * Start syncing transaction groups.
192 202 */
193 203 void
194 204 txg_sync_start(dsl_pool_t *dp)
195 205 {
196 206 tx_state_t *tx = &dp->dp_tx;
197 207
198 208 mutex_enter(&tx->tx_sync_lock);
199 209
200 210 dprintf("pool %p\n", dp);
201 211
202 212 ASSERT0(tx->tx_threads);
203 213
204 214 tx->tx_threads = 2;
205 215
206 216 tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
207 217 dp, 0, &p0, TS_RUN, minclsyspri);
208 218
209 219 /*
210 220 * The sync thread can need a larger-than-default stack size on
211 221 * 32-bit x86. This is due in part to nested pools and
212 222 * scrub_visitbp() recursion.
213 223 */
214 224 tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
215 225 dp, 0, &p0, TS_RUN, minclsyspri);
216 226
217 227 mutex_exit(&tx->tx_sync_lock);
218 228 }
219 229
220 230 static void
221 231 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
222 232 {
223 233 CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
224 234 mutex_enter(&tx->tx_sync_lock);
225 235 }
226 236
227 237 static void
228 238 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
229 239 {
230 240 ASSERT(*tpp != NULL);
231 241 *tpp = NULL;
232 242 tx->tx_threads--;
233 243 cv_broadcast(&tx->tx_exit_cv);
234 244 CALLB_CPR_EXIT(cpr); /* drops &tx->tx_sync_lock */
235 245 thread_exit();
236 246 }
237 247
238 248 static void
239 249 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
240 250 {
241 251 CALLB_CPR_SAFE_BEGIN(cpr);
242 252
243 253 if (time)
244 254 (void) cv_timedwait(cv, &tx->tx_sync_lock,
245 255 ddi_get_lbolt() + time);
246 256 else
247 257 cv_wait(cv, &tx->tx_sync_lock);
248 258
249 259 CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
250 260 }
251 261
252 262 /*
253 263 * Stop syncing transaction groups.
254 264 */
255 265 void
256 266 txg_sync_stop(dsl_pool_t *dp)
257 267 {
258 268 tx_state_t *tx = &dp->dp_tx;
259 269
260 270 dprintf("pool %p\n", dp);
261 271 /*
262 272 * Finish off any work in progress.
263 273 */
264 274 ASSERT3U(tx->tx_threads, ==, 2);
265 275
266 276 /*
267 277 * We need to ensure that we've vacated the deferred space_maps.
268 278 */
269 279 txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
270 280
271 281 /*
272 282 * Wake all sync threads and wait for them to die.
273 283 */
274 284 mutex_enter(&tx->tx_sync_lock);
275 285
276 286 ASSERT3U(tx->tx_threads, ==, 2);
277 287
278 288 tx->tx_exiting = 1;
279 289
280 290 cv_broadcast(&tx->tx_quiesce_more_cv);
281 291 cv_broadcast(&tx->tx_quiesce_done_cv);
282 292 cv_broadcast(&tx->tx_sync_more_cv);
283 293
284 294 while (tx->tx_threads != 0)
285 295 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
286 296
287 297 tx->tx_exiting = 0;
288 298
289 299 mutex_exit(&tx->tx_sync_lock);
290 300 }
291 301
292 302 uint64_t
293 303 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
294 304 {
295 305 tx_state_t *tx = &dp->dp_tx;
296 306 tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
297 307 uint64_t txg;
298 308
299 309 mutex_enter(&tc->tc_open_lock);
300 310 txg = tx->tx_open_txg;
301 311
302 312 mutex_enter(&tc->tc_lock);
303 313 tc->tc_count[txg & TXG_MASK]++;
304 314 mutex_exit(&tc->tc_lock);
305 315
306 316 th->th_cpu = tc;
307 317 th->th_txg = txg;
308 318
309 319 return (txg);
310 320 }
311 321
312 322 void
313 323 txg_rele_to_quiesce(txg_handle_t *th)
314 324 {
315 325 tx_cpu_t *tc = th->th_cpu;
316 326
317 327 ASSERT(!MUTEX_HELD(&tc->tc_lock));
318 328 mutex_exit(&tc->tc_open_lock);
319 329 }
320 330
321 331 void
322 332 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
323 333 {
324 334 tx_cpu_t *tc = th->th_cpu;
325 335 int g = th->th_txg & TXG_MASK;
326 336
327 337 mutex_enter(&tc->tc_lock);
328 338 list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
329 339 mutex_exit(&tc->tc_lock);
330 340 }
331 341
342 +/* This registration function may only be called from syncing context */
332 343 void
344 +txg_register_callbacks_sync(dsl_pool_t *dp, uint64_t txg, list_t *tx_callbacks)
345 +{
346 + tx_state_t *tx = &dp->dp_tx;
347 + tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
348 + txg_handle_t th;
349 +
350 + VERIFY3U(tx->tx_syncing_txg, ==, txg);
351 +
352 + th.th_cpu = tc;
353 + th.th_txg = txg;
354 +
355 + txg_register_callbacks(&th, tx_callbacks);
356 +}
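
For context on the new sync-context entry point (the NEX-6859 change), a hedged, hypothetical sketch of how syncing-context code might hand a privately built callback list to the txg that is currently syncing; my_synctask_sync() and its list handling are invented for illustration only:

/* Hypothetical illustration; not part of this change. */
static void
my_synctask_sync(void *arg, dmu_tx_t *tx)
{
	dsl_pool_t *dp = dmu_tx_pool(tx);
	list_t cbs;

	list_create(&cbs, sizeof (dmu_tx_callback_t),
	    offsetof(dmu_tx_callback_t, dcb_node));
	/* ... append dmu_tx_callback_t entries to cbs ... */

	/*
	 * We are in syncing context, so dmu_tx_get_txg(tx) is the txg
	 * being synced and the VERIFY above will hold.
	 */
	txg_register_callbacks_sync(dp, dmu_tx_get_txg(tx), &cbs);
	list_destroy(&cbs);	/* entries were moved to the txg's list */
}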
357 +
358 +void
333 359 txg_rele_to_sync(txg_handle_t *th)
334 360 {
335 361 tx_cpu_t *tc = th->th_cpu;
336 362 int g = th->th_txg & TXG_MASK;
337 363
338 364 mutex_enter(&tc->tc_lock);
339 365 ASSERT(tc->tc_count[g] != 0);
340 366 if (--tc->tc_count[g] == 0)
341 367 cv_broadcast(&tc->tc_cv[g]);
342 368 mutex_exit(&tc->tc_lock);
343 369
344 370 th->th_cpu = NULL; /* defensive */
345 371 }
346 372
347 373 /*
348 374 * Blocks until all transactions in the group are committed.
349 375 *
350 376 * On return, the transaction group has reached a stable state in which it can
351 377 * then be passed off to the syncing context.
352 378 */
353 379 static void
354 380 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
355 381 {
356 382 tx_state_t *tx = &dp->dp_tx;
357 383 int g = txg & TXG_MASK;
358 384 int c;
359 385
360 386 /*
361 387 * Grab all tc_open_locks so nobody else can get into this txg.
362 388 */
363 389 for (c = 0; c < max_ncpus; c++)
364 390 mutex_enter(&tx->tx_cpu[c].tc_open_lock);
365 391
366 392 ASSERT(txg == tx->tx_open_txg);
367 393 tx->tx_open_txg++;
368 394 tx->tx_open_time = gethrtime();
369 395
370 396 DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
371 397 DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);
372 398
373 399 /*
374 400 * Now that we've incremented tx_open_txg, we can let threads
375 401 * enter the next transaction group.
376 402 */
377 403 for (c = 0; c < max_ncpus; c++)
378 404 mutex_exit(&tx->tx_cpu[c].tc_open_lock);
379 405
380 406 /*
381 407 * Quiesce the transaction group by waiting for everyone to txg_exit().
382 408 */
383 409 for (c = 0; c < max_ncpus; c++) {
384 410 tx_cpu_t *tc = &tx->tx_cpu[c];
385 411 mutex_enter(&tc->tc_lock);
386 412 while (tc->tc_count[g] != 0)
387 413 cv_wait(&tc->tc_cv[g], &tc->tc_lock);
388 414 mutex_exit(&tc->tc_lock);
389 415 }
390 416 }
391 417
392 418 static void
393 419 txg_do_callbacks(list_t *cb_list)
394 420 {
395 421 dmu_tx_do_callbacks(cb_list, 0);
396 422
397 423 list_destroy(cb_list);
398 424
399 425 kmem_free(cb_list, sizeof (list_t));
400 426 }
401 427
402 428 /*
403 429 * Dispatch the commit callbacks registered on this txg to worker threads.
404 430 *
405 431 * If no callbacks are registered for a given TXG, nothing happens.
406 432 * This function creates a taskq for the associated pool, if needed.
407 433 */
408 434 static void
409 435 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
410 436 {
411 437 int c;
412 438 tx_state_t *tx = &dp->dp_tx;
413 439 list_t *cb_list;
414 440
415 441 for (c = 0; c < max_ncpus; c++) {
416 442 tx_cpu_t *tc = &tx->tx_cpu[c];
417 443 /*
418 444 * No need to lock tx_cpu_t at this point, since this can
419 445 * only be called once a txg has been synced.
420 446 */
421 447
422 448 int g = txg & TXG_MASK;
423 449
424 450 if (list_is_empty(&tc->tc_callbacks[g]))
425 451 continue;
426 452
427 453 if (tx->tx_commit_cb_taskq == NULL) {
428 454 /*
429 455 * Commit callback taskq hasn't been created yet.
430 456 */
431 457 tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
432 458 max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
433 459 TASKQ_PREPOPULATE);
434 460 }
435 461
436 462 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
437 463 list_create(cb_list, sizeof (dmu_tx_callback_t),
438 464 offsetof(dmu_tx_callback_t, dcb_node));
439 465
440 466 list_move_tail(cb_list, &tc->tc_callbacks[g]);
441 467
442 468 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
443 469 txg_do_callbacks, cb_list, TQ_SLEEP);
444 470 }
445 471 }
446 472
473 +static boolean_t
474 +txg_is_syncing(dsl_pool_t *dp)
475 +{
476 + tx_state_t *tx = &dp->dp_tx;
477 + ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
478 + return (tx->tx_syncing_txg != 0);
479 +}
480 +
481 +static boolean_t
482 +txg_is_quiescing(dsl_pool_t *dp)
483 +{
484 + tx_state_t *tx = &dp->dp_tx;
485 + ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
486 + return (tx->tx_quiescing_txg != 0);
487 +}
488 +
489 +static boolean_t
490 +txg_has_quiesced_to_sync(dsl_pool_t *dp)
491 +{
492 + tx_state_t *tx = &dp->dp_tx;
493 + ASSERT(MUTEX_HELD(&tx->tx_sync_lock));
494 + return (tx->tx_quiesced_txg != 0);
495 +}
496 +
447 497 static void
448 498 txg_sync_thread(void *arg)
449 499 {
450 500 dsl_pool_t *dp = arg;
451 501 spa_t *spa = dp->dp_spa;
452 502 tx_state_t *tx = &dp->dp_tx;
453 503 callb_cpr_t cpr;
454 504 uint64_t start, delta;
455 505
456 506 txg_thread_enter(tx, &cpr);
457 507
458 508 start = delta = 0;
459 509 for (;;) {
460 510 uint64_t timeout = zfs_txg_timeout * hz;
461 511 uint64_t timer;
462 512 uint64_t txg;
463 513
464 514 /*
465 515 * We sync when we're scanning, there's someone waiting
466 516 * on us, or the quiesce thread has handed off a txg to
467 517 * us, or we have reached our timeout.
468 518 */
469 519 timer = (delta >= timeout ? 0 : timeout - delta);
470 520 while (!dsl_scan_active(dp->dp_scan) &&
471 521 !tx->tx_exiting && timer > 0 &&
472 522 tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
473 - tx->tx_quiesced_txg == 0 &&
523 + !txg_has_quiesced_to_sync(dp) &&
474 524 dp->dp_dirty_total < zfs_dirty_data_sync) {
475 525 dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
476 526 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
477 527 txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
478 528 delta = ddi_get_lbolt() - start;
479 529 timer = (delta > timeout ? 0 : timeout - delta);
480 530 }
481 531
482 532 /*
483 533 * Wait until the quiesce thread hands off a txg to us,
484 534 * prompting it to do so if necessary.
485 535 */
486 - while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
536 + while (!tx->tx_exiting && !txg_has_quiesced_to_sync(dp)) {
487 537 if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
488 538 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
489 539 cv_broadcast(&tx->tx_quiesce_more_cv);
490 540 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
491 541 }
492 542
493 543 if (tx->tx_exiting)
494 544 txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
495 545
496 546 /*
497 547 * Consume the quiesced txg which has been handed off to
498 548 * us. This may cause the quiescing thread to now be
499 549 * able to quiesce another txg, so we must signal it.
500 550 */
551 + ASSERT(tx->tx_quiesced_txg != 0);
501 552 txg = tx->tx_quiesced_txg;
502 553 tx->tx_quiesced_txg = 0;
503 554 tx->tx_syncing_txg = txg;
504 555 DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
505 556 cv_broadcast(&tx->tx_quiesce_more_cv);
506 557
507 558 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
508 559 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
509 560 mutex_exit(&tx->tx_sync_lock);
510 561
511 562 start = ddi_get_lbolt();
512 563 spa_sync(spa, txg);
513 564 delta = ddi_get_lbolt() - start;
514 565
515 566 mutex_enter(&tx->tx_sync_lock);
516 567 tx->tx_synced_txg = txg;
517 568 tx->tx_syncing_txg = 0;
518 569 DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
519 570 cv_broadcast(&tx->tx_sync_done_cv);
520 571
521 572 /*
522 573 * Dispatch commit callbacks to worker threads.
523 574 */
524 575 txg_dispatch_callbacks(dp, txg);
525 576 }
526 577 }
527 578
528 579 static void
529 580 txg_quiesce_thread(void *arg)
530 581 {
531 582 dsl_pool_t *dp = arg;
532 583 tx_state_t *tx = &dp->dp_tx;
533 584 callb_cpr_t cpr;
534 585
535 586 txg_thread_enter(tx, &cpr);
536 587
537 588 for (;;) {
538 589 uint64_t txg;
539 590
540 591 /*
541 592 * We quiesce when there's someone waiting on us.
542 593 * However, we can only have one txg in "quiescing" or
543 594 * "quiesced, waiting to sync" state. So we wait until
544 595 * the "quiesced, waiting to sync" txg has been consumed
545 596 * by the sync thread.
546 597 */
547 598 while (!tx->tx_exiting &&
548 599 (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
549 - tx->tx_quiesced_txg != 0))
600 + txg_has_quiesced_to_sync(dp)))
550 601 txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
551 602
552 603 if (tx->tx_exiting)
553 604 txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
554 605
555 606 txg = tx->tx_open_txg;
556 607 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
557 608 txg, tx->tx_quiesce_txg_waiting,
558 609 tx->tx_sync_txg_waiting);
610 + tx->tx_quiescing_txg = txg;
611 +
559 612 mutex_exit(&tx->tx_sync_lock);
560 613 txg_quiesce(dp, txg);
561 614 mutex_enter(&tx->tx_sync_lock);
562 615
563 616 /*
564 617 * Hand this txg off to the sync thread.
565 618 */
566 619 dprintf("quiesce done, handing off txg %llu\n", txg);
620 + tx->tx_quiescing_txg = 0;
567 621 tx->tx_quiesced_txg = txg;
568 622 DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
569 623 cv_broadcast(&tx->tx_sync_more_cv);
570 624 cv_broadcast(&tx->tx_quiesce_done_cv);
571 625 }
572 626 }
573 627
574 628 /*
575 629 * Delay this thread by delay nanoseconds if we are still in the open
576 630 * transaction group and there is already a waiting txg quiescing or quiesced.
577 631 * Abort the delay if this txg stalls or enters the quiescing state.
578 632 */
579 633 void
580 634 txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
581 635 {
582 636 tx_state_t *tx = &dp->dp_tx;
583 637 hrtime_t start = gethrtime();
584 638
585 639 /* don't delay if this txg could transition to quiescing immediately */
586 640 if (tx->tx_open_txg > txg ||
587 641 tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
588 642 return;
589 643
590 644 mutex_enter(&tx->tx_sync_lock);
591 645 if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
592 646 mutex_exit(&tx->tx_sync_lock);
593 647 return;
594 648 }
595 649
596 650 while (gethrtime() - start < delay &&
597 651 tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
598 652 (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
599 653 &tx->tx_sync_lock, delay, resolution, 0);
600 654 }
601 655
602 656 mutex_exit(&tx->tx_sync_lock);
603 657 }
604 658
605 659 void
606 660 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
607 661 {
608 662 tx_state_t *tx = &dp->dp_tx;
609 663
610 664 ASSERT(!dsl_pool_config_held(dp));
611 665
612 666 mutex_enter(&tx->tx_sync_lock);
613 667 ASSERT3U(tx->tx_threads, ==, 2);
614 668 if (txg == 0)
615 669 txg = tx->tx_open_txg + TXG_DEFER_SIZE;
616 670 if (tx->tx_sync_txg_waiting < txg)
617 671 tx->tx_sync_txg_waiting = txg;
618 672 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
619 673 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
620 674 while (tx->tx_synced_txg < txg) {
621 675 dprintf("broadcasting sync more "
622 676 "tx_synced=%llu waiting=%llu dp=%p\n",
623 677 tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
624 678 cv_broadcast(&tx->tx_sync_more_cv);
625 679 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
626 680 }
627 681 mutex_exit(&tx->tx_sync_lock);
628 682 }
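
As a usage note (not part of the file), a minimal sketch of the common idiom for forcing all outstanding changes to stable storage; my_flush_pool() is a hypothetical wrapper, and passing txg 0 to wait on the currently open txg follows from the code above:

/* Hypothetical helper: block until everything dirty in the pool is synced. */
static void
my_flush_pool(spa_t *spa)
{
	txg_wait_synced(spa_get_dsl(spa), 0);
}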
629 683
630 684 void
631 685 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
632 686 {
633 687 tx_state_t *tx = &dp->dp_tx;
634 688
635 689 ASSERT(!dsl_pool_config_held(dp));
636 690
637 691 mutex_enter(&tx->tx_sync_lock);
638 692 ASSERT3U(tx->tx_threads, ==, 2);
639 693 if (txg == 0)
640 694 txg = tx->tx_open_txg + 1;
641 695 if (tx->tx_quiesce_txg_waiting < txg)
642 696 tx->tx_quiesce_txg_waiting = txg;
643 697 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
644 698 txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
645 699 while (tx->tx_open_txg < txg) {
646 700 cv_broadcast(&tx->tx_quiesce_more_cv);
647 701 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
648 702 }
649 703 mutex_exit(&tx->tx_sync_lock);
650 704 }
651 705
652 706 /*
653 707 * If there isn't a txg syncing or in the pipeline, push another txg through
654 708 * the pipeline by quiescing the open txg.
655 709 */
656 710 void
657 711 txg_kick(dsl_pool_t *dp)
658 712 {
659 713 tx_state_t *tx = &dp->dp_tx;
660 714
661 715 ASSERT(!dsl_pool_config_held(dp));
662 716
663 717 mutex_enter(&tx->tx_sync_lock);
664 - if (tx->tx_syncing_txg == 0 &&
718 + if (!txg_is_syncing(dp) &&
719 + !txg_is_quiescing(dp) &&
665 720 tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
666 721 tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
667 722 tx->tx_quiesced_txg <= tx->tx_synced_txg) {
668 723 tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
669 724 cv_broadcast(&tx->tx_quiesce_more_cv);
670 725 }
671 726 mutex_exit(&tx->tx_sync_lock);
672 727 }
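
To show the intent of the fixed check, a hypothetical caller sketch (not an actual call site): a producer that has accumulated enough dirty data nudges the open txg into the pipeline, and txg_kick() now declines while another txg is still quiescing or syncing. zfs_dirty_data_sync is the existing tunable already referenced in txg_sync_thread above.

/* Hypothetical caller; for illustration only. */
static void
my_dirty_space_check(dsl_pool_t *dp, uint64_t dirty_bytes)
{
	if (dirty_bytes >= zfs_dirty_data_sync)
		txg_kick(dp);
}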
673 728
674 729 boolean_t
675 730 txg_stalled(dsl_pool_t *dp)
676 731 {
677 732 tx_state_t *tx = &dp->dp_tx;
678 733 return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
679 734 }
680 735
681 736 boolean_t
682 737 txg_sync_waiting(dsl_pool_t *dp)
683 738 {
684 739 tx_state_t *tx = &dp->dp_tx;
685 740
686 741 return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
687 742 tx->tx_quiesced_txg != 0);
688 743 }
689 744
690 745 /*
691 746 * Verify that this txg is active (open, quiescing, syncing). Non-active
692 747 * txg's should not be manipulated.
693 748 */
694 749 void
695 750 txg_verify(spa_t *spa, uint64_t txg)
696 751 {
697 752 dsl_pool_t *dp = spa_get_dsl(spa);
698 753 if (txg <= TXG_INITIAL || txg == ZILTEST_TXG)
699 754 return;
700 755 ASSERT3U(txg, <=, dp->dp_tx.tx_open_txg);
701 756 ASSERT3U(txg, >=, dp->dp_tx.tx_synced_txg);
702 757 ASSERT3U(txg, >=, dp->dp_tx.tx_open_txg - TXG_CONCURRENT_STATES);
703 758 }
704 759
705 760 /*
706 761 * Per-txg object lists.
707 762 */
708 763 void
709 764 txg_list_create(txg_list_t *tl, spa_t *spa, size_t offset)
710 765 {
711 766 int t;
712 767
713 768 mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
714 769
715 770 tl->tl_offset = offset;
716 771 tl->tl_spa = spa;
717 772
718 773 for (t = 0; t < TXG_SIZE; t++)
719 774 tl->tl_head[t] = NULL;
720 775 }
721 776
722 777 void
723 778 txg_list_destroy(txg_list_t *tl)
724 779 {
725 780 int t;
726 781
727 782 for (t = 0; t < TXG_SIZE; t++)
728 783 ASSERT(txg_list_empty(tl, t));
729 784
730 785 mutex_destroy(&tl->tl_lock);
731 786 }
732 787
733 788 boolean_t
734 789 txg_list_empty(txg_list_t *tl, uint64_t txg)
735 790 {
736 791 txg_verify(tl->tl_spa, txg);
737 792 return (tl->tl_head[txg & TXG_MASK] == NULL);
738 793 }
739 794
740 795 /*
741 796 * Returns true if all txg lists are empty.
742 797 *
743 798 * Warning: this is inherently racy (an item could be added immediately
744 799 * after this function returns). We don't bother with the lock because
745 800 * it wouldn't change the semantics.
746 801 */
747 802 boolean_t
748 803 txg_all_lists_empty(txg_list_t *tl)
749 804 {
750 805 for (int i = 0; i < TXG_SIZE; i++) {
751 806 if (!txg_list_empty(tl, i)) {
752 807 return (B_FALSE);
753 808 }
754 809 }
755 810 return (B_TRUE);
756 811 }
757 812
758 813 /*
759 814 * Add an entry to the list (unless it's already on the list).
760 815 * Returns B_TRUE if it was actually added.
761 816 */
762 817 boolean_t
763 818 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
764 819 {
765 820 int t = txg & TXG_MASK;
766 821 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
767 822 boolean_t add;
768 823
769 824 txg_verify(tl->tl_spa, txg);
770 825 mutex_enter(&tl->tl_lock);
771 826 add = (tn->tn_member[t] == 0);
772 827 if (add) {
773 828 tn->tn_member[t] = 1;
774 829 tn->tn_next[t] = tl->tl_head[t];
775 830 tl->tl_head[t] = tn;
776 831 }
777 832 mutex_exit(&tl->tl_lock);
778 833
779 834 return (add);
780 835 }
781 836
782 837 /*
783 838 * Add an entry to the end of the list, unless it's already on the list.
784 839 * (walks list to find end)
785 840 * Returns B_TRUE if it was actually added.
786 841 */
787 842 boolean_t
788 843 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
789 844 {
790 845 int t = txg & TXG_MASK;
791 846 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
792 847 boolean_t add;
793 848
794 849 txg_verify(tl->tl_spa, txg);
795 850 mutex_enter(&tl->tl_lock);
796 851 add = (tn->tn_member[t] == 0);
797 852 if (add) {
798 853 txg_node_t **tp;
799 854
800 855 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
801 856 continue;
802 857
803 858 tn->tn_member[t] = 1;
804 859 tn->tn_next[t] = NULL;
805 860 *tp = tn;
806 861 }
807 862 mutex_exit(&tl->tl_lock);
808 863
809 864 return (add);
810 865 }
811 866
812 867 /*
813 868 * Remove the head of the list and return it.
814 869 */
815 870 void *
816 871 txg_list_remove(txg_list_t *tl, uint64_t txg)
817 872 {
818 873 int t = txg & TXG_MASK;
819 874 txg_node_t *tn;
820 875 void *p = NULL;
821 876
822 877 txg_verify(tl->tl_spa, txg);
823 878 mutex_enter(&tl->tl_lock);
824 879 if ((tn = tl->tl_head[t]) != NULL) {
825 - ASSERT(tn->tn_member[t]);
826 - ASSERT(tn->tn_next[t] == NULL || tn->tn_next[t]->tn_member[t]);
827 880 p = (char *)tn - tl->tl_offset;
828 881 tl->tl_head[t] = tn->tn_next[t];
829 882 tn->tn_next[t] = NULL;
830 883 tn->tn_member[t] = 0;
831 884 }
832 885 mutex_exit(&tl->tl_lock);
833 886
834 887 return (p);
835 888 }
836 889
837 890 /*
838 891 * Remove a specific item from the list and return it.
839 892 */
840 893 void *
841 894 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
842 895 {
843 896 int t = txg & TXG_MASK;
844 897 txg_node_t *tn, **tp;
845 898
846 899 txg_verify(tl->tl_spa, txg);
847 900 mutex_enter(&tl->tl_lock);
848 901
849 902 for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
850 903 if ((char *)tn - tl->tl_offset == p) {
851 904 *tp = tn->tn_next[t];
852 905 tn->tn_next[t] = NULL;
853 906 tn->tn_member[t] = 0;
854 907 mutex_exit(&tl->tl_lock);
855 908 return (p);
856 909 }
857 910 }
858 911
859 912 mutex_exit(&tl->tl_lock);
860 913
861 914 return (NULL);
862 915 }
863 916
864 917 boolean_t
865 918 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
866 919 {
867 920 int t = txg & TXG_MASK;
868 921 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
869 922
870 923 txg_verify(tl->tl_spa, txg);
871 924 return (tn->tn_member[t] != 0);
872 925 }
873 926
874 927 /*
875 928 * Walk a txg list -- only safe if you know it's not changing.
876 929 */
877 930 void *
878 931 txg_list_head(txg_list_t *tl, uint64_t txg)
879 932 {
880 933 int t = txg & TXG_MASK;
881 934 txg_node_t *tn = tl->tl_head[t];
882 935
883 936 txg_verify(tl->tl_spa, txg);
884 937 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
885 938 }
886 939
887 940 void *
888 941 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
889 942 {
890 943 int t = txg & TXG_MASK;
891 944 txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
892 945
893 946 txg_verify(tl->tl_spa, txg);
894 947 tn = tn->tn_next[t];
895 948
896 949 return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
897 950 }