1 /*
   2  * This file and its contents are supplied under the terms of the
   3  * Common Development and Distribution License ("CDDL"), version 1.0.
   4  * You may only use this file in accordance with the terms of version
   5  * 1.0 of the CDDL.
   6  *
   7  * A full copy of the text of the CDDL should have accompanied this
   8  * source.  A copy of the CDDL is also available via the Internet at
   9  * http://www.illumos.org/license/CDDL.
  10  */
  11 
  12 /*
  13  * Copyright 2018 Nexenta Systems, Inc.  All rights reserved.
  14  */
  15 
  16 #include <sys/spa.h>
  17 #include <sys/autosnap.h>
  18 #include <sys/dmu_objset.h>
  19 #include <sys/dsl_dataset.h>
  20 #include <sys/dsl_dir.h>
  21 #include <sys/dsl_destroy.h>
  22 #include <sys/zfs_ioctl.h>
  23 #include <sys/unique.h>
  24 #include <sys/ctype.h>
  25 
/*
 * Forward declarations of file-local (static) helpers.
 */
static void autosnap_notify_created(const char *name, uint64_t txg,
    autosnap_zone_t *zone);
static void autosnap_reject_snap(const char *name, uint64_t txg,
    zfs_autosnap_t *autosnap);
  30 
/*
 * One entry of a per-snapshot reference list: it records which handler
 * holds a reference on the snapshot.
 */
typedef struct {
        autosnap_handler_t *hdl;        /* handler that holds the reference */
        list_node_t node;               /* linkage within the ref_cnt list */
} autosnap_ref_t;
  35 
/*
 * Pairs an autosnap zone with a DSL sync task.
 * NOTE(review): judging by the type name this is presumably the argument
 * handed to a commit callback -- confirm against the users of this type.
 */
typedef struct {
        autosnap_zone_t *azone;
        dsl_sync_task_t *dst;
} autosnap_commit_cb_arg_t;
  40 
  41 static void
  42 autosnap_refcount_add(list_t *ref_cnt,
  43     autosnap_handler_t *owner)
  44 {
  45         autosnap_ref_t *ref;
  46 
  47         ref = kmem_alloc(sizeof (autosnap_ref_t), KM_SLEEP);
  48         ref->hdl = owner;
  49         list_insert_tail(ref_cnt, ref);
  50 }
  51 
  52 static void
  53 autosnap_refcount_remove(list_t *ref_cnt,
  54     autosnap_handler_t *owner)
  55 {
  56         autosnap_ref_t *ref;
  57 
  58         ASSERT(!list_is_empty(ref_cnt));
  59 
  60         for (ref = list_head(ref_cnt); ref != NULL;
  61             ref = list_next(ref_cnt, ref)) {
  62                 if (ref->hdl == owner) {
  63                         list_remove(ref_cnt, ref);
  64                         kmem_free(ref, sizeof (autosnap_ref_t));
  65 
  66                         return;
  67                 }
  68         }
  69 
  70         /*
  71          * FIXME: After merge of latest illumos code
  72          * this will be removed with all autosnap_refcount_*
  73          * All autosnap_refcount_*() calls will be replaced by
  74          * the corresponding ref_counter_*()
  75          */
  76         panic("No such hold %p", (void *)owner);
  77 }
  78 
  79 static void
  80 autosnap_refcount_remove_all(list_t *ref_cnt)
  81 {
  82         autosnap_ref_t *ref;
  83 
  84         while ((ref = list_head(ref_cnt)) != NULL) {
  85                 list_remove(ref_cnt, ref);
  86                 kmem_free(ref, sizeof (autosnap_ref_t));
  87         }
  88 }
  89 
  90 static boolean_t
  91 autosnap_refcount_held(list_t *ref_cnt,
  92     autosnap_handler_t *owner)
  93 {
  94         autosnap_ref_t *ref;
  95 
  96         for (ref = list_head(ref_cnt); ref != NULL;
  97             ref = list_next(ref_cnt, ref)) {
  98                 if (ref->hdl == owner)
  99                         return (B_TRUE);
 100         }
 101 
 102         return (B_FALSE);
 103 }
 104 
 105 static boolean_t
 106 autosnap_refcount_is_zero(list_t *ref_cnt)
 107 {
 108         return (list_is_empty(ref_cnt));
 109 }
 110 
 111 /* AUTOSNAP-recollect routines */
 112 
 113 static autosnap_snapshot_t *
 114 autosnap_create_snap_node(const char *snap_name, uint64_t txg,
 115     uint64_t etxg, boolean_t recursive, boolean_t orphaned)
 116 {
 117         autosnap_snapshot_t *snap_node;
 118 
 119         snap_node = kmem_zalloc(sizeof (autosnap_snapshot_t), KM_SLEEP);
 120 
 121         (void) strlcpy(snap_node->name, snap_name, sizeof (snap_node->name));
 122         snap_node->recursive = recursive;
 123         snap_node->txg = txg;
 124         snap_node->etxg = etxg;
 125         snap_node->orphaned = orphaned;
 126 
 127         list_create(&snap_node->ref_cnt,
 128             sizeof (autosnap_ref_t),
 129             offsetof(autosnap_ref_t, node));
 130 
 131         return (snap_node);
 132 }
 133 
 134 /*
 135  * Callback for dmu_objset_find_dp().
 136  * This function is called for all DSs, but processes only
 137  * autosnaps.
 138  *
 139  * The constructed autosnap-structure is marked as "orphaned" and
 140  * placed to common AVL of autosnap
 141  */
 142 /* ARGSUSED */
 143 static int
 144 autosnap_collect_orphaned_snapshots_cb(dsl_pool_t *dp,
 145     dsl_dataset_t *ds, void *arg)
 146 {
 147         autosnap_zone_t *zone = arg;
 148         char snap_name[ZFS_MAX_DATASET_NAME_LEN];
 149         autosnap_snapshot_t *snap_node;
 150         uint64_t txg;
 151 
 152         if (!ds->ds_is_snapshot)
 153                 return (0);
 154 
 155         dsl_dataset_name(ds, snap_name);
 156         if (!autosnap_check_name(strchr(snap_name, '@')))
 157                 return (0);
 158 
 159         txg = dsl_dataset_phys(ds)->ds_creation_txg;
 160         snap_node = autosnap_create_snap_node(snap_name,
 161             txg, txg, B_FALSE, B_TRUE);
 162 
 163         mutex_enter(&zone->avl_lock);
 164         avl_add(&zone->snapshots, snap_node);
 165         mutex_exit(&zone->avl_lock);
 166 
 167         return (0);
 168 }
 169 
 170 /*
 171  * Collect orphaned snapshots for given "ds_name" and all its
 172  * children if recursive is TRUE
 173  *
 174  * This function is called during registration of an autosnap-listener
 175  * The registration process can be initiated by
 176  *    - WBC that restores configuration when ZFS activates a pool
 177  *    - an user that has enabled WBC or KRRP for a dataset
 178  */
 179 static void
 180 autosnap_collect_orphaned_snapshots(spa_t *spa, autosnap_zone_t *zone)
 181 {
 182         int flags = DS_FIND_SNAPSHOTS;
 183         dsl_pool_t *dp = spa_get_dsl(spa);
 184         dsl_dataset_t *ds = NULL;
 185         uint64_t dd_object;
 186         boolean_t held;
 187 
 188 
 189         /*
 190          * If the top-level caller is ZFS that activates
 191          * the given pool, then the pool's config already held
 192          */
 193         held = dsl_pool_config_held(dp);
 194         if (!held)
 195                 dsl_pool_config_enter(dp, FTAG);
 196 
 197         if (dsl_dataset_hold(dp, zone->dataset, FTAG, &ds) != 0)
 198                 goto out;
 199 
 200         dd_object = ds->ds_dir->dd_object;
 201         dsl_dataset_rele(ds, FTAG);
 202 
 203         if ((zone->flags & AUTOSNAP_RECURSIVE) != 0)
 204                 flags |= DS_FIND_CHILDREN;
 205 
 206         VERIFY0(dmu_objset_find_dp(spa_get_dsl(spa), dd_object,
 207             autosnap_collect_orphaned_snapshots_cb, zone, flags));
 208 
 209 out:
 210         if (!held)
 211                 dsl_pool_config_exit(dp, FTAG);
 212 }
 213 
 214 /*
 215  * Return list of the snapshots which are owned by the caller
 216  * The function is used to reclaim orphaned snapshots
 217  */
 218 nvlist_t *
 219 autosnap_get_owned_snapshots(void *opaque)
 220 {
 221         nvlist_t *dup;
 222         autosnap_snapshot_t *snap;
 223         autosnap_handler_t *hdl = opaque;
 224         autosnap_zone_t *zone = hdl->zone;
 225         zfs_autosnap_t *autosnap = zone->autosnap;
 226 
 227         if (!(hdl->flags & AUTOSNAP_OWNER))
 228                 return (NULL);
 229 
 230         mutex_enter(&autosnap->autosnap_lock);
 231 
 232         dup = fnvlist_alloc();
 233 
 234         /* iterate though snapshots and find requested */
 235         for (snap = avl_first(&zone->snapshots);
 236             snap != NULL;
 237             snap = AVL_NEXT(&zone->snapshots, snap)) {
 238                 char ds_name[ZFS_MAX_DATASET_NAME_LEN];
 239                 uint64_t data[2];
 240 
 241                 if (!snap->orphaned)
 242                         continue;
 243 
 244                 (void) strlcpy(ds_name, snap->name, sizeof (ds_name));
 245                 *(strchr(ds_name, '@')) = '\0';
 246 
 247                 if (strcmp(ds_name, zone->dataset) != 0)
 248                         continue;
 249 
 250                 data[0] = snap->txg;
 251                 data[1] = snap->recursive;
 252 
 253                 fnvlist_add_uint64_array(dup, snap->name, data, 2);
 254                 snap->orphaned = B_FALSE;
 255         }
 256 
 257         mutex_exit(&autosnap->autosnap_lock);
 258 
 259         return (dup);
 260 }
 261 
 262 /*
 263  * Insert owners handler to snapshots
 264  */
 265 static void
 266 autosnap_claim_orphaned_snaps(autosnap_handler_t *hdl)
 267 {
 268         autosnap_zone_t *zone = hdl->zone;
 269         autosnap_snapshot_t *snap, *r_snap = NULL;
 270 
 271         ASSERT(MUTEX_HELD(&zone->autosnap->autosnap_lock));
 272 
 273         snap = avl_first(&zone->snapshots);
 274 
 275         while (snap != NULL) {
 276                 char ds_name[ZFS_MAX_DATASET_NAME_LEN];
 277                 autosnap_snapshot_t *next_snap =
 278                     AVL_NEXT(&zone->snapshots, snap);
 279 
 280                 if (snap->orphaned) {
 281                         (void) strlcpy(ds_name, snap->name, sizeof (ds_name));
 282                         *(strchr(ds_name, '@')) = '\0';
 283 
 284                         if (strcmp(ds_name, zone->dataset) == 0) {
 285                                 autosnap_refcount_add(&snap->ref_cnt, hdl);
 286                                 r_snap = snap;
 287                         } else if (strncmp(ds_name,
 288                             zone->dataset, strlen(zone->dataset)) == 0 &&
 289                             (hdl->flags & AUTOSNAP_RECURSIVE) &&
 290                             r_snap != NULL) {
 291                                 avl_remove(&zone->snapshots, snap);
 292                                 kmem_free(snap, sizeof (autosnap_snapshot_t));
 293                                 r_snap->recursive = B_TRUE;
 294                         }
 295                 }
 296 
 297                 snap = next_snap;
 298         }
 299 }
 300 
 301 /* AUTOSNAP_RELE routines */
 302 
 303 static void
 304 autosnap_release_snapshots_by_txg_no_lock_impl(autosnap_handler_t *hdl,
 305     uint64_t from_txg, uint64_t to_txg, boolean_t destroy)
 306 {
 307         autosnap_zone_t *zone = hdl->zone;
 308         zfs_autosnap_t *autosnap = zone->autosnap;
 309         avl_index_t where;
 310         int search_len;
 311 
 312         ASSERT(MUTEX_HELD(&autosnap->autosnap_lock));
 313 
 314         autosnap_snapshot_t search = { 0 };
 315         autosnap_snapshot_t *walker, *prev;
 316 
 317         search.txg = from_txg;
 318         (void) strlcpy(search.name, zone->dataset, sizeof (search.name));
 319         search_len = strlen(search.name);
 320         walker = avl_find(&zone->snapshots, &search, &where);
 321 
 322         if (walker == NULL) {
 323                 walker = avl_nearest(&zone->snapshots,
 324                     where, AVL_AFTER);
 325         }
 326 
 327         if (walker == NULL)
 328                 return;
 329 
 330         /* if we specifies only one txg then it must be present */
 331         if (to_txg == AUTOSNAP_NO_SNAP && walker->txg != from_txg)
 332                 return;
 333 
 334         if (walker->txg < from_txg)
 335                 walker = AVL_NEXT(&zone->snapshots, walker);
 336 
 337         if (walker->txg > to_txg)
 338                 return;
 339 
 340         if (to_txg == AUTOSNAP_NO_SNAP)
 341                 to_txg = from_txg;
 342 
 343         /* iterate over the specified range */
 344         do {
 345                 boolean_t exact, pref, held = B_FALSE;
 346 
 347                 if (strncmp(search.name, walker->name, search_len) == 0) {
 348                         exact = (walker->name[search_len] == '@');
 349                         pref = (walker->name[search_len] == '/');
 350 
 351                         if (exact ||
 352                             (pref &&
 353                             (zone->flags & AUTOSNAP_RECURSIVE) != 0)) {
 354                                 held = autosnap_refcount_held(
 355                                     &walker->ref_cnt, hdl);
 356                         }
 357                 }
 358 
 359                 prev = walker;
 360 
 361                 walker = AVL_NEXT(&zone->snapshots, walker);
 362 
 363                 /*
 364                  * If client holds reference to the snapshot
 365                  * then remove it
 366                  */
 367                 if (held) {
 368                         autosnap_refcount_remove(&prev->ref_cnt, hdl);
 369 
 370                         /*
 371                          * If it is the last reference and autosnap should
 372                          * not be destroyed then just free the structure.
 373                          * Otherwise put it on the destroyer's queue.
 374                          */
 375                         if (autosnap_refcount_is_zero(&prev->ref_cnt)) {
 376                                 avl_remove(&zone->snapshots, prev);
 377                                 if (!destroy) {
 378                                         kmem_free(prev,
 379                                             sizeof (autosnap_snapshot_t));
 380                                 } else {
 381                                         list_insert_tail(
 382                                             &autosnap->autosnap_destroy_queue,
 383                                             prev);
 384                                         cv_broadcast(&autosnap->autosnap_cv);
 385                                 }
 386                         }
 387                 }
 388 
 389         } while (walker != NULL && walker->txg <= to_txg);
 390 }
 391 
 392 /* No lock version should be used from autosnap callbacks */
 393 void
 394 autosnap_release_snapshots_by_txg_no_lock(void *opaque,
 395     uint64_t from_txg, uint64_t to_txg)
 396 {
 397         autosnap_handler_t *hdl = opaque;
 398 
 399         autosnap_release_snapshots_by_txg_no_lock_impl(hdl,
 400             from_txg, to_txg, B_TRUE);
 401 }
 402 
 403 /*
 404  * Release snapshot and remove a handler from it
 405  */
 406 void
 407 autosnap_release_snapshots_by_txg(void *opaque,
 408     uint64_t from_txg, uint64_t to_txg)
 409 {
 410         autosnap_handler_t *hdl = opaque;
 411         autosnap_zone_t *zone = hdl->zone;
 412         mutex_enter(&zone->autosnap->autosnap_lock);
 413         autosnap_release_snapshots_by_txg_no_lock_impl(hdl,
 414             from_txg, to_txg, B_TRUE);
 415         mutex_exit(&zone->autosnap->autosnap_lock);
 416 }
 417 
 418 static int
 419 snapshot_txg_compare(const void *arg1, const void *arg2)
 420 {
 421         const autosnap_snapshot_t *snap1 = arg1;
 422         const autosnap_snapshot_t *snap2 = arg2;
 423 
 424         if (snap1->txg < snap2->txg) {
 425                 return (-1);
 426         } else if (snap1->txg == snap2->txg) {
 427                 int res = 0;
 428                 int l1 = strlen(snap1->name);
 429                 int l2 = strlen(snap2->name);
 430                 int i;
 431 
 432                 /* we need our own strcmp to ensure depth-first order */
 433                 for (i = 0; i <= MIN(l1, l2); i++) {
 434                         char c1 = snap1->name[i];
 435                         char c2 = snap2->name[i];
 436 
 437                         if (c1 != c2) {
 438                                 if (c1 == '\0') {
 439                                         res = -1;
 440                                 } else if (c2 == '\0') {
 441                                         res = +1;
 442                                 } else if (c1 == '@') {
 443                                         res = -1;
 444                                 } else if (c2 == '@') {
 445                                         res = +1;
 446                                 } else if (c1 == '/') {
 447                                         res = -1;
 448                                 } else if (c2 == '/') {
 449                                         res = +1;
 450                                 } else if (c1 < c2) {
 451                                         res = -1;
 452                                 } else {
 453                                         res = +1;
 454                                 }
 455                                 break;
 456                         }
 457                 }
 458 
 459                 if (res < 0) {
 460                         return (-1);
 461                 } else if (res > 0) {
 462                         return (+1);
 463                 } else {
 464                         return (0);
 465                 }
 466         } else {
 467                 return (+1);
 468         }
 469 }
 470 
 471 /* AUTOSNAP-HDL routines */
 472 
 473 void *
 474 autosnap_register_handler_impl(spa_t *spa,
 475     const char *name, uint64_t flags,
 476     autosnap_confirm_cb confirm_cb,
 477     autosnap_notify_created_cb nc_cb,
 478     autosnap_error_cb err_cb, void *cb_arg)
 479 {
 480         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
 481         autosnap_handler_t *hdl = NULL;
 482         autosnap_zone_t *zone, *rzone;
 483         boolean_t children_have_zone;
 484 
 485 
 486         mutex_enter(&autosnap->autosnap_lock);
 487         while (autosnap->register_busy) {
 488                 (void) cv_wait(&autosnap->autosnap_cv,
 489                     &autosnap->autosnap_lock);
 490         }
 491 
 492         zone = autosnap_find_zone(autosnap, name, B_FALSE);
 493         rzone = autosnap_find_zone(autosnap, name, B_TRUE);
 494 
 495         children_have_zone =
 496             autosnap_has_children_zone(autosnap, name, B_FALSE);
 497 
 498         if (rzone && !zone) {
 499                 cmn_err(CE_WARN, "AUTOSNAP: the dataset is already under"
 500                     " an autosnap zone [%s under %s]\n",
 501                     name, rzone->dataset);
 502                 goto out;
 503         } else if (children_have_zone && (flags & AUTOSNAP_RECURSIVE)) {
 504                 cmn_err(CE_WARN, "AUTOSNAP: can't register recursive zone"
 505                     " when there is a child under autosnap%s\n",
 506                     name);
 507                 goto out;
 508         }
 509 
 510         /* Create a new zone if it is absent */
 511         if (zone == NULL) {
 512                 zone = kmem_zalloc(sizeof (autosnap_zone_t), KM_SLEEP);
 513                 (void) strlcpy(zone->dataset, name, sizeof (zone->dataset));
 514 
 515                 mutex_init(&zone->avl_lock, NULL, MUTEX_ADAPTIVE, NULL);
 516 
 517                 list_create(&zone->listeners,
 518                     sizeof (autosnap_handler_t),
 519                     offsetof(autosnap_handler_t, node));
 520 
 521                 avl_create(&zone->snapshots,
 522                     snapshot_txg_compare,
 523                     sizeof (autosnap_snapshot_t),
 524                     offsetof(autosnap_snapshot_t, node));
 525 
 526                 zone->flags = flags;
 527                 zone->autosnap = autosnap;
 528 
 529                 /*
 530                  * This is a new zone and we need to collect orphaned
 531                  * snapshots for it. It is safe to drop autosnap_lock,
 532                  * because the zone is not on the list of available
 533                  * zones.
 534                  * Disallow registering a handler until the process
 535                  * is finished.
 536                  */
 537                 autosnap->register_busy = B_TRUE;
 538                 mutex_exit(&autosnap->autosnap_lock);
 539 
 540                 autosnap_collect_orphaned_snapshots(spa, zone);
 541 
 542                 mutex_enter(&autosnap->autosnap_lock);
 543                 cv_broadcast(&autosnap->autosnap_cv);
 544                 autosnap->register_busy = B_FALSE;
 545 
 546                 list_insert_tail(&autosnap->autosnap_zones, zone);
 547         } else {
 548                 if ((list_head(&zone->listeners) != NULL) &&
 549                     ((flags & AUTOSNAP_CREATOR) ^
 550                     (zone->flags & AUTOSNAP_CREATOR))) {
 551                         cmn_err(CE_WARN,
 552                             "AUTOSNAP: can't register two different"
 553                             " modes for the same autosnap zone %s %s\n",
 554                             name, flags & AUTOSNAP_RECURSIVE ? "[r]" : "");
 555                         goto out;
 556                 } else if ((list_head(&zone->listeners) != NULL) &&
 557                     ((flags & AUTOSNAP_RECURSIVE) ^
 558                     (zone->flags & AUTOSNAP_RECURSIVE))) {
 559                         cmn_err(CE_WARN,
 560                             "AUTOSNAP: can't register two different"
 561                             " recursion modes for the same autosnap zone "
 562                             "%s %s\n",
 563                             name, flags & AUTOSNAP_RECURSIVE ? "[r]" : "");
 564                         goto out;
 565                 }
 566 
 567                 zone->flags |= flags;
 568         }
 569 
 570         hdl = kmem_zalloc(sizeof (autosnap_handler_t), KM_SLEEP);
 571 
 572         hdl->confirm_cb = confirm_cb;
 573         hdl->nc_cb = nc_cb;
 574         hdl->err_cb = err_cb;
 575         hdl->cb_arg = cb_arg;
 576         hdl->zone = zone;
 577         hdl->flags = flags;
 578 
 579         list_insert_tail(&zone->listeners, hdl);
 580 
 581         if (flags & AUTOSNAP_OWNER)
 582                 autosnap_claim_orphaned_snaps(hdl);
 583 
 584 out:
 585         mutex_exit(&autosnap->autosnap_lock);
 586 
 587         return (hdl);
 588 }
 589 
 590 void *
 591 autosnap_register_handler(const char *name, uint64_t flags,
 592     autosnap_confirm_cb confirm_cb,
 593     autosnap_notify_created_cb nc_cb,
 594     autosnap_error_cb err_cb, void *cb_arg)
 595 {
 596         spa_t *spa;
 597         autosnap_handler_t *hdl = NULL;
 598         boolean_t namespace_alteration = B_TRUE;
 599 
 600         if (nc_cb == NULL)
 601                 return (NULL);
 602 
 603         /* special case for unregistering on deletion */
 604         if (!MUTEX_HELD(&spa_namespace_lock)) {
 605                 mutex_enter(&spa_namespace_lock);
 606                 namespace_alteration = B_FALSE;
 607         }
 608 
 609         spa = spa_lookup(name);
 610         if (spa != NULL)
 611                 spa_open_ref(spa, FTAG);
 612 
 613         if (!namespace_alteration)
 614                 mutex_exit(&spa_namespace_lock);
 615 
 616         if (spa == NULL)
 617                 return (NULL);
 618 
 619         hdl = autosnap_register_handler_impl(spa,
 620             name, flags, confirm_cb, nc_cb, err_cb, cb_arg);
 621 
 622         spa_close(spa, FTAG);
 623 
 624         return (hdl);
 625 }
 626 
 627 void
 628 autosnap_unregister_handler(void *opaque)
 629 {
 630         spa_t *spa;
 631         autosnap_handler_t *hdl = opaque;
 632         autosnap_zone_t *zone = hdl->zone;
 633         zfs_autosnap_t *autosnap = NULL;
 634         boolean_t namespace_alteration = B_TRUE;
 635 
 636         /* special case for unregistering on deletion */
 637         if (!MUTEX_HELD(&spa_namespace_lock)) {
 638                 mutex_enter(&spa_namespace_lock);
 639                 namespace_alteration = B_FALSE;
 640         }
 641 
 642         spa = spa_lookup(zone->dataset);
 643         if (spa != NULL)
 644                 spa_open_ref(spa, FTAG);
 645 
 646         if (!namespace_alteration)
 647                 mutex_exit(&spa_namespace_lock);
 648 
 649         /* if zone is absent, then just destroy handler */
 650         if (spa == NULL)
 651                 goto free_hdl;
 652 
 653         autosnap = spa_get_autosnap(spa);
 654 
 655         mutex_enter(&autosnap->autosnap_lock);
 656 
 657         autosnap_release_snapshots_by_txg_no_lock_impl(hdl,
 658             AUTOSNAP_FIRST_SNAP, AUTOSNAP_LAST_SNAP, B_FALSE);
 659 
 660         /*
 661          * Remove the client from zone. If it is a last client
 662          * then destroy the zone.
 663          */
 664         if (zone != NULL) {
 665                 list_remove(&zone->listeners, hdl);
 666 
 667                 if (list_head(&zone->listeners) == NULL) {
 668                         void *cookie = NULL;
 669                         autosnap_snapshot_t *snap;
 670 
 671                         while ((snap = avl_destroy_nodes(&zone->snapshots,
 672                             &cookie)) != NULL) {
 673                                 /*
 674                                  * Only orphans can be in
 675                                  * the AVL-tree at this stage
 676                                  */
 677                                 VERIFY(snap->orphaned);
 678                                 VERIFY(autosnap_refcount_is_zero(
 679                                     &snap->ref_cnt));
 680                                 kmem_free(snap, sizeof (autosnap_snapshot_t));
 681                         }
 682 
 683                         avl_destroy(&zone->snapshots);
 684                         mutex_destroy(&zone->avl_lock);
 685                         list_remove(&autosnap->autosnap_zones, zone);
 686                         list_destroy(&zone->listeners);
 687                         kmem_free(zone, sizeof (autosnap_zone_t));
 688                 } else {
 689                         autosnap_handler_t *walk;
 690                         boolean_t drop_owner_flag = B_TRUE;
 691                         boolean_t drop_krrp_flag = B_TRUE;
 692 
 693                         for (walk = list_head(&zone->listeners);
 694                             walk != NULL;
 695                             walk = list_next(&zone->listeners, walk)) {
 696                                 if ((walk->flags & AUTOSNAP_OWNER) != 0)
 697                                         drop_owner_flag = B_FALSE;
 698 
 699                                 if ((walk->flags & AUTOSNAP_KRRP) != 0)
 700                                         drop_krrp_flag = B_FALSE;
 701                         }
 702 
 703                         if (drop_owner_flag)
 704                                 zone->flags &= ~AUTOSNAP_OWNER;
 705 
 706                         if (drop_krrp_flag)
 707                                 zone->flags &= ~AUTOSNAP_KRRP;
 708                 }
 709         }
 710 
 711 free_hdl:
 712         kmem_free(hdl, sizeof (autosnap_handler_t));
 713 
 714 out:
 715         if (spa != NULL) {
 716                 spa_close(spa, FTAG);
 717                 mutex_exit(&autosnap->autosnap_lock);
 718         }
 719 }
 720 
 721 int
 722 autosnap_check_for_destroy(zfs_autosnap_t *autosnap, const char *name)
 723 {
 724         autosnap_zone_t *rzone, *zone;
 725         boolean_t children_have_zone;
 726 
 727         mutex_enter(&autosnap->autosnap_lock);
 728         zone = autosnap_find_zone(autosnap, name, B_FALSE);
 729         rzone = autosnap_find_zone(autosnap, name, B_TRUE);
 730         children_have_zone =
 731             autosnap_has_children_zone(autosnap, name, B_TRUE);
 732         mutex_exit(&autosnap->autosnap_lock);
 733 
 734         if (zone != NULL && (zone->flags & AUTOSNAP_KRRP) != 0)
 735                 return (EBUSY);
 736 
 737         if (children_have_zone)
 738                 return (ECHILD);
 739 
 740         if (rzone != NULL && (rzone->flags & AUTOSNAP_KRRP) != 0)
 741                 return (EUSERS);
 742 
 743         return (0);
 744 }
 745 
 746 boolean_t
 747 autosnap_has_children_zone(zfs_autosnap_t *autosnap,
 748     const char *name, boolean_t krrp_only)
 749 {
 750         autosnap_zone_t *zone;
 751         char dataset[ZFS_MAX_DATASET_NAME_LEN];
 752         char *snapshot;
 753         size_t ds_name_len;
 754 
 755         ASSERT(MUTEX_HELD(&autosnap->autosnap_lock));
 756 
 757         (void) strlcpy(dataset, name, sizeof (dataset));
 758         if ((snapshot = strchr(dataset, '@')) != NULL)
 759                 *snapshot++ = '\0';
 760 
 761         ds_name_len = strlen(dataset);
 762         zone = list_head(&autosnap->autosnap_zones);
 763         while (zone != NULL) {
 764                 int cmp = strncmp(dataset,
 765                     zone->dataset, ds_name_len);
 766                 boolean_t skip =
 767                     krrp_only && ((zone->flags & AUTOSNAP_KRRP) == 0);
 768                 if (cmp == 0 && zone->dataset[ds_name_len] == '/' &&
 769                     !skip)
 770                         return (B_TRUE);
 771 
 772                 zone = list_next(&autosnap->autosnap_zones, zone);
 773         }
 774 
 775         return (B_FALSE);
 776 }
 777 
 778 autosnap_zone_t *
 779 autosnap_find_zone(zfs_autosnap_t *autosnap,
 780     const char *name, boolean_t recursive)
 781 {
 782         char dataset[ZFS_MAX_DATASET_NAME_LEN];
 783         char *snapshot;
 784         autosnap_zone_t *zone;
 785 
 786         ASSERT(MUTEX_HELD(&autosnap->autosnap_lock));
 787 
 788         (void) strlcpy(dataset, name, sizeof (dataset));
 789         if ((snapshot = strchr(dataset, '@')) != NULL)
 790                 *snapshot++ = '\0';
 791 
 792         zone = list_head(&autosnap->autosnap_zones);
 793         while (zone != NULL) {
 794                 if (strcmp(dataset, zone->dataset) == 0) {
 795                         return (zone);
 796                 } else if (recursive) {
 797                         size_t ds_name_len = strlen(zone->dataset);
 798                         int cmp = strncmp(dataset, zone->dataset,
 799                             ds_name_len);
 800                         boolean_t zone_is_recursive =
 801                             zone->flags & AUTOSNAP_RECURSIVE;
 802                         if (cmp == 0 && zone_is_recursive &&
 803                             dataset[ds_name_len] == '/')
 804                                 return (zone);
 805                 }
 806 
 807                 zone = list_next(&autosnap->autosnap_zones, zone);
 808         }
 809 
 810         return (NULL);
 811 }
 812 
 813 /* AUTOSNAP-LOCK routines */
 814 
 815 /*
 816  * This function is used to serialize atomically-destroy
 817  * and start a KRRP replication session (send side).
 818  *
 819  * Atomically-destroy logic allows a DS and nested DSs
 820  * to be destroyed in one TXG.
 821  *
 822  * This function uses RW_LOCK, so multiple KRRP replication
 823  * sessions may start in parallel. However atomically-destroy
 824  * is a writer, so KRRP replication sessions will wait until it
 825  * finished.
 826  *
 827  * if pool export or destroy are in process then the function
 828  * will not hold anything and return ENOLCK.
 829  *
 830  * In case of receiving kill-signal (if the function was called
 831  * from an ioctl handler) the function returns EINTR.
 832  */
 833 int
 834 autosnap_lock(spa_t *spa, krw_t rw)
 835 {
 836         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
 837         int err = 0;
 838         int locked = 0;
 839 
 840         mutex_enter(&autosnap->autosnap_lock);
 841 
 842         locked = rw_tryenter(&autosnap->autosnap_rwlock, rw);
 843         while (locked == 0 && !autosnap->need_stop) {
 844 #ifdef _KERNEL
 845                 int rc = cv_wait_sig(&autosnap->autosnap_cv,
 846                     &autosnap->autosnap_lock);
 847                 if (rc == 0)
 848                         break;
 849 #else
 850                 (void) cv_wait(&autosnap->autosnap_cv,
 851                     &autosnap->autosnap_lock);
 852 #endif
 853 
 854                 locked = rw_tryenter(&autosnap->autosnap_rwlock, rw);
 855         }
 856 
 857         if (autosnap->need_stop) {
 858                 err = SET_ERROR(ENOLCK);
 859                 if (locked != 0)
 860                         rw_exit(&autosnap->autosnap_rwlock);
 861         } else if (locked == 0) {
 862                 err = SET_ERROR(EINTR);
 863         }
 864 
 865         cv_broadcast(&autosnap->autosnap_cv);
 866         mutex_exit(&autosnap->autosnap_lock);
 867 
 868         return (err);
 869 }
 870 
 871 void
 872 autosnap_unlock(spa_t *spa)
 873 {
 874         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
 875 
 876         rw_exit(&autosnap->autosnap_rwlock);
 877 
 878         mutex_enter(&autosnap->autosnap_lock);
 879         cv_broadcast(&autosnap->autosnap_cv);
 880         mutex_exit(&autosnap->autosnap_lock);
 881 }
 882 
 883 /* AUTOSNAP-FSNAP routines */
 884 
 885 void
 886 autosnap_exempt_snapshot(spa_t *spa, const char *name)
 887 {
 888         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
 889         autosnap_zone_t *zone;
 890         uint64_t txg;
 891         int err;
 892         dsl_dataset_t *ds;
 893         autosnap_snapshot_t search = { 0 }, *found;
 894         char *atpos;
 895 
 896         err = dsl_dataset_hold(spa_get_dsl(spa), name, FTAG, &ds);
 897         if (err != 0)
 898                 return;
 899 
 900         txg = dsl_dataset_phys(ds)->ds_creation_txg;
 901         dsl_dataset_rele(ds, FTAG);
 902 
 903         mutex_enter(&autosnap->autosnap_lock);
 904 
 905         (void) strlcpy(search.name, name, sizeof (search.name));
 906         atpos = strchr(search.name, '@');
 907         *atpos = '\0';
 908 
 909         zone = autosnap_find_zone(autosnap, search.name, B_TRUE);
 910         if (zone != NULL) {
 911                 *atpos = '@';
 912                 search.txg = txg;
 913 
 914                 found = avl_find(&zone->snapshots, &search, NULL);
 915                 if (found != NULL) {
 916                         avl_remove(&zone->snapshots, found);
 917                         autosnap_refcount_remove_all(&found->ref_cnt);
 918                         kmem_free(found, sizeof (autosnap_snapshot_t));
 919                 }
 920         }
 921 
 922         mutex_exit(&autosnap->autosnap_lock);
 923 }
 924 
 925 void
 926 autosnap_force_snap_by_name(const char *dsname, autosnap_zone_t *zone,
 927     boolean_t sync)
 928 {
 929         dsl_pool_t *dp;
 930         dsl_dataset_t *ds;
 931         objset_t *os;
 932         uint64_t txg = 0;
 933         zfs_autosnap_t *autosnap;
 934         int error;
 935 
 936         error = dsl_pool_hold(dsname, FTAG, &dp);
 937         if (error)
 938                 return;
 939 
 940         autosnap = spa_get_autosnap(dp->dp_spa);
 941         if (!autosnap) {
 942                 dsl_pool_rele(dp, FTAG);
 943                 return;
 944         }
 945 
 946         mutex_enter(&autosnap->autosnap_lock);
 947         if (zone == NULL) {
 948                 zone = autosnap_find_zone(autosnap, dsname, B_TRUE);
 949                 if (zone == NULL) {
 950                         mutex_exit(&autosnap->autosnap_lock);
 951                         dsl_pool_rele(dp, FTAG);
 952                         return;
 953                 }
 954         }
 955 
 956         error = dsl_dataset_hold(dp, dsname, FTAG, &ds);
 957         if (error) {
 958                 mutex_exit(&autosnap->autosnap_lock);
 959                 dsl_pool_rele(dp, FTAG);
 960                 return;
 961         }
 962         error = dmu_objset_from_ds(ds, &os);
 963         if (error) {
 964                 dsl_dataset_rele(ds, FTAG);
 965                 mutex_exit(&autosnap->autosnap_lock);
 966                 dsl_pool_rele(dp, FTAG);
 967                 return;
 968         }
 969         if (dmu_objset_is_snapshot(os)) {
 970                 dsl_dataset_rele(ds, FTAG);
 971                 mutex_exit(&autosnap->autosnap_lock);
 972                 dsl_pool_rele(dp, FTAG);
 973                 return;
 974         }
 975 
 976         dsl_pool_rele(dp, FTAG);
 977 
 978         if (zone->flags & AUTOSNAP_CREATOR) {
 979                 dmu_tx_t *tx = dmu_tx_create(os);
 980 
 981                 error = dmu_tx_assign(tx, TXG_NOWAIT);
 982 
 983                 if (error) {
 984                         dmu_tx_abort(tx);
 985                         dsl_dataset_rele(ds, FTAG);
 986                         mutex_exit(&autosnap->autosnap_lock);
 987                         return;
 988                 }
 989 
 990                 txg = dmu_tx_get_txg(tx);
 991                 dsl_dataset_dirty(ds, tx);
 992                 dmu_tx_commit(tx);
 993         }
 994 
 995         dsl_dataset_rele(ds, FTAG);
 996         mutex_exit(&autosnap->autosnap_lock);
 997 
 998         if (sync)
 999                 txg_wait_synced(dp, txg);
1000 }
1001 
1002 /* Force creation of an autosnap */
1003 void
1004 autosnap_force_snap(void *opaque, boolean_t sync)
1005 {
1006         autosnap_handler_t *hdl;
1007         autosnap_zone_t *zone;
1008 
1009         if (!opaque)
1010                 return;
1011 
1012         hdl = opaque;
1013         zone = hdl->zone;
1014 
1015         autosnap_force_snap_by_name(zone->dataset, zone, sync);
1016 }
1017 
1018 /*
1019  * This function is called when the caller wants snapshot ASAP
1020  */
1021 void
1022 autosnap_force_snap_fast(void *opaque)
1023 {
1024         autosnap_handler_t *hdl = opaque;
1025         autosnap_zone_t *zone = hdl->zone;
1026 
1027         mutex_enter(&zone->autosnap->autosnap_lock);
1028 
1029         /*
1030          * Mark this autosnap zone as "delayed", so that autosnap
1031          * for this zone is created in the next TXG sync
1032          */
1033         zone->delayed = B_TRUE;
1034 
1035         mutex_exit(&zone->autosnap->autosnap_lock);
1036 }
1037 
1038 /* AUTOSNAP-NOTIFIER routines */
1039 
1040 /* iterate through handlers and call its confirm callbacks */
1041 boolean_t
1042 autosnap_confirm_snap(autosnap_zone_t *zone, uint64_t txg)
1043 {
1044         autosnap_handler_t *hdl;
1045         boolean_t confirmation = B_FALSE;
1046 
1047         if ((zone->flags & AUTOSNAP_CREATOR) == 0)
1048                 return (B_FALSE);
1049 
1050         for (hdl = list_head(&zone->listeners);
1051             hdl != NULL;
1052             hdl = list_next(&zone->listeners, hdl)) {
1053                 confirmation |=
1054                     hdl->confirm_cb == NULL ? B_TRUE :
1055                     hdl->confirm_cb(zone->dataset,
1056                     !!(zone->flags & AUTOSNAP_RECURSIVE),
1057                     txg, hdl->cb_arg);
1058         }
1059 
1060         return (confirmation);
1061 }
1062 
1063 /* iterate through handlers and call its error callbacks */
1064 void
1065 autosnap_error_snap(autosnap_zone_t *zone, uint64_t txg, int err)
1066 {
1067         autosnap_handler_t *hdl;
1068 
1069         ASSERT(MUTEX_HELD(&zone->autosnap->autosnap_lock));
1070 
1071         for (hdl = list_head(&zone->listeners);
1072             hdl != NULL;
1073             hdl = list_next(&zone->listeners, hdl)) {
1074                 if (hdl->err_cb)
1075                         hdl->err_cb(zone->dataset, err, txg, hdl->cb_arg);
1076         }
1077 }
1078 
1079 /* iterate through handlers and call its notify callbacks */
1080 static void
1081 autosnap_notify_listeners(autosnap_zone_t *zone,
1082     autosnap_snapshot_t *snap)
1083 {
1084         autosnap_handler_t *hdl;
1085 
1086         for (hdl = list_head(&zone->listeners);
1087             hdl != NULL;
1088             hdl = list_next(&zone->listeners, hdl)) {
1089                 if (hdl->nc_cb(snap->name,
1090                     !!(zone->flags & AUTOSNAP_RECURSIVE),
1091                     B_TRUE, snap->txg, snap->etxg, hdl->cb_arg))
1092                         autosnap_refcount_add(&snap->ref_cnt, hdl);
1093         }
1094 }
1095 
1096 /*
1097  * With no WBC and a dataset which is either a standalone or root of
1098  * recursion, just notify about creation
1099  * With no WBC and dataset not being a part of any zone, just reject it
1100  */
1101 void
1102 autosnap_create_cb(zfs_autosnap_t *autosnap,
1103     dsl_dataset_t *ds, const char *snapname, uint64_t txg)
1104 {
1105         autosnap_zone_t *zone, *rzone;
1106         char fullname[ZFS_MAX_DATASET_NAME_LEN];
1107 
1108         dsl_dataset_name(ds, fullname);
1109 
1110         mutex_enter(&autosnap->autosnap_lock);
1111         zone = autosnap_find_zone(autosnap, fullname, B_FALSE);
1112         rzone = autosnap_find_zone(autosnap, fullname, B_TRUE);
1113 
1114         (void) strcat(fullname, "@");
1115         (void) strcat(fullname, snapname);
1116 
1117         if (zone != NULL) {
1118                 /*
1119                  * Some listeners subscribed for this datasets.
1120                  * So need to notify them about new snapshot
1121                  */
1122                 autosnap_notify_created(fullname, txg, zone);
1123         } else if (!rzone) {
1124                 /*
1125                  * There are no listeners for this datasets
1126                  * and its children. So this snapshot is not
1127                  * needed anymore.
1128                  */
1129                 autosnap_reject_snap(fullname, txg, autosnap);
1130         }
1131 
1132         mutex_exit(&autosnap->autosnap_lock);
1133 }
1134 
1135 /* Notify listeners about an autosnapshot */
1136 static void
1137 autosnap_notify_created(const char *name, uint64_t txg,
1138     autosnap_zone_t *zone)
1139 {
1140         autosnap_snapshot_t *snapshot = NULL, search;
1141         avl_index_t where = NULL;
1142         boolean_t found = B_TRUE;
1143 
1144         ASSERT(MUTEX_HELD(&zone->autosnap->autosnap_lock));
1145 
1146 #ifdef ZFS_DEBUG
1147         VERIFY(autosnap_check_name(strchr(name, '@')));
1148 #endif
1149 
1150         search.txg = txg;
1151         (void) strlcpy(search.name, name, sizeof (search.name));
1152         snapshot = avl_find(&zone->snapshots, &search, &where);
1153         if (snapshot == NULL) {
1154                 found = B_FALSE;
1155                 snapshot = autosnap_create_snap_node(name, txg, txg,
1156                     !!(zone->flags & AUTOSNAP_RECURSIVE), B_FALSE);
1157         }
1158 
1159         autosnap_notify_listeners(zone, snapshot);
1160 
1161         if ((zone->flags & AUTOSNAP_DESTROYER) != 0) {
1162                 if (list_is_empty(&snapshot->ref_cnt)) {
1163                         list_insert_tail(
1164                             &zone->autosnap->autosnap_destroy_queue, snapshot);
1165                         cv_broadcast(&zone->autosnap->autosnap_cv);
1166                 } else if (!found) {
1167                         avl_insert(&zone->snapshots, snapshot, where);
1168                 }
1169         } else if (!found) {
1170                 kmem_free(snapshot, sizeof (autosnap_snapshot_t));
1171         }
1172 }
1173 
1174 /* Reject a creation of an autosnapshot */
1175 static void
1176 autosnap_reject_snap(const char *name, uint64_t txg, zfs_autosnap_t *autosnap)
1177 {
1178         autosnap_snapshot_t *snapshot = NULL;
1179 
1180         ASSERT(MUTEX_HELD(&autosnap->autosnap_lock));
1181 
1182 #ifdef ZFS_DEBUG
1183         VERIFY(autosnap_check_name(strchr(name, '@')));
1184 #endif
1185 
1186         snapshot = autosnap_create_snap_node(name, txg, txg, B_FALSE, B_FALSE);
1187 
1188         list_insert_tail(&autosnap->autosnap_destroy_queue, snapshot);
1189         cv_broadcast(&autosnap->autosnap_cv);
1190 }
1191 
1192 /* AUTOSNAP-DESTROYER routines */
1193 
1194 typedef struct {
1195         kmutex_t nvl_lock;
1196         nvlist_t *autosnaps;
1197         const char *snap_name;
1198 } autosnap_collector_destroy_cb_arg_t;
1199 
1200 /* ARGSUSED */
1201 static int
1202 autosnap_collect_destroy_snapshots_cb(dsl_pool_t *dp,
1203     dsl_dataset_t *ds, void *arg)
1204 {
1205         autosnap_collector_destroy_cb_arg_t *cb_arg = arg;
1206         char full_snap_name[ZFS_MAX_DATASET_NAME_LEN];
1207         int err;
1208 
1209         dsl_dataset_name(ds, full_snap_name);
1210         if ((strlcat(full_snap_name, "@",
1211             sizeof (full_snap_name)) >= sizeof (full_snap_name)) ||
1212             (strlcat(full_snap_name, cb_arg->snap_name,
1213             sizeof (full_snap_name)) >= sizeof (full_snap_name))) {
1214                 /*
1215                  * If we cannot construct full snapshot name,
1216                  * then the DS doesn't have such snapshot
1217                  */
1218                 return (0);
1219         }
1220 
1221         mutex_enter(&cb_arg->nvl_lock);
1222         err = nvlist_add_boolean(cb_arg->autosnaps, full_snap_name);
1223         mutex_exit(&cb_arg->nvl_lock);
1224 
1225         return (err != 0 ? SET_ERROR(err) : 0);
1226 }
1227 
1228 /* Collect snapshots for destroy */
1229 static int
1230 autosnap_collect_for_destroy_impl(spa_t *spa, const char *root_ds,
1231     const char *snap_name, boolean_t recursive, nvlist_t *nv_auto)
1232 {
1233         dsl_pool_t *dp = spa_get_dsl(spa);
1234         dsl_dataset_t *ds;
1235         int flags = 0;
1236         uint64_t dd_object;
1237         int err;
1238         autosnap_collector_destroy_cb_arg_t cb_arg;
1239 
1240 
1241         dsl_pool_config_enter(dp, FTAG);
1242 
1243         err = dsl_dataset_hold(dp, root_ds, FTAG, &ds);
1244         if (err != 0)
1245                 goto out;
1246 
1247         dd_object = ds->ds_dir->dd_object;
1248         dsl_dataset_rele(ds, FTAG);
1249 
1250         if (recursive)
1251                 flags |= DS_FIND_CHILDREN;
1252 
1253         mutex_init(&cb_arg.nvl_lock, NULL, MUTEX_DEFAULT, NULL);
1254         cb_arg.autosnaps = nv_auto;
1255         cb_arg.snap_name = snap_name;
1256 
1257         err = dmu_objset_find_dp(spa_get_dsl(spa), dd_object,
1258             autosnap_collect_destroy_snapshots_cb, &cb_arg, flags);
1259 
1260 out:
1261         dsl_pool_config_exit(dp, FTAG);
1262 
1263         return (err);
1264 }
1265 
1266 static int
1267 autosnap_collect_for_destroy(spa_t *spa, list_t *autosnaps,
1268     nvlist_t **result)
1269 {
1270         char ds[ZFS_MAX_DATASET_NAME_LEN];
1271         char *snap;
1272         int err = 0;
1273         nvlist_t *nvl;
1274         autosnap_snapshot_t *snapshot;
1275 
1276         ASSERT(!list_is_empty(autosnaps));
1277 
1278         nvl = fnvlist_alloc();
1279         snapshot = list_head(autosnaps);
1280         while (snapshot != NULL) {
1281                 (void) strlcpy(ds, snapshot->name, sizeof (ds));
1282                 snap = strchr(ds, '@');
1283                 VERIFY(snap != NULL);
1284                 *snap++ = '\0';
1285 
1286                 err = autosnap_collect_for_destroy_impl(spa, ds, snap,
1287                     snapshot->recursive, nvl);
1288                 if (err != 0)
1289                         break;
1290 
1291                 snapshot = list_next(autosnaps, snapshot);
1292         }
1293 
1294         if (err != 0)
1295                 fnvlist_free(nvl);
1296         else
1297                 *result = nvl;
1298 
1299         return (err);
1300 }
1301 
1302 void
1303 autosnap_destroyer_thread(void *void_spa)
1304 {
1305         spa_t *spa = void_spa;
1306         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
1307         list_t error_destroy, tmp_list;
1308         boolean_t process_error_queue = B_TRUE;
1309 
1310         list_create(&error_destroy, sizeof (autosnap_snapshot_t),
1311             offsetof(autosnap_snapshot_t, dnode));
1312         list_create(&tmp_list, sizeof (autosnap_snapshot_t),
1313             offsetof(autosnap_snapshot_t, dnode));
1314 
1315         mutex_enter(&autosnap->autosnap_lock);
1316         while (!autosnap->need_stop) {
1317                 nvlist_t *nvl = NULL, *errlist;
1318                 nvpair_t *nvp = NULL;
1319                 int err;
1320 
1321                 if (!list_is_empty(&error_destroy) &&
1322                     (process_error_queue ||
1323                     list_is_empty(&autosnap->autosnap_destroy_queue))) {
1324                         /*
1325                          * error_destroy list contains items that could not
1326                          * be destroyed in batch mode, we will try to
1327                          * destroy them one by one.
1328                          */
1329                         mutex_exit(&autosnap->autosnap_lock);
1330                         list_insert_head(&tmp_list,
1331                             list_remove_tail(&error_destroy));
1332                         process_error_queue = B_FALSE;
1333                 } else if (!list_is_empty(&autosnap->autosnap_destroy_queue)) {
1334                         /*
1335                          * Items from the list will be tried to
1336                          * remove in batch mode
1337                          */
1338                         list_move_tail(&tmp_list,
1339                             &autosnap->autosnap_destroy_queue);
1340                         mutex_exit(&autosnap->autosnap_lock);
1341                         process_error_queue = B_TRUE;
1342                 } else {
1343                         cv_wait(&autosnap->autosnap_cv,
1344                             &autosnap->autosnap_lock);
1345                         continue;
1346                 }
1347 
1348                 err = autosnap_collect_for_destroy(spa, &tmp_list, &nvl);
1349                 if (err != 0) {
1350                         list_move_tail(&error_destroy, &tmp_list);
1351                         mutex_enter(&autosnap->autosnap_lock);
1352                         continue;
1353                 }
1354 
1355 #ifdef _KERNEL
1356                 /*
1357                  * Mounted snapshots (.zfs/snapshots) cannot be destroyed,
1358                  * so we unmount them before pass to the destroyer
1359                  */
1360                 while ((nvp = nvlist_next_nvpair(nvl, nvp)) != NULL)
1361                         zfs_unmount_snap(nvpair_name(nvp));
1362 #endif
1363 
1364                 errlist = fnvlist_alloc();
1365                 err = dsl_destroy_snapshots_nvl(nvl, B_TRUE, errlist);
1366                 fnvlist_free(errlist);
1367                 fnvlist_free(nvl);
1368 
1369                 if (err == 0) {
1370                         autosnap_snapshot_t *snapshot;
1371 
1372                         while ((snapshot = list_remove_head(&tmp_list)) != NULL)
1373                                 kmem_free(snapshot, sizeof (autosnap_snapshot_t));
1374                 } else {
1375                         list_move_tail(&error_destroy, &tmp_list);
1376                 }
1377 
1378                 mutex_enter(&autosnap->autosnap_lock);
1379         }
1380 
1381         if (!list_is_empty(&error_destroy)) {
1382                 list_move_tail(&autosnap->autosnap_destroy_queue,
1383                     &error_destroy);
1384         }
1385 
1386         if (!list_is_empty(&tmp_list)) {
1387                 list_move_tail(&autosnap->autosnap_destroy_queue,
1388                     &tmp_list);
1389         }
1390 
1391         autosnap->destroyer = NULL;
1392         cv_broadcast(&autosnap->autosnap_cv);
1393         mutex_exit(&autosnap->autosnap_lock);
1394 }
1395 
1396 void
1397 autosnap_destroyer_thread_start(spa_t *spa)
1398 {
1399         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
1400 
1401         mutex_enter(&autosnap->autosnap_lock);
1402         autosnap->need_stop = B_FALSE;
1403         cv_broadcast(&autosnap->autosnap_cv);
1404         mutex_exit(&autosnap->autosnap_lock);
1405 
1406         autosnap->destroyer = thread_create(NULL, 32 << 10,
1407             autosnap_destroyer_thread, spa, 0, &p0,
1408             TS_RUN, minclsyspri);
1409 }
1410 
1411 void
1412 autosnap_destroyer_thread_stop(spa_t *spa)
1413 {
1414         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
1415 
1416         if (!autosnap->initialized)
1417                 return;
1418 
1419         mutex_enter(&autosnap->autosnap_lock);
1420         if (autosnap->need_stop || autosnap->destroyer == NULL) {
1421                 mutex_exit(&autosnap->autosnap_lock);
1422                 return;
1423         }
1424 
1425         autosnap->need_stop = B_TRUE;
1426         cv_broadcast(&autosnap->autosnap_cv);
1427         while (autosnap->destroyer != NULL)
1428                 cv_wait(&autosnap->autosnap_cv, &autosnap->autosnap_lock);
1429 
1430         mutex_exit(&autosnap->autosnap_lock);
1431 }
1432 
1433 /* AUTOSNAP-INIT routines */
1434 
1435 void
1436 autosnap_init(spa_t *spa)
1437 {
1438         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
1439         mutex_init(&autosnap->autosnap_lock, NULL, MUTEX_ADAPTIVE, NULL);
1440         cv_init(&autosnap->autosnap_cv, NULL, CV_DEFAULT, NULL);
1441         rw_init(&autosnap->autosnap_rwlock, NULL, RW_DEFAULT, NULL);
1442         list_create(&autosnap->autosnap_zones, sizeof (autosnap_zone_t),
1443             offsetof(autosnap_zone_t, node));
1444         list_create(&autosnap->autosnap_destroy_queue,
1445             sizeof (autosnap_snapshot_t),
1446             offsetof(autosnap_snapshot_t, dnode));
1447         autosnap->need_stop = B_FALSE;
1448 
1449 #ifdef _KERNEL
1450         autosnap_destroyer_thread_start(spa);
1451 #endif
1452 
1453         autosnap->initialized = B_TRUE;
1454 }
1455 
1456 void
1457 autosnap_fini(spa_t *spa)
1458 {
1459         zfs_autosnap_t *autosnap = spa_get_autosnap(spa);
1460         autosnap_zone_t *zone;
1461         autosnap_handler_t *hdl;
1462         autosnap_snapshot_t *snap;
1463 
1464         if (!autosnap->initialized)
1465                 return;
1466 
1467         rw_enter(&autosnap->autosnap_rwlock, RW_WRITER);
1468 
1469         if (autosnap->destroyer)
1470                 autosnap_destroyer_thread_stop(spa);
1471 
1472         autosnap->initialized = B_FALSE;
1473 
1474         while ((zone = list_head(&autosnap->autosnap_zones)) != NULL) {
1475                 while ((hdl = list_head(&zone->listeners)) != NULL)
1476                         autosnap_unregister_handler(hdl);
1477         }
1478 
1479         while ((snap =
1480             list_remove_head(&autosnap->autosnap_destroy_queue)) != NULL)
1481                 kmem_free(snap, sizeof (*snap));
1482         list_destroy(&autosnap->autosnap_destroy_queue);
1483         list_destroy(&autosnap->autosnap_zones);
1484 
1485         rw_exit(&autosnap->autosnap_rwlock);
1486         rw_destroy(&autosnap->autosnap_rwlock);
1487         mutex_destroy(&autosnap->autosnap_lock);
1488         cv_destroy(&autosnap->autosnap_cv);
1489 }
1490 
1491 boolean_t
1492 autosnap_is_autosnap(dsl_dataset_t *ds)
1493 {
1494         char ds_name[ZFS_MAX_DATASET_NAME_LEN];
1495 
1496         ASSERT(ds != NULL && ds->ds_is_snapshot);
1497 
1498         dsl_dataset_name(ds, ds_name);
1499         return (autosnap_check_name(strchr(ds_name, '@')));
1500 }
1501 
1502 /*
1503  * Returns B_TRUE if the given name is the name of an autosnap
1504  * otherwise B_FASLE
1505  *
1506  * the name of an autosnap matches the following regexp:
1507  *
1508  * /^@?AUTOSNAP_PREFIX\d+$/
1509  */
1510 boolean_t
1511 autosnap_check_name(const char *snap_name)
1512 {
1513         size_t len, i = AUTOSNAP_PREFIX_LEN;
1514 
1515         ASSERT(snap_name != NULL);
1516 
1517         if (snap_name[0] == '@')
1518                 snap_name++;
1519 
1520         len = strlen(snap_name);
1521         if (AUTOSNAP_PREFIX_LEN > len ||
1522             strncmp(snap_name, AUTOSNAP_PREFIX,
1523             AUTOSNAP_PREFIX_LEN) != 0)
1524                 return (B_FALSE);
1525 
1526         while (i < len) {
1527                 if (!isdigit(snap_name[i]))
1528                         return (B_FALSE);
1529 
1530                 i++;
1531         }
1532 
1533         return (B_TRUE);
1534 }
1535 
1536 /*
1537  * This function will called upon TX-group commit.
1538  * Here we free allocated structures and notify
1539  * the listeners of the corresponding autosnap-zone
1540  * about error
1541  */
1542 static void
1543 autosnap_commit_cb(void *dcb_data, int error)
1544 {
1545         autosnap_commit_cb_arg_t *cb_arg = dcb_data;
1546         autosnap_zone_t *azone = cb_arg->azone;
1547         zfs_autosnap_t *autosnap = azone->autosnap;
1548         dsl_sync_task_t *dst = cb_arg->dst;
1549         dsl_dataset_snapshot_arg_t *ddsa = dst->dst_arg;
1550 
1551         VERIFY(ddsa->ddsa_autosnap);
1552 
1553         /*
1554          * TX-group was processed, but some error
1555          * occured on check-stage. This means that
1556          * the requested autosnaps were not created
1557          * and we need inform listeners about this
1558          */
1559         if (error == 0 && dst->dst_error != 0) {
1560                 mutex_enter(&autosnap->autosnap_lock);
1561                 autosnap_error_snap(azone, dst->dst_txg, dst->dst_error);
1562                 mutex_exit(&autosnap->autosnap_lock);
1563         }
1564 
1565         spa_close(dst->dst_pool->dp_spa, cb_arg);
1566 
1567         nvlist_free(ddsa->ddsa_snaps);
1568         kmem_free(ddsa, sizeof (dsl_dataset_snapshot_arg_t));
1569         kmem_free(dst, sizeof (dsl_sync_task_t));
1570         kmem_free(cb_arg, sizeof (autosnap_commit_cb_arg_t));
1571 }
1572 
1573 typedef struct {
1574         kmutex_t nvl_lock;
1575         nvlist_t *autosnaps;
1576         const char *snap_name;
1577         dmu_tx_t *tx;
1578 } autosnap_collector_create_cb_arg_t;
1579 
1580 /* ARGSUSED */
1581 static int
1582 autosnap_collect_create_snaps_cb(dsl_pool_t *dp,
1583     dsl_dataset_t *ds, void *arg)
1584 {
1585         autosnap_collector_create_cb_arg_t *cb_arg = arg;
1586         char full_snap_name[ZFS_MAX_DATASET_NAME_LEN];
1587         int err;
1588 
1589 
1590         dsl_dataset_name(ds, full_snap_name);
1591         if ((strlcat(full_snap_name, "@",
1592             sizeof (full_snap_name)) >= sizeof (full_snap_name)) ||
1593             (strlcat(full_snap_name, cb_arg->snap_name,
1594             sizeof (full_snap_name)) >= sizeof (full_snap_name))) {
1595                 return (SET_ERROR(ENAMETOOLONG));
1596         }
1597 
1598         err = dsl_dataset_snapshot_check_impl(ds,
1599             cb_arg->snap_name, cb_arg->tx, B_FALSE, 0, NULL);
1600         if (err != 0)
1601                 return (err);
1602 
1603         mutex_enter(&cb_arg->nvl_lock);
1604         err = nvlist_add_boolean(cb_arg->autosnaps, full_snap_name);
1605         mutex_exit(&cb_arg->nvl_lock);
1606 
1607         return (err != 0 ? SET_ERROR(err) : 0);
1608 }
1609 
1610 /* Collect datasets with a given param and create a snapshoting synctask */
1611 #define AUTOSNAP_COLLECTOR_BUSY_LIMIT (1000)
1612 static int
1613 dsl_pool_collect_ds_for_autosnap(dsl_pool_t *dp, uint64_t txg,
1614     const char *root_ds, const char *snap_name, boolean_t recursive,
1615     dmu_tx_t *tx, dsl_sync_task_t **dst_res)
1616 {
1617         spa_t *spa = dp->dp_spa;
1618         dsl_dataset_t *ds;
1619         int flags = 0;
1620         uint64_t dd_object;
1621         int err;
1622         autosnap_collector_create_cb_arg_t cb_arg;
1623         int busy_counter = 0;
1624 
1625 
1626         err = dsl_dataset_hold(dp, root_ds, FTAG, &ds);
1627         if (err != 0)
1628                 return (err);
1629 
1630         dd_object = ds->ds_dir->dd_object;
1631         dsl_dataset_rele(ds, FTAG);
1632 
1633         if (recursive)
1634                 flags |= DS_FIND_CHILDREN;
1635 
1636         mutex_init(&cb_arg.nvl_lock, NULL, MUTEX_DEFAULT, NULL);
1637         cb_arg.snap_name = snap_name;
1638         cb_arg.tx = tx;
1639 
1640         for (;;) {
1641                 cb_arg.autosnaps = fnvlist_alloc();
1642                 err = dmu_objset_find_dp(spa_get_dsl(spa), dd_object,
1643                     autosnap_collect_create_snaps_cb, &cb_arg, flags);
1644                 if (err == 0 || err != EBUSY ||
1645                     busy_counter++ >= AUTOSNAP_COLLECTOR_BUSY_LIMIT)
1646                         break;
1647 
1648                 delay(NSEC_TO_TICK(100));
1649                 fnvlist_free(cb_arg.autosnaps);
1650         }
1651 
1652         if (err == 0) {
1653                 dsl_sync_task_t *dst =
1654                     kmem_zalloc(sizeof (dsl_sync_task_t), KM_SLEEP);
1655                 dsl_dataset_snapshot_arg_t *ddsa =
1656                     kmem_zalloc(sizeof (dsl_dataset_snapshot_arg_t), KM_SLEEP);
1657                 ddsa->ddsa_autosnap = B_TRUE;
1658                 ddsa->ddsa_snaps = cb_arg.autosnaps;
1659                 ddsa->ddsa_cr = CRED();
1660                 dst->dst_pool = dp;
1661                 dst->dst_txg = txg;
1662                 dst->dst_space = 3 << DST_AVG_BLKSHIFT;
1663                 dst->dst_checkfunc = dsl_dataset_snapshot_check;
1664                 dst->dst_syncfunc = dsl_dataset_snapshot_sync;
1665                 dst->dst_arg = ddsa;
1666                 dst->dst_error = 0;
1667                 dst->dst_nowaiter = B_FALSE;
1668                 VERIFY(txg_list_add_tail(&dp->dp_sync_tasks,
1669                     dst, dst->dst_txg));
1670                 *dst_res = dst;
1671         } else {
1672                 fnvlist_free(cb_arg.autosnaps);
1673         }
1674 
1675         return (err);
1676 }
1677 
1678 /*
1679  * This function is called from dsl_pool_sync() during
1680  * the walking autosnap-zone that have confirmed the creation
1681  * of autosnapshot.
1682  * Here we try to create autosnap for the given autosnap-zone
1683  * and notify the listeners of the zone in case of an error
1684  */
1685 void
1686 autosnap_create_snapshot(autosnap_zone_t *azone, char *snap,
1687     dsl_pool_t *dp, uint64_t txg, dmu_tx_t *tx)
1688 {
1689         int err;
1690         boolean_t recurs;
1691         dsl_sync_task_t *dst = NULL;
1692 
1693         ASSERT(MUTEX_HELD(&azone->autosnap->autosnap_lock));
1694 
1695         ASSERT(!azone->created);
1696 
1697         recurs = !!(azone->flags & AUTOSNAP_RECURSIVE);
1698         err = dsl_pool_collect_ds_for_autosnap(dp, txg,
1699             azone->dataset, snap, recurs, tx, &dst);
1700         if (err == 0) {
1701                 autosnap_commit_cb_arg_t *cb_arg;
1702 
1703                 azone->created = B_TRUE;
1704                 azone->delayed = B_FALSE;
1705                 azone->dirty = B_FALSE;
1706 
1707                 /*
1708                  * Autosnap service works asynchronously, so to free
1709                  * allocated memory and delivery sync-task errors we register
1710                  * TX-callback that will be called after sync of the whole
1711                  * TX-group
1712                  */
1713                 cb_arg = kmem_alloc(sizeof (autosnap_commit_cb_arg_t),
1714                     KM_SLEEP);
1715                 cb_arg->azone = azone;
1716                 cb_arg->dst = dst;
1717                 dmu_tx_callback_register(tx, autosnap_commit_cb, cb_arg);
1718 
1719                 /*
1720                  * To avoid early spa_fini increase spa_refcount,
1721                  * because TX-commit callbacks are executed asynchronously.
1722                  */
1723                 spa_open_ref(dp->dp_spa, cb_arg);
1724         } else {
1725                 autosnap_error_snap(azone, txg, err);
1726         }
1727 }
1728 
1729 /*
1730  * This function is called from dsl_dataset_snapshot_check() before
1731  * any other checks.
1732  *
1733  * It is possible to destroy datasets and attempt to create recursive
1734  * autosnapshots for the destroyed datasets in the same TXG. In such cases
1735  * autosnap sync-task will fail. To avoid this, the function puts a hold
1736  * on the datasets used for autosnapshots. The datasets names to be held
1737  * are derived from the nvlist of autosnapshots passed into the function.
1738  * If the hold fails due to ENOENT, the corresponding nvpair is removed
1739  * from the nvlist.
1740  */
1741 void
1742 autosnap_invalidate_list(dsl_pool_t *dp, nvlist_t *snapshots)
1743 {
1744         nvpair_t *pair, *prev;
1745         int rc;
1746 
1747         pair = nvlist_next_nvpair(snapshots, NULL);
1748         while (pair != NULL) {
1749                 dsl_dataset_t *ds = NULL;
1750                 char *nvp_name, *atp;
1751                 char dsname[ZFS_MAX_DATASET_NAME_LEN];
1752 
1753                 nvp_name = nvpair_name(pair);
1754                 atp = strchr(nvp_name, '@');
1755                 prev = pair;
1756                 pair = nvlist_next_nvpair(snapshots, pair);
1757 
1758                 if (atp == NULL || (atp - nvp_name) >= sizeof (dsname))
1759                         continue;
1760 
1761                 (void) strlcpy(dsname, nvp_name, atp - nvp_name + 1);
1762                 rc = dsl_dataset_hold(dp, dsname, FTAG, &ds);
1763                 if (rc == 0)
1764                         dsl_dataset_rele(ds, FTAG);
1765                 else if (rc == ENOENT)
1766                         fnvlist_remove_nvpair(snapshots, prev);
1767         }
1768 }