]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/librbd/test_librbd.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / test / librbd / test_librbd.cc
index 7937dd95d6643ebdfdf492b44bb0e97f6f1f7b8c..4dbd2e2442a8d979d0afcd21489af55e4e4fcb5c 100644 (file)
@@ -19,6 +19,8 @@
 #include "include/rbd/librbd.hpp"
 #include "include/event_type.h"
 #include "include/err.h"
+#include "common/ceph_mutex.h"
+#include "json_spirit/json_spirit.h"
 
 #include "gtest/gtest.h"
 
@@ -177,6 +179,314 @@ static int create_image_pp(librbd::RBD &rbd,
   }
 }
 
+
+
+void simple_write_cb(rbd_completion_t cb, void *arg)
+{
+  printf("write completion cb called!\n");
+}
+
+void simple_read_cb(rbd_completion_t cb, void *arg)
+{
+  printf("read completion cb called!\n");
+}
+
+void aio_write_test_data_and_poll(rbd_image_t image, int fd, const char *test_data,
+                                  uint64_t off, size_t len, uint32_t iohint, bool *passed)
+{
+  rbd_completion_t comp;
+  uint64_t data = 0x123;
+  rbd_aio_create_completion((void*)&data, (rbd_callback_t) simple_write_cb, &comp);
+  printf("created completion\n");
+  printf("started write\n");
+  if (iohint)
+    rbd_aio_write2(image, off, len, test_data, comp, iohint);
+  else
+    rbd_aio_write(image, off, len, test_data, comp);
+
+  struct pollfd pfd;
+  pfd.fd = fd;
+  pfd.events = POLLIN;
+
+  ASSERT_EQ(1, poll(&pfd, 1, -1));
+  ASSERT_TRUE(pfd.revents & POLLIN);
+
+  rbd_completion_t comps[1];
+  ASSERT_EQ(1, rbd_poll_io_events(image, comps, 1));
+  uint64_t count;
+  ASSERT_EQ(static_cast<ssize_t>(sizeof(count)),
+            read(fd, &count, sizeof(count)));
+  int r = rbd_aio_get_return_value(comps[0]);
+  ASSERT_TRUE(rbd_aio_is_complete(comps[0]));
+  ASSERT_TRUE(*(uint64_t*)rbd_aio_get_arg(comps[0]) == data);
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(0, r);
+  printf("finished write\n");
+  rbd_aio_release(comps[0]);
+  *passed = true;
+}
+
+void aio_write_test_data(rbd_image_t image, const char *test_data, uint64_t off, size_t len, uint32_t iohint, bool *passed)
+{
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
+  printf("created completion\n");
+  if (iohint)
+    rbd_aio_write2(image, off, len, test_data, comp, iohint);
+  else
+    rbd_aio_write(image, off, len, test_data, comp);
+  printf("started write\n");
+  rbd_aio_wait_for_complete(comp);
+  int r = rbd_aio_get_return_value(comp);
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(0, r);
+  printf("finished write\n");
+  rbd_aio_release(comp);
+  *passed = true;
+}
+
+void write_test_data(rbd_image_t image, const char *test_data, uint64_t off, size_t len, uint32_t iohint, bool *passed)
+{
+  ssize_t written;
+  if (iohint)
+    written = rbd_write2(image, off, len, test_data, iohint);
+  else
+    written = rbd_write(image, off, len, test_data);
+  printf("wrote: %d\n", (int) written);
+  ASSERT_EQ(len, static_cast<size_t>(written));
+  *passed = true;
+}
+
+void aio_discard_test_data(rbd_image_t image, uint64_t off, uint64_t len, bool *passed)
+{
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
+  rbd_aio_discard(image, off, len, comp);
+  rbd_aio_wait_for_complete(comp);
+  int r = rbd_aio_get_return_value(comp);
+  ASSERT_EQ(0, r);
+  printf("aio discard: %d~%d = %d\n", (int)off, (int)len, (int)r);
+  rbd_aio_release(comp);
+  *passed = true;
+}
+
+void discard_test_data(rbd_image_t image, uint64_t off, size_t len, bool *passed)
+{
+  ssize_t written;
+  written = rbd_discard(image, off, len);
+  printf("discard: %d~%d = %d\n", (int)off, (int)len, (int)written);
+  ASSERT_EQ(len, static_cast<size_t>(written));
+  *passed = true;
+}
+
+void aio_read_test_data_and_poll(rbd_image_t image, int fd, const char *expected,
+                                 uint64_t off, size_t len, uint32_t iohint, bool *passed)
+{
+  rbd_completion_t comp;
+  char *result = (char *)malloc(len + 1);
+
+  ASSERT_NE(static_cast<char *>(NULL), result);
+  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
+  printf("created completion\n");
+  printf("started read\n");
+  if (iohint)
+    rbd_aio_read2(image, off, len, result, comp, iohint);
+  else
+    rbd_aio_read(image, off, len, result, comp);
+
+  struct pollfd pfd;
+  pfd.fd = fd;
+  pfd.events = POLLIN;
+
+  ASSERT_EQ(1, poll(&pfd, 1, -1));
+  ASSERT_TRUE(pfd.revents & POLLIN);
+
+  rbd_completion_t comps[1];
+  ASSERT_EQ(1, rbd_poll_io_events(image, comps, 1));
+  uint64_t count;
+  ASSERT_EQ(static_cast<ssize_t>(sizeof(count)),
+            read(fd, &count, sizeof(count)));
+
+  int r = rbd_aio_get_return_value(comps[0]);
+  ASSERT_TRUE(rbd_aio_is_complete(comps[0]));
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(len, static_cast<size_t>(r));
+  rbd_aio_release(comps[0]);
+  if (memcmp(result, expected, len)) {
+    printf("read: %s\nexpected: %s\n", result, expected);
+    ASSERT_EQ(0, memcmp(result, expected, len));
+  }
+  free(result);
+  *passed = true;
+}
+
+void aio_read_test_data(rbd_image_t image, const char *expected, uint64_t off, size_t len, uint32_t iohint, bool *passed)
+{
+  rbd_completion_t comp;
+  char *result = (char *)malloc(len + 1);
+
+  ASSERT_NE(static_cast<char *>(NULL), result);
+  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
+  printf("created completion\n");
+  if (iohint)
+    rbd_aio_read2(image, off, len, result, comp, iohint);
+  else
+    rbd_aio_read(image, off, len, result, comp);
+  printf("started read\n");
+  rbd_aio_wait_for_complete(comp);
+  int r = rbd_aio_get_return_value(comp);
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(len, static_cast<size_t>(r));
+  rbd_aio_release(comp);
+  if (memcmp(result, expected, len)) {
+    printf("read: %s\nexpected: %s\n", result, expected);
+    ASSERT_EQ(0, memcmp(result, expected, len));
+  }
+  free(result);
+  *passed = true;
+}
+
+void read_test_data(rbd_image_t image, const char *expected, uint64_t off, size_t len, uint32_t iohint, bool *passed)
+{
+  ssize_t read;
+  char *result = (char *)malloc(len + 1);
+
+  ASSERT_NE(static_cast<char *>(NULL), result);
+  if (iohint)
+    read = rbd_read2(image, off, len, result, iohint);
+  else
+    read = rbd_read(image, off, len, result);
+  printf("read: %d\n", (int) read);
+  ASSERT_EQ(len, static_cast<size_t>(read));
+  result[len] = '\0';
+  if (memcmp(result, expected, len)) {
+    printf("read: %s\nexpected: %s\n", result, expected);
+    ASSERT_EQ(0, memcmp(result, expected, len));
+  }
+  free(result);
+  *passed = true;
+}
+
+void aio_writesame_test_data(rbd_image_t image, const char *test_data, uint64_t off, uint64_t len,
+                             uint64_t data_len, uint32_t iohint, bool *passed)
+{
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
+  printf("created completion\n");
+  int r;
+  r = rbd_aio_writesame(image, off, len, test_data, data_len, comp, iohint);
+  printf("started writesame\n");
+  if (len % data_len) {
+    ASSERT_EQ(-EINVAL, r);
+    printf("expected fail, finished writesame\n");
+    rbd_aio_release(comp);
+    *passed = true;
+    return;
+  }
+
+  rbd_aio_wait_for_complete(comp);
+  r = rbd_aio_get_return_value(comp);
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(0, r);
+  printf("finished writesame\n");
+  rbd_aio_release(comp);
+
+  //verify data
+  printf("to verify the data\n");
+  ssize_t read;
+  char *result = (char *)malloc(data_len+ 1);
+  ASSERT_NE(static_cast<char *>(NULL), result);
+  uint64_t left = len;
+  while (left > 0) {
+    read = rbd_read(image, off, data_len, result);
+    ASSERT_EQ(data_len, static_cast<size_t>(read));
+    result[data_len] = '\0';
+    if (memcmp(result, test_data, data_len)) {
+      printf("read: %d ~ %d\n", (int) off, (int) read);
+      printf("read: %s\nexpected: %s\n", result, test_data);
+      ASSERT_EQ(0, memcmp(result, test_data, data_len));
+    }
+    off += data_len;
+    left -= data_len;
+  }
+  ASSERT_EQ(0U, left);
+  free(result);
+  printf("verified\n");
+
+  *passed = true;
+}
+
+void writesame_test_data(rbd_image_t image, const char *test_data, uint64_t off, uint64_t len,
+                         uint64_t data_len, uint32_t iohint, bool *passed)
+{
+  ssize_t written;
+  written = rbd_writesame(image, off, len, test_data, data_len, iohint);
+  if (len % data_len) {
+    ASSERT_EQ(-EINVAL, written);
+    printf("expected fail, finished writesame\n");
+    *passed = true;
+    return;
+  }
+  ASSERT_EQ(len, static_cast<size_t>(written));
+  printf("wrote: %d\n", (int) written);
+
+  //verify data
+  printf("to verify the data\n");
+  ssize_t read;
+  char *result = (char *)malloc(data_len+ 1);
+  ASSERT_NE(static_cast<char *>(NULL), result);
+  uint64_t left = len;
+  while (left > 0) {
+    read = rbd_read(image, off, data_len, result);
+    ASSERT_EQ(data_len, static_cast<size_t>(read));
+    result[data_len] = '\0';
+    if (memcmp(result, test_data, data_len)) {
+      printf("read: %d ~ %d\n", (int) off, (int) read);
+      printf("read: %s\nexpected: %s\n", result, test_data);
+      ASSERT_EQ(0, memcmp(result, test_data, data_len));
+    }
+    off += data_len;
+    left -= data_len;
+  }
+  ASSERT_EQ(0U, left);
+  free(result);
+  printf("verified\n");
+
+  *passed = true;
+}
+
+void aio_compare_and_write_test_data(rbd_image_t image, const char *cmp_data,
+                                     const char *test_data, uint64_t off,
+                                     size_t len, uint32_t iohint, bool *passed)
+{
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
+  printf("created completion\n");
+
+  uint64_t mismatch_offset;
+  rbd_aio_compare_and_write(image, off, len, cmp_data, test_data, comp, &mismatch_offset, iohint);
+  printf("started aio compare and write\n");
+  rbd_aio_wait_for_complete(comp);
+  int r = rbd_aio_get_return_value(comp);
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(0, r);
+  printf("finished aio compare and write\n");
+  rbd_aio_release(comp);
+  *passed = true;
+}
+
+void compare_and_write_test_data(rbd_image_t image, const char *cmp_data,
+                                 const char *test_data, uint64_t off, size_t len,
+                                 uint64_t *mismatch_off, uint32_t iohint, bool *passed)
+{
+  printf("start compare and write\n");
+  ssize_t written;
+  written = rbd_compare_and_write(image, off, len, cmp_data, test_data, mismatch_off, iohint);
+  printf("compare and  wrote: %d\n", (int) written);
+  ASSERT_EQ(len, static_cast<size_t>(written));
+  *passed = true;
+}
+
 class TestLibRBD : public ::testing::Test {
 public:
 
@@ -197,7 +507,7 @@ public:
     rados_shutdown(_cluster);
     _rados.wait_for_latest_osdmap();
     _pool_names.insert(_pool_names.end(), _unique_pool_names.begin(),
-                      _unique_pool_names.end());
+                       _unique_pool_names.end());
     for (size_t i = 1; i < _pool_names.size(); ++i) {
       ASSERT_EQ(0, _rados.pool_delete(_pool_names[i].c_str()));
     }
@@ -216,6 +526,26 @@ public:
     return value == "true";
   }
 
+  bool is_skip_partial_discard_enabled(rbd_image_t image) {
+    if (is_skip_partial_discard_enabled()) {
+      rbd_flush(image);
+      uint64_t features;
+      EXPECT_EQ(0, rbd_get_features(image, &features));
+      return !(features & RBD_FEATURE_DIRTY_CACHE);
+    }
+    return false;
+  }
+
+  bool is_skip_partial_discard_enabled(librbd::Image& image) {
+    if (is_skip_partial_discard_enabled()) {
+      image.flush();
+      uint64_t features;
+      EXPECT_EQ(0, image.features(&features));
+      return !(features & RBD_FEATURE_DIRTY_CACHE);
+    }
+    return false;
+  }
+
   void validate_object_map(rbd_image_t image, bool *passed) {
     uint64_t flags;
     ASSERT_EQ(0, rbd_get_flags(image, &flags));
@@ -263,6 +593,137 @@ public:
     return pool_name;
   }
 
+  void test_io(rbd_image_t image) {
+    bool skip_discard = is_skip_partial_discard_enabled(image);
+
+    char test_data[TEST_IO_SIZE + 1];
+    char zero_data[TEST_IO_SIZE + 1];
+    char mismatch_data[TEST_IO_SIZE + 1];
+    int i;
+    uint64_t mismatch_offset;
+
+    for (i = 0; i < TEST_IO_SIZE; ++i) {
+      test_data[i] = (char) (rand() % (126 - 33) + 33);
+    }
+    test_data[TEST_IO_SIZE] = '\0';
+    memset(zero_data, 0, sizeof(zero_data));
+    memset(mismatch_data, 9, sizeof(mismatch_data));
+
+    for (i = 0; i < 5; ++i)
+      ASSERT_PASSED(write_test_data, image, test_data, TEST_IO_SIZE * i,
+                    TEST_IO_SIZE, 0);
+
+    for (i = 5; i < 10; ++i)
+      ASSERT_PASSED(aio_write_test_data, image, test_data, TEST_IO_SIZE * i,
+                    TEST_IO_SIZE, 0);
+
+    for (i = 0; i < 5; ++i)
+      ASSERT_PASSED(compare_and_write_test_data, image, test_data, test_data,
+                    TEST_IO_SIZE * i, TEST_IO_SIZE, &mismatch_offset, 0);
+
+    for (i = 5; i < 10; ++i)
+      ASSERT_PASSED(aio_compare_and_write_test_data, image, test_data, test_data,
+                    TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+
+    for (i = 0; i < 5; ++i)
+      ASSERT_PASSED(read_test_data, image, test_data, TEST_IO_SIZE * i,
+                    TEST_IO_SIZE, 0);
+
+    for (i = 5; i < 10; ++i)
+      ASSERT_PASSED(aio_read_test_data, image, test_data, TEST_IO_SIZE * i,
+                    TEST_IO_SIZE, 0);
+
+    // discard 2nd, 4th sections.
+    ASSERT_PASSED(discard_test_data, image, TEST_IO_SIZE, TEST_IO_SIZE);
+    ASSERT_PASSED(aio_discard_test_data, image, TEST_IO_SIZE*3, TEST_IO_SIZE);
+
+    ASSERT_PASSED(read_test_data, image, test_data,  0, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
+                  TEST_IO_SIZE, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*2,
+                  TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
+                  TEST_IO_SIZE*3, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*4,
+                  TEST_IO_SIZE, 0);
+
+    for (i = 0; i < 15; ++i) {
+      if (i % 3 == 2) {
+        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i,
+                      TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i,
+                      TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
+      } else if (i % 3 == 1) {
+        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE + i,
+                      TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE + i,
+                      TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      } else {
+        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i,
+                      TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i,
+                      TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      }
+    }
+    for (i = 0; i < 15; ++i) {
+      if (i % 3 == 2) {
+        ASSERT_PASSED(aio_writesame_test_data, image, test_data,
+                      TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE,
+                      0);
+        ASSERT_PASSED(aio_writesame_test_data, image, zero_data,
+                      TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE,
+                      0);
+      } else if (i % 3 == 1) {
+        ASSERT_PASSED(aio_writesame_test_data, image, test_data,
+                      TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(aio_writesame_test_data, image, zero_data,
+                      TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      } else {
+        ASSERT_PASSED(aio_writesame_test_data, image, test_data,
+                      TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(aio_writesame_test_data, image, zero_data,
+                      TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      }
+    }
+
+    rbd_image_info_t info;
+    rbd_completion_t comp;
+    ASSERT_EQ(0, rbd_stat(image, &info, sizeof(info)));
+    // can't read or write starting past end
+    ASSERT_EQ(-EINVAL, rbd_write(image, info.size, 1, test_data));
+    ASSERT_EQ(-EINVAL, rbd_read(image, info.size, 1, test_data));
+    // reading through end returns amount up to end
+    ASSERT_EQ(10, rbd_read(image, info.size - 10, 100, test_data));
+    // writing through end returns amount up to end
+    ASSERT_EQ(10, rbd_write(image, info.size - 10, 100, test_data));
+
+    rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
+    ASSERT_EQ(0, rbd_aio_write(image, info.size, 1, test_data, comp));
+    ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+    ASSERT_EQ(-EINVAL, rbd_aio_get_return_value(comp));
+    rbd_aio_release(comp);
+
+    rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
+    ASSERT_EQ(0, rbd_aio_read(image, info.size, 1, test_data, comp));
+    ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+    ASSERT_EQ(-EINVAL, rbd_aio_get_return_value(comp));
+    rbd_aio_release(comp);
+
+    ASSERT_PASSED(write_test_data, image, zero_data, 0, TEST_IO_SIZE,
+                  LIBRADOS_OP_FLAG_FADVISE_NOCACHE);
+    ASSERT_EQ(-EILSEQ, rbd_compare_and_write(image, 0, TEST_IO_SIZE,
+              mismatch_data, mismatch_data, &mismatch_offset, 0));
+    ASSERT_EQ(0U, mismatch_offset);
+    rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
+    ASSERT_EQ(0, rbd_aio_compare_and_write(image, 0, TEST_IO_SIZE, mismatch_data,
+              mismatch_data, comp, &mismatch_offset, 0));
+    ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+    ASSERT_EQ(0U, mismatch_offset);
+    rbd_aio_release(comp);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
   static std::vector<std::string> _pool_names;
   static std::vector<std::string> _unique_pool_names;
   static rados_t _cluster;
@@ -710,8 +1171,8 @@ TEST_F(TestLibRBD, UpdateWatchAndResize)
   uint64_t size = 2 << 20;
   struct Watcher {
     rbd_image_t &m_image;
-    mutex m_lock;
-    condition_variable m_cond;
+    std::mutex m_lock;
+    std::condition_variable m_cond;
     size_t m_size = 0;
     static void cb(void *arg) {
       Watcher *watcher = static_cast<Watcher *>(arg);
@@ -721,12 +1182,12 @@ TEST_F(TestLibRBD, UpdateWatchAndResize)
     void handle_notify() {
       rbd_image_info_t info;
       ASSERT_EQ(0, rbd_stat(m_image, &info, sizeof(info)));
-      lock_guard<mutex> locker(m_lock);
+      std::lock_guard<std::mutex> locker(m_lock);
       m_size = info.size;
       m_cond.notify_one();
     }
     void wait_for_size(size_t size) {
-      unique_lock<mutex> locker(m_lock);
+      std::unique_lock<std::mutex> locker(m_lock);
       ASSERT_TRUE(m_cond.wait_for(locker, seconds(5),
                                  [size, this] {
                                    return this->m_size == size;}));
@@ -768,19 +1229,19 @@ TEST_F(TestLibRBD, UpdateWatchAndResizePP)
       void handle_notify() override {
         librbd::image_info_t info;
        ASSERT_EQ(0, m_image.stat(info, sizeof(info)));
-        lock_guard<mutex> locker(m_lock);
+        std::lock_guard<std::mutex> locker(m_lock);
         m_size = info.size;
        m_cond.notify_one();
       }
       void wait_for_size(size_t size) {
-       unique_lock<mutex> locker(m_lock);
+       std::unique_lock<std::mutex> locker(m_lock);
        ASSERT_TRUE(m_cond.wait_for(locker, seconds(5),
                                    [size, this] {
                                      return this->m_size == size;}));
       }
       librbd::Image &m_image;
-      mutex m_lock;
-      condition_variable m_cond;
+      std::mutex m_lock;
+      std::condition_variable m_cond;
       size_t m_size = 0;
     } watcher(image);
     uint64_t handle;
@@ -898,144 +1359,16 @@ int test_ls_pp(librbd::RBD& rbd, librados::IoCtx& io_ctx, size_t num_expected, .
     }
     names.erase(listed_name);
   }
-  va_end(ap);
-
-  if (!names.empty()) {
-    ADD_FAILURE() << "Unexpected images discovered";
-    return -EINVAL;
-  }
-  return num;
-}
-
-TEST_F(TestLibRBD, TestCreateLsDeletePP)
-{
-  librados::IoCtx ioctx;
-  ASSERT_EQ(0, _rados.ioctx_create(create_pool(true).c_str(), ioctx));
-
-  {
-    librbd::RBD rbd;
-    librbd::Image image;
-    int order = 0;
-    std::string name = get_temp_image_name();
-    std::string name2 = get_temp_image_name();
-    uint64_t size = 2 << 20;
-
-    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
-    ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name.c_str()));
-    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name2.c_str(), size, &order));
-    ASSERT_EQ(2, test_ls_pp(rbd, ioctx, 2, name.c_str(), name2.c_str()));
-    ASSERT_EQ(0, rbd.remove(ioctx, name.c_str()));
-    ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name2.c_str()));
-  }
-
-  ioctx.close();
-}
-
-
-static int print_progress_percent(uint64_t offset, uint64_t src_size,
-                                    void *data)
-{
-  float percent = ((float)offset * 100) / src_size;
-  printf("%3.2f%% done\n", percent);
-  return 0; 
-}
-
-TEST_F(TestLibRBD, TestCopy)
-{
-  rados_ioctx_t ioctx;
-  rados_ioctx_create(_cluster, create_pool(true).c_str(), &ioctx);
-
-  rbd_image_t image;
-  rbd_image_t image2;
-  rbd_image_t image3;
-  int order = 0;
-  std::string name = get_temp_image_name();
-  std::string name2 = get_temp_image_name();
-  std::string name3 = get_temp_image_name();
-
-  uint64_t size = 2 << 20;
-
-  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
-  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
-  ASSERT_EQ(1, test_ls(ioctx, 1, name.c_str()));
-
-  size_t sum_key_len = 0;
-  size_t sum_value_len = 0;
-  std::string key;
-  std::string val;
-  for (int i = 1; i <= 70; i++) {
-    key = "key" + stringify(i);
-    val = "value" + stringify(i);
-    ASSERT_EQ(0, rbd_metadata_set(image, key.c_str(), val.c_str()));
-
-    sum_key_len += (key.size() + 1);
-    sum_value_len += (val.size() + 1);
-  }
-
-  char keys[1024];
-  char vals[1024];
-  size_t keys_len = sizeof(keys);
-  size_t vals_len = sizeof(vals);
-
-  char value[1024];
-  size_t value_len = sizeof(value);
-
-  ASSERT_EQ(0, rbd_copy(image, ioctx, name2.c_str()));
-  ASSERT_EQ(2, test_ls(ioctx, 2, name.c_str(), name2.c_str()));
-  ASSERT_EQ(0, rbd_open(ioctx, name2.c_str(), &image2, NULL));
-  ASSERT_EQ(0, rbd_metadata_list(image2, "", 70, keys, &keys_len, vals,
-                                 &vals_len));
-  ASSERT_EQ(keys_len, sum_key_len);
-  ASSERT_EQ(vals_len, sum_value_len);
-
-  for (int i = 1; i <= 70; i++) {
-    key = "key" + stringify(i);
-    val = "value" + stringify(i);
-    ASSERT_EQ(0, rbd_metadata_get(image2, key.c_str(), value, &value_len));
-    ASSERT_STREQ(val.c_str(), value);
-
-    value_len = sizeof(value);
-  }
-
-  ASSERT_EQ(0, rbd_copy_with_progress(image, ioctx, name3.c_str(),
-                                     print_progress_percent, NULL));
-  ASSERT_EQ(3, test_ls(ioctx, 3, name.c_str(), name2.c_str(), name3.c_str()));
-
-  keys_len = sizeof(keys);
-  vals_len = sizeof(vals);
-  ASSERT_EQ(0, rbd_open(ioctx, name3.c_str(), &image3, NULL));
-  ASSERT_EQ(0, rbd_metadata_list(image3, "", 70, keys, &keys_len, vals,
-                                 &vals_len));
-  ASSERT_EQ(keys_len, sum_key_len);
-  ASSERT_EQ(vals_len, sum_value_len);
-
-  for (int i = 1; i <= 70; i++) {
-    key = "key" + stringify(i);
-    val = "value" + stringify(i);
-    ASSERT_EQ(0, rbd_metadata_get(image3, key.c_str(), value, &value_len));
-    ASSERT_STREQ(val.c_str(), value);
-
-    value_len = sizeof(value);
-  }
-
-  ASSERT_EQ(0, rbd_close(image));
-  ASSERT_EQ(0, rbd_close(image2));
-  ASSERT_EQ(0, rbd_close(image3));
-  rados_ioctx_destroy(ioctx);
-}
+  va_end(ap);
 
-class PrintProgress : public librbd::ProgressContext
-{
-public:
-  int update_progress(uint64_t offset, uint64_t src_size) override
-  {
-    float percent = ((float)offset * 100) / src_size;
-    printf("%3.2f%% done\n", percent);
-    return 0;
+  if (!names.empty()) {
+    ADD_FAILURE() << "Unexpected images discovered";
+    return -EINVAL;
   }
-};
+  return num;
+}
 
-TEST_F(TestLibRBD, TestCopyPP)
+TEST_F(TestLibRBD, TestCreateLsDeletePP)
 {
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(create_pool(true).c_str(), ioctx));
@@ -1043,101 +1376,48 @@ TEST_F(TestLibRBD, TestCopyPP)
   {
     librbd::RBD rbd;
     librbd::Image image;
-    librbd::Image image2;
-    librbd::Image image3;
     int order = 0;
     std::string name = get_temp_image_name();
     std::string name2 = get_temp_image_name();
-    std::string name3 = get_temp_image_name();
     uint64_t size = 2 << 20;
-    PrintProgress pp;
 
     ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
-    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
-
-    std::string key;
-    std::string val;
-    for (int i = 1; i <= 70; i++) {
-      key = "key" + stringify(i);
-      val = "value" + stringify(i);
-      ASSERT_EQ(0, image.metadata_set(key, val));
-    }
-
     ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name.c_str()));
-    ASSERT_EQ(0, image.copy(ioctx, name2.c_str()));
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name2.c_str(), size, &order));
     ASSERT_EQ(2, test_ls_pp(rbd, ioctx, 2, name.c_str(), name2.c_str()));
-    ASSERT_EQ(0, rbd.open(ioctx, image2, name2.c_str(), NULL));
-
-    map<string, bufferlist> pairs;
-    std::string value;
-    ASSERT_EQ(0, image2.metadata_list("", 70, &pairs));
-    ASSERT_EQ(70U, pairs.size());
-
-    for (int i = 1; i <= 70; i++) {
-      key = "key" + stringify(i);
-      val = "value" + stringify(i);
-      ASSERT_EQ(0, image2.metadata_get(key.c_str(), &value));
-      ASSERT_STREQ(val.c_str(), value.c_str());
-    }
-
-    ASSERT_EQ(0, image.copy_with_progress(ioctx, name3.c_str(), pp));
-    ASSERT_EQ(3, test_ls_pp(rbd, ioctx, 3, name.c_str(), name2.c_str(),
-                           name3.c_str()));
-    ASSERT_EQ(0, rbd.open(ioctx, image3, name3.c_str(), NULL));
-
-    pairs.clear();
-    ASSERT_EQ(0, image3.metadata_list("", 70, &pairs));
-    ASSERT_EQ(70U, pairs.size());
-
-    for (int i = 1; i <= 70; i++) {
-      key = "key" + stringify(i);
-      val = "value" + stringify(i);
-      ASSERT_EQ(0, image3.metadata_get(key.c_str(), &value));
-      ASSERT_STREQ(val.c_str(), value.c_str());
-    }
+    ASSERT_EQ(0, rbd.remove(ioctx, name.c_str()));
+    ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name2.c_str()));
   }
 
   ioctx.close();
 }
 
-TEST_F(TestLibRBD, TestDeepCopy)
+
+static int print_progress_percent(uint64_t offset, uint64_t src_size,
+                                    void *data)
 {
-  REQUIRE_FORMAT_V2();
-  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  float percent = ((float)offset * 100) / src_size;
+  printf("%3.2f%% done\n", percent);
+  return 0; 
+}
 
+TEST_F(TestLibRBD, TestCopy)
+{
   rados_ioctx_t ioctx;
   rados_ioctx_create(_cluster, create_pool(true).c_str(), &ioctx);
-  BOOST_SCOPE_EXIT_ALL( (&ioctx) ) {
-    rados_ioctx_destroy(ioctx);
-  };
 
   rbd_image_t image;
   rbd_image_t image2;
   rbd_image_t image3;
-  rbd_image_t image4;
-  rbd_image_t image5;
-  rbd_image_t image6;
   int order = 0;
   std::string name = get_temp_image_name();
   std::string name2 = get_temp_image_name();
   std::string name3 = get_temp_image_name();
-  std::string name4 = get_temp_image_name();
-  std::string name5 = get_temp_image_name();
-  std::string name6 = get_temp_image_name();
 
   uint64_t size = 2 << 20;
 
-  rbd_image_options_t opts;
-  rbd_image_options_create(&opts);
-  BOOST_SCOPE_EXIT_ALL( (&opts) ) {
-    rbd_image_options_destroy(opts);
-  };
-
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
-  BOOST_SCOPE_EXIT_ALL( (&image) ) {
-    ASSERT_EQ(0, rbd_close(image));
-  };
   ASSERT_EQ(1, test_ls(ioctx, 1, name.c_str()));
 
   size_t sum_key_len = 0;
@@ -1161,13 +1441,10 @@ TEST_F(TestLibRBD, TestDeepCopy)
   char value[1024];
   size_t value_len = sizeof(value);
 
-  ASSERT_EQ(0, rbd_deep_copy(image, ioctx, name2.c_str(), opts));
+  ASSERT_EQ(0, rbd_copy(image, ioctx, name2.c_str()));
   ASSERT_EQ(2, test_ls(ioctx, 2, name.c_str(), name2.c_str()));
   ASSERT_EQ(0, rbd_open(ioctx, name2.c_str(), &image2, NULL));
-  BOOST_SCOPE_EXIT_ALL( (&image2) ) {
-    ASSERT_EQ(0, rbd_close(image2));
-  };
-  ASSERT_EQ(0, rbd_metadata_list(image2, "", 70, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image2, "key", 70, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len, sum_key_len);
   ASSERT_EQ(vals_len, sum_value_len);
@@ -1181,17 +1458,14 @@ TEST_F(TestLibRBD, TestDeepCopy)
     value_len = sizeof(value);
   }
 
-  ASSERT_EQ(0, rbd_deep_copy_with_progress(image, ioctx, name3.c_str(), opts,
-                                           print_progress_percent, NULL));
+  ASSERT_EQ(0, rbd_copy_with_progress(image, ioctx, name3.c_str(),
+                                     print_progress_percent, NULL));
   ASSERT_EQ(3, test_ls(ioctx, 3, name.c_str(), name2.c_str(), name3.c_str()));
 
   keys_len = sizeof(keys);
   vals_len = sizeof(vals);
   ASSERT_EQ(0, rbd_open(ioctx, name3.c_str(), &image3, NULL));
-  BOOST_SCOPE_EXIT_ALL( (&image3) ) {
-    ASSERT_EQ(0, rbd_close(image3));
-  };
-  ASSERT_EQ(0, rbd_metadata_list(image3, "", 70, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image3, "key", 70, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len, sum_key_len);
   ASSERT_EQ(vals_len, sum_value_len);
@@ -1205,72 +1479,25 @@ TEST_F(TestLibRBD, TestDeepCopy)
     value_len = sizeof(value);
   }
 
-  ASSERT_EQ(0, rbd_snap_create(image, "deep_snap"));
   ASSERT_EQ(0, rbd_close(image));
-  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, "deep_snap"));
-  ASSERT_EQ(0, rbd_snap_protect(image, "deep_snap"));
-  ASSERT_EQ(0, rbd_clone3(ioctx, name.c_str(), "deep_snap", ioctx,
-            name4.c_str(), opts));
-
-  ASSERT_EQ(4, test_ls(ioctx, 4, name.c_str(), name2.c_str(), name3.c_str(),
-            name4.c_str()));
-  ASSERT_EQ(0, rbd_open(ioctx, name4.c_str(), &image4, NULL));
-  BOOST_SCOPE_EXIT_ALL( (&image4) ) {
-    ASSERT_EQ(0, rbd_close(image4));
-  };
-  ASSERT_EQ(0, rbd_snap_create(image4, "deep_snap"));
-
-  ASSERT_EQ(0, rbd_deep_copy(image4, ioctx, name5.c_str(), opts));
-  ASSERT_EQ(5, test_ls(ioctx, 5, name.c_str(), name2.c_str(), name3.c_str(),
-            name4.c_str(), name5.c_str()));
-  ASSERT_EQ(0, rbd_open(ioctx, name5.c_str(), &image5, NULL));
-  BOOST_SCOPE_EXIT_ALL( (&image5) ) {
-    ASSERT_EQ(0, rbd_close(image5));
-  };
-  ASSERT_EQ(0, rbd_metadata_list(image5, "", 70, keys, &keys_len, vals,
-                                 &vals_len));
-  ASSERT_EQ(keys_len, sum_key_len);
-  ASSERT_EQ(vals_len, sum_value_len);
-
-  for (int i = 1; i <= 70; i++) {
-    key = "key" + stringify(i);
-    val = "value" + stringify(i);
-    ASSERT_EQ(0, rbd_metadata_get(image5, key.c_str(), value, &value_len));
-    ASSERT_STREQ(val.c_str(), value);
-
-    value_len = sizeof(value);
-  }
-
-  ASSERT_EQ(0, rbd_deep_copy_with_progress(image4, ioctx, name6.c_str(), opts,
-                                           print_progress_percent, NULL));
-  ASSERT_EQ(6, test_ls(ioctx, 6, name.c_str(), name2.c_str(), name3.c_str(),
-            name4.c_str(), name5.c_str(), name6.c_str()));
-
-  keys_len = sizeof(keys);
-  vals_len = sizeof(vals);
-  ASSERT_EQ(0, rbd_open(ioctx, name6.c_str(), &image6, NULL));
-  BOOST_SCOPE_EXIT_ALL( (&image6) ) {
-    ASSERT_EQ(0, rbd_close(image6));
-  };
-  ASSERT_EQ(0, rbd_metadata_list(image6, "", 70, keys, &keys_len, vals,
-                                 &vals_len));
-  ASSERT_EQ(keys_len, sum_key_len);
-  ASSERT_EQ(vals_len, sum_value_len);
-
-  for (int i = 1; i <= 70; i++) {
-    key = "key" + stringify(i);
-    val = "value" + stringify(i);
-    ASSERT_EQ(0, rbd_metadata_get(image6, key.c_str(), value, &value_len));
-    ASSERT_STREQ(val.c_str(), value);
-
-    value_len = sizeof(value);
-  }
+  ASSERT_EQ(0, rbd_close(image2));
+  ASSERT_EQ(0, rbd_close(image3));
+  rados_ioctx_destroy(ioctx);
 }
 
-TEST_F(TestLibRBD, TestDeepCopyPP)
+class PrintProgress : public librbd::ProgressContext
 {
-  REQUIRE_FORMAT_V2();
+public:
+  int update_progress(uint64_t offset, uint64_t src_size) override
+  {
+    float percent = ((float)offset * 100) / src_size;
+    printf("%3.2f%% done\n", percent);
+    return 0;
+  }
+};
 
+TEST_F(TestLibRBD, TestCopyPP)
+{
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(create_pool(true).c_str(), ioctx));
 
@@ -1284,7 +1511,6 @@ TEST_F(TestLibRBD, TestDeepCopyPP)
     std::string name2 = get_temp_image_name();
     std::string name3 = get_temp_image_name();
     uint64_t size = 2 << 20;
-    librbd::ImageOptions opts;
     PrintProgress pp;
 
     ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
@@ -1299,7 +1525,7 @@ TEST_F(TestLibRBD, TestDeepCopyPP)
     }
 
     ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name.c_str()));
-    ASSERT_EQ(0, image.deep_copy(ioctx, name2.c_str(), opts));
+    ASSERT_EQ(0, image.copy(ioctx, name2.c_str()));
     ASSERT_EQ(2, test_ls_pp(rbd, ioctx, 2, name.c_str(), name2.c_str()));
     ASSERT_EQ(0, rbd.open(ioctx, image2, name2.c_str(), NULL));
 
@@ -1311,637 +1537,563 @@ TEST_F(TestLibRBD, TestDeepCopyPP)
     for (int i = 1; i <= 70; i++) {
       key = "key" + stringify(i);
       val = "value" + stringify(i);
-      ASSERT_EQ(0, image2.metadata_get(key.c_str(), &value));
-      ASSERT_STREQ(val.c_str(), value.c_str());
-    }
-
-    ASSERT_EQ(0, image.deep_copy_with_progress(ioctx, name3.c_str(), opts, pp));
-    ASSERT_EQ(3, test_ls_pp(rbd, ioctx, 3, name.c_str(), name2.c_str(),
-                            name3.c_str()));
-    ASSERT_EQ(0, rbd.open(ioctx, image3, name3.c_str(), NULL));
-
-    pairs.clear();
-    ASSERT_EQ(0, image3.metadata_list("", 70, &pairs));
-    ASSERT_EQ(70U, pairs.size());
-
-    for (int i = 1; i <= 70; i++) {
-      key = "key" + stringify(i);
-      val = "value" + stringify(i);
-      ASSERT_EQ(0, image3.metadata_get(key.c_str(), &value));
-      ASSERT_STREQ(val.c_str(), value.c_str());
-    }
-  }
-
-  ioctx.close();
-}
-
-int test_ls_snaps(rbd_image_t image, int num_expected, ...)
-{
-  int num_snaps, i, j, max_size = 10;
-  va_list ap;
-  rbd_snap_info_t snaps[max_size];
-  num_snaps = rbd_snap_list(image, snaps, &max_size);
-  printf("num snaps is: %d\nexpected: %d\n", num_snaps, num_expected);
-
-  for (i = 0; i < num_snaps; i++) {
-    printf("snap: %s\n", snaps[i].name);
-  }
-
-  va_start(ap, num_expected);
-  for (i = num_expected; i > 0; i--) {
-    char *expected = va_arg(ap, char *);
-    uint64_t expected_size = va_arg(ap, uint64_t);
-    bool found = false;
-    for (j = 0; j < num_snaps; j++) {
-      if (snaps[j].name == NULL)
-       continue;
-      if (strcmp(snaps[j].name, expected) == 0) {
-       printf("found %s with size %llu\n", snaps[j].name, (unsigned long long) snaps[j].size);
-       EXPECT_EQ(expected_size, snaps[j].size);
-       free((void *) snaps[j].name);
-       snaps[j].name = NULL;
-       found = true;
-       break;
-      }
-    }
-    EXPECT_TRUE(found);
-  }
-  va_end(ap);
-
-  for (i = 0; i < num_snaps; i++) {
-    EXPECT_EQ((const char *)0, snaps[i].name);
-  }
-
-  return num_snaps;
-}
-
-TEST_F(TestLibRBD, TestCreateLsDeleteSnap)
-{
-  rados_ioctx_t ioctx;
-  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+      ASSERT_EQ(0, image2.metadata_get(key.c_str(), &value));
+      ASSERT_STREQ(val.c_str(), value.c_str());
+    }
 
-  rbd_image_t image;
-  int order = 0;
-  std::string name = get_temp_image_name();
-  uint64_t size = 2 << 20;
-  uint64_t size2 = 4 << 20;
-  
-  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
-  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+    ASSERT_EQ(0, image.copy_with_progress(ioctx, name3.c_str(), pp));
+    ASSERT_EQ(3, test_ls_pp(rbd, ioctx, 3, name.c_str(), name2.c_str(),
+                           name3.c_str()));
+    ASSERT_EQ(0, rbd.open(ioctx, image3, name3.c_str(), NULL));
 
-  ASSERT_EQ(0, rbd_snap_create(image, "snap1"));
-  ASSERT_EQ(1, test_ls_snaps(image, 1, "snap1", size));
-  ASSERT_EQ(0, rbd_resize(image, size2));
-  ASSERT_EQ(0, rbd_snap_create(image, "snap2"));
-  ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1", size, "snap2", size2));
-  ASSERT_EQ(0, rbd_snap_remove(image, "snap1"));
-  ASSERT_EQ(1, test_ls_snaps(image, 1, "snap2", size2));
-  ASSERT_EQ(0, rbd_snap_remove(image, "snap2"));
-  ASSERT_EQ(0, test_ls_snaps(image, 0));
-  
-  ASSERT_EQ(0, rbd_close(image));
+    pairs.clear();
+    ASSERT_EQ(0, image3.metadata_list("", 70, &pairs));
+    ASSERT_EQ(70U, pairs.size());
 
-  rados_ioctx_destroy(ioctx);
-}
+    for (int i = 1; i <= 70; i++) {
+      key = "key" + stringify(i);
+      val = "value" + stringify(i);
+      ASSERT_EQ(0, image3.metadata_get(key.c_str(), &value));
+      ASSERT_STREQ(val.c_str(), value.c_str());
+    }
+  }
 
-int test_get_snapshot_timestamp(rbd_image_t image, uint64_t snap_id)
-{
-  struct timespec timestamp;
-  EXPECT_EQ(0, rbd_snap_get_timestamp(image, snap_id, &timestamp));
-  EXPECT_LT(0, timestamp.tv_sec);
-  return 0;
+  ioctx.close();
 }
 
-TEST_F(TestLibRBD, TestGetSnapShotTimeStamp)
+TEST_F(TestLibRBD, TestDeepCopy)
 {
   REQUIRE_FORMAT_V2();
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
 
   rados_ioctx_t ioctx;
-  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+  rados_ioctx_create(_cluster, create_pool(true).c_str(), &ioctx);
+  BOOST_SCOPE_EXIT_ALL( (&ioctx) ) {
+    rados_ioctx_destroy(ioctx);
+  };
 
   rbd_image_t image;
+  rbd_image_t image2;
+  rbd_image_t image3;
+  rbd_image_t image4;
+  rbd_image_t image5;
+  rbd_image_t image6;
   int order = 0;
   std::string name = get_temp_image_name();
+  std::string name2 = get_temp_image_name();
+  std::string name3 = get_temp_image_name();
+  std::string name4 = get_temp_image_name();
+  std::string name5 = get_temp_image_name();
+  std::string name6 = get_temp_image_name();
+
   uint64_t size = 2 << 20;
-  int num_snaps, max_size = 10;
-  rbd_snap_info_t snaps[max_size];
+
+  rbd_image_options_t opts;
+  rbd_image_options_create(&opts);
+  BOOST_SCOPE_EXIT_ALL( (&opts) ) {
+    rbd_image_options_destroy(opts);
+  };
 
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+  BOOST_SCOPE_EXIT_ALL( (&image) ) {
+    ASSERT_EQ(0, rbd_close(image));
+  };
+  ASSERT_EQ(1, test_ls(ioctx, 1, name.c_str()));
 
-  ASSERT_EQ(0, rbd_snap_create(image, "snap1"));
-  num_snaps = rbd_snap_list(image, snaps, &max_size);
-  ASSERT_EQ(1, num_snaps);
-  ASSERT_EQ(0, test_get_snapshot_timestamp(image, snaps[0].id));
-  free((void *)snaps[0].name);
+  size_t sum_key_len = 0;
+  size_t sum_value_len = 0;
+  std::string key;
+  std::string val;
+  for (int i = 1; i <= 70; i++) {
+    key = "key" + stringify(i);
+    val = "value" + stringify(i);
+    ASSERT_EQ(0, rbd_metadata_set(image, key.c_str(), val.c_str()));
 
-  ASSERT_EQ(0, rbd_snap_create(image, "snap2"));
-  num_snaps = rbd_snap_list(image, snaps, &max_size);
-  ASSERT_EQ(2, num_snaps);
-  ASSERT_EQ(0, test_get_snapshot_timestamp(image, snaps[0].id));
-  ASSERT_EQ(0, test_get_snapshot_timestamp(image, snaps[1].id));
-  free((void *)snaps[0].name);
-  free((void *)snaps[1].name);
+    sum_key_len += (key.size() + 1);
+    sum_value_len += (val.size() + 1);
+  }
 
-  ASSERT_EQ(0, rbd_close(image));
+  char keys[1024];
+  char vals[1024];
+  size_t keys_len = sizeof(keys);
+  size_t vals_len = sizeof(vals);
 
-  rados_ioctx_destroy(ioctx);
-}
+  char value[1024];
+  size_t value_len = sizeof(value);
 
+  ASSERT_EQ(0, rbd_deep_copy(image, ioctx, name2.c_str(), opts));
+  ASSERT_EQ(2, test_ls(ioctx, 2, name.c_str(), name2.c_str()));
+  ASSERT_EQ(0, rbd_open(ioctx, name2.c_str(), &image2, NULL));
+  BOOST_SCOPE_EXIT_ALL( (&image2) ) {
+    ASSERT_EQ(0, rbd_close(image2));
+  };
+  ASSERT_EQ(0, rbd_metadata_list(image2, "key", 70, keys, &keys_len, vals,
+                                 &vals_len));
+  ASSERT_EQ(keys_len, sum_key_len);
+  ASSERT_EQ(vals_len, sum_value_len);
 
-int test_ls_snaps(librbd::Image& image, size_t num_expected, ...)
-{
-  int r;
-  size_t i, j;
-  va_list ap;
-  vector<librbd::snap_info_t> snaps;
-  r = image.snap_list(snaps);
-  EXPECT_TRUE(r >= 0);
-  cout << "num snaps is: " << snaps.size() << std::endl
-           << "expected: " << num_expected << std::endl;
+  for (int i = 1; i <= 70; i++) {
+    key = "key" + stringify(i);
+    val = "value" + stringify(i);
+    ASSERT_EQ(0, rbd_metadata_get(image2, key.c_str(), value, &value_len));
+    ASSERT_STREQ(val.c_str(), value);
 
-  for (i = 0; i < snaps.size(); i++) {
-    cout << "snap: " << snaps[i].name << std::endl;
+    value_len = sizeof(value);
   }
 
-  va_start(ap, num_expected);
-  for (i = num_expected; i > 0; i--) {
-    char *expected = va_arg(ap, char *);
-    uint64_t expected_size = va_arg(ap, uint64_t);
-    int found = 0;
-    for (j = 0; j < snaps.size(); j++) {
-      if (snaps[j].name == "")
-       continue;
-      if (strcmp(snaps[j].name.c_str(), expected) == 0) {
-       cout << "found " << snaps[j].name << " with size " << snaps[j].size
-            << std::endl;
-       EXPECT_EQ(expected_size, snaps[j].size);
-       snaps[j].name = "";
-       found = 1;
-       break;
-      }
-    }
-    EXPECT_TRUE(found);
-  }
-  va_end(ap);
+  ASSERT_EQ(0, rbd_deep_copy_with_progress(image, ioctx, name3.c_str(), opts,
+                                           print_progress_percent, NULL));
+  ASSERT_EQ(3, test_ls(ioctx, 3, name.c_str(), name2.c_str(), name3.c_str()));
 
-  for (i = 0; i < snaps.size(); i++) {
-    EXPECT_EQ("", snaps[i].name);
+  keys_len = sizeof(keys);
+  vals_len = sizeof(vals);
+  ASSERT_EQ(0, rbd_open(ioctx, name3.c_str(), &image3, NULL));
+  BOOST_SCOPE_EXIT_ALL( (&image3) ) {
+    ASSERT_EQ(0, rbd_close(image3));
+  };
+  ASSERT_EQ(0, rbd_metadata_list(image3, "key", 70, keys, &keys_len, vals,
+                                 &vals_len));
+  ASSERT_EQ(keys_len, sum_key_len);
+  ASSERT_EQ(vals_len, sum_value_len);
+
+  for (int i = 1; i <= 70; i++) {
+    key = "key" + stringify(i);
+    val = "value" + stringify(i);
+    ASSERT_EQ(0, rbd_metadata_get(image3, key.c_str(), value, &value_len));
+    ASSERT_STREQ(val.c_str(), value);
+
+    value_len = sizeof(value);
   }
 
-  return snaps.size();
-}
+  ASSERT_EQ(0, rbd_snap_create(image, "deep_snap"));
+  ASSERT_EQ(0, rbd_close(image));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, "deep_snap"));
+  ASSERT_EQ(0, rbd_snap_protect(image, "deep_snap"));
+  ASSERT_EQ(0, rbd_clone3(ioctx, name.c_str(), "deep_snap", ioctx,
+            name4.c_str(), opts));
 
-TEST_F(TestLibRBD, TestCreateLsDeleteSnapPP)
-{
-  librados::IoCtx ioctx;
-  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+  ASSERT_EQ(4, test_ls(ioctx, 4, name.c_str(), name2.c_str(), name3.c_str(),
+            name4.c_str()));
+  ASSERT_EQ(0, rbd_open(ioctx, name4.c_str(), &image4, NULL));
+  BOOST_SCOPE_EXIT_ALL( (&image4) ) {
+    ASSERT_EQ(0, rbd_close(image4));
+  };
+  ASSERT_EQ(0, rbd_snap_create(image4, "deep_snap"));
 
-  {
-    librbd::RBD rbd;
-    librbd::Image image;
-    int order = 0;
-    std::string name = get_temp_image_name();
-    uint64_t size = 2 << 20;
-    uint64_t size2 = 4 << 20;
-    
-    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
-    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
-   
-    bool exists;
-    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(0, image.snap_create("snap1"));
-    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
-    ASSERT_TRUE(exists);
-    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap1", size));
-    ASSERT_EQ(0, image.resize(size2));
-    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(0, image.snap_create("snap2"));
-    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
-    ASSERT_TRUE(exists);
-    ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1", size, "snap2", size2));
-    ASSERT_EQ(0, image.snap_remove("snap1"));
-    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap2", size2));
-    ASSERT_EQ(0, image.snap_remove("snap2"));
-    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(0, test_ls_snaps(image, 0));
+  ASSERT_EQ(0, rbd_deep_copy(image4, ioctx, name5.c_str(), opts));
+  ASSERT_EQ(5, test_ls(ioctx, 5, name.c_str(), name2.c_str(), name3.c_str(),
+            name4.c_str(), name5.c_str()));
+  ASSERT_EQ(0, rbd_open(ioctx, name5.c_str(), &image5, NULL));
+  BOOST_SCOPE_EXIT_ALL( (&image5) ) {
+    ASSERT_EQ(0, rbd_close(image5));
+  };
+  ASSERT_EQ(0, rbd_metadata_list(image5, "key", 70, keys, &keys_len, vals,
+                                 &vals_len));
+  ASSERT_EQ(keys_len, sum_key_len);
+  ASSERT_EQ(vals_len, sum_value_len);
+
+  for (int i = 1; i <= 70; i++) {
+    key = "key" + stringify(i);
+    val = "value" + stringify(i);
+    ASSERT_EQ(0, rbd_metadata_get(image5, key.c_str(), value, &value_len));
+    ASSERT_STREQ(val.c_str(), value);
+
+    value_len = sizeof(value);
   }
 
-  ioctx.close();
+  ASSERT_EQ(0, rbd_deep_copy_with_progress(image4, ioctx, name6.c_str(), opts,
+                                           print_progress_percent, NULL));
+  ASSERT_EQ(6, test_ls(ioctx, 6, name.c_str(), name2.c_str(), name3.c_str(),
+            name4.c_str(), name5.c_str(), name6.c_str()));
+
+  keys_len = sizeof(keys);
+  vals_len = sizeof(vals);
+  ASSERT_EQ(0, rbd_open(ioctx, name6.c_str(), &image6, NULL));
+  BOOST_SCOPE_EXIT_ALL( (&image6) ) {
+    ASSERT_EQ(0, rbd_close(image6));
+  };
+  ASSERT_EQ(0, rbd_metadata_list(image6, "key", 70, keys, &keys_len, vals,
+                                 &vals_len));
+  ASSERT_EQ(keys_len, sum_key_len);
+  ASSERT_EQ(vals_len, sum_value_len);
+
+  for (int i = 1; i <= 70; i++) {
+    key = "key" + stringify(i);
+    val = "value" + stringify(i);
+    ASSERT_EQ(0, rbd_metadata_get(image6, key.c_str(), value, &value_len));
+    ASSERT_STREQ(val.c_str(), value);
+
+    value_len = sizeof(value);
+  }
 }
 
-TEST_F(TestLibRBD, TestGetNameIdSnapPP)
+TEST_F(TestLibRBD, TestDeepCopyPP)
 {
+  REQUIRE_FORMAT_V2();
+
   librados::IoCtx ioctx;
-  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+  ASSERT_EQ(0, _rados.ioctx_create(create_pool(true).c_str(), ioctx));
 
   {
     librbd::RBD rbd;
     librbd::Image image;
+    librbd::Image image2;
+    librbd::Image image3;
     int order = 0;
     std::string name = get_temp_image_name();
+    std::string name2 = get_temp_image_name();
+    std::string name3 = get_temp_image_name();
     uint64_t size = 2 << 20;
+    librbd::ImageOptions opts;
+    PrintProgress pp;
 
     ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
     ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
-    ASSERT_EQ(0, image.snap_create("snap1"));
-    ASSERT_EQ(0, image.snap_create("snap2"));
-    ASSERT_EQ(0, image.snap_create("snap3"));
-    vector<librbd::snap_info_t> snaps;
-    int r = image.snap_list(snaps);
-    EXPECT_TRUE(r >= 0);
-
-    for (size_t i = 0; i < snaps.size(); ++i) {
-      std::string expected_snap_name;
-      image.snap_get_name(snaps[i].id, &expected_snap_name);
-      ASSERT_EQ(expected_snap_name, snaps[i].name);
+    std::string key;
+    std::string val;
+    for (int i = 1; i <= 70; i++) {
+      key = "key" + stringify(i);
+      val = "value" + stringify(i);
+      ASSERT_EQ(0, image.metadata_set(key, val));
     }
 
-    for (size_t i = 0; i < snaps.size(); ++i) {
-      uint64_t expected_snap_id;
-      image.snap_get_id(snaps[i].name, &expected_snap_id);
-      ASSERT_EQ(expected_snap_id, snaps[i].id);
-    }
+    ASSERT_EQ(1, test_ls_pp(rbd, ioctx, 1, name.c_str()));
+    ASSERT_EQ(0, image.deep_copy(ioctx, name2.c_str(), opts));
+    ASSERT_EQ(2, test_ls_pp(rbd, ioctx, 2, name.c_str(), name2.c_str()));
+    ASSERT_EQ(0, rbd.open(ioctx, image2, name2.c_str(), NULL));
 
-    ASSERT_EQ(0, image.snap_remove("snap1"));
-    ASSERT_EQ(0, image.snap_remove("snap2"));
-    ASSERT_EQ(0, image.snap_remove("snap3"));
-    ASSERT_EQ(0, test_ls_snaps(image, 0));
-  }
+    map<string, bufferlist> pairs;
+    std::string value;
+    ASSERT_EQ(0, image2.metadata_list("", 70, &pairs));
+    ASSERT_EQ(70U, pairs.size());
 
-  ioctx.close();
-}
+    for (int i = 1; i <= 70; i++) {
+      key = "key" + stringify(i);
+      val = "value" + stringify(i);
+      ASSERT_EQ(0, image2.metadata_get(key.c_str(), &value));
+      ASSERT_STREQ(val.c_str(), value.c_str());
+    }
 
-TEST_F(TestLibRBD, TestCreateLsRenameSnapPP)
-{
-  librados::IoCtx ioctx;
-  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+    ASSERT_EQ(0, image.deep_copy_with_progress(ioctx, name3.c_str(), opts, pp));
+    ASSERT_EQ(3, test_ls_pp(rbd, ioctx, 3, name.c_str(), name2.c_str(),
+                            name3.c_str()));
+    ASSERT_EQ(0, rbd.open(ioctx, image3, name3.c_str(), NULL));
 
-  {
-    librbd::RBD rbd;
-    librbd::Image image;
-    int order = 0;
-    std::string name = get_temp_image_name();
-    uint64_t size = 2 << 20;
-    uint64_t size2 = 4 << 20;
-    
-    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
-    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
-    
-    bool exists;
-    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(0, image.snap_create("snap1"));
-    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
-    ASSERT_TRUE(exists);
-    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap1", size));
-    ASSERT_EQ(0, image.resize(size2));
-    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(0, image.snap_create("snap2"));
-    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
-    ASSERT_TRUE(exists);
-    ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1", size, "snap2", size2));
-    ASSERT_EQ(0, image.snap_rename("snap1","snap1-rename"));
-    ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1-rename", size, "snap2", size2));
-    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(0, image.snap_exists2("snap1-rename", &exists));
-    ASSERT_TRUE(exists);
-    ASSERT_EQ(0, image.snap_remove("snap1-rename"));
-    ASSERT_EQ(0, image.snap_rename("snap2","snap2-rename"));
-    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap2-rename", size2));
-    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
-    ASSERT_FALSE(exists);
-    ASSERT_EQ(0, image.snap_exists2("snap2-rename", &exists));
-    ASSERT_TRUE(exists);
-    ASSERT_EQ(0, image.snap_remove("snap2-rename"));
-    ASSERT_EQ(0, test_ls_snaps(image, 0));
+    pairs.clear();
+    ASSERT_EQ(0, image3.metadata_list("", 70, &pairs));
+    ASSERT_EQ(70U, pairs.size());
+
+    for (int i = 1; i <= 70; i++) {
+      key = "key" + stringify(i);
+      val = "value" + stringify(i);
+      ASSERT_EQ(0, image3.metadata_get(key.c_str(), &value));
+      ASSERT_STREQ(val.c_str(), value.c_str());
+    }
   }
 
   ioctx.close();
 }
 
-void simple_write_cb(rbd_completion_t cb, void *arg)
-{
-  printf("write completion cb called!\n");
-}
-
-void simple_read_cb(rbd_completion_t cb, void *arg)
+int test_ls_snaps(rbd_image_t image, int num_expected, ...)
 {
-  printf("read completion cb called!\n");
-}
+  int num_snaps, i, j, max_size = 10;
+  va_list ap;
+  rbd_snap_info_t snaps[max_size];
+  num_snaps = rbd_snap_list(image, snaps, &max_size);
+  printf("num snaps is: %d\nexpected: %d\n", num_snaps, num_expected);
 
-void aio_write_test_data_and_poll(rbd_image_t image, int fd, const char *test_data,
-                                  uint64_t off, size_t len, uint32_t iohint, bool *passed)
-{
-  rbd_completion_t comp;
-  uint64_t data = 0x123;
-  rbd_aio_create_completion((void*)&data, (rbd_callback_t) simple_write_cb, &comp);
-  printf("created completion\n");
-  printf("started write\n");
-  if (iohint)
-    rbd_aio_write2(image, off, len, test_data, comp, iohint);
-  else
-    rbd_aio_write(image, off, len, test_data, comp);
+  for (i = 0; i < num_snaps; i++) {
+    printf("snap: %s\n", snaps[i].name);
+  }
 
-  struct pollfd pfd;
-  pfd.fd = fd;
-  pfd.events = POLLIN;
+  va_start(ap, num_expected);
+  for (i = num_expected; i > 0; i--) {
+    char *expected = va_arg(ap, char *);
+    uint64_t expected_size = va_arg(ap, uint64_t);
+    bool found = false;
+    for (j = 0; j < num_snaps; j++) {
+      if (snaps[j].name == NULL)
+       continue;
+      if (strcmp(snaps[j].name, expected) == 0) {
+       printf("found %s with size %llu\n", snaps[j].name, (unsigned long long) snaps[j].size);
+       EXPECT_EQ(expected_size, snaps[j].size);
+       free((void *) snaps[j].name);
+       snaps[j].name = NULL;
+       found = true;
+       break;
+      }
+    }
+    EXPECT_TRUE(found);
+  }
+  va_end(ap);
 
-  ASSERT_EQ(1, poll(&pfd, 1, -1));
-  ASSERT_TRUE(pfd.revents & POLLIN);
+  for (i = 0; i < num_snaps; i++) {
+    EXPECT_EQ((const char *)0, snaps[i].name);
+  }
 
-  rbd_completion_t comps[1];
-  ASSERT_EQ(1, rbd_poll_io_events(image, comps, 1));
-  uint64_t count;
-  ASSERT_EQ(static_cast<ssize_t>(sizeof(count)),
-            read(fd, &count, sizeof(count)));
-  int r = rbd_aio_get_return_value(comps[0]);
-  ASSERT_TRUE(rbd_aio_is_complete(comps[0]));
-  ASSERT_TRUE(*(uint64_t*)rbd_aio_get_arg(comps[0]) == data);
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(0, r);
-  printf("finished write\n");
-  rbd_aio_release(comps[0]);
-  *passed = true;
+  return num_snaps;
 }
 
-void aio_write_test_data(rbd_image_t image, const char *test_data, uint64_t off, size_t len, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestCreateLsDeleteSnap)
 {
-  rbd_completion_t comp;
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
-  printf("created completion\n");
-  if (iohint)
-    rbd_aio_write2(image, off, len, test_data, comp, iohint);
-  else
-    rbd_aio_write(image, off, len, test_data, comp);
-  printf("started write\n");
-  rbd_aio_wait_for_complete(comp);
-  int r = rbd_aio_get_return_value(comp);
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(0, r);
-  printf("finished write\n");
-  rbd_aio_release(comp);
-  *passed = true;
-}
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-void write_test_data(rbd_image_t image, const char *test_data, uint64_t off, size_t len, uint32_t iohint, bool *passed)
-{
-  ssize_t written;
-  if (iohint)
-    written = rbd_write2(image, off, len, test_data, iohint);
-  else
-    written = rbd_write(image, off, len, test_data);
-  printf("wrote: %d\n", (int) written);
-  ASSERT_EQ(len, static_cast<size_t>(written));
-  *passed = true;
-}
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 2 << 20;
+  uint64_t size2 = 4 << 20;
+  
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
-void aio_discard_test_data(rbd_image_t image, uint64_t off, uint64_t len, bool *passed)
-{
-  rbd_completion_t comp;
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
-  rbd_aio_discard(image, off, len, comp);
-  rbd_aio_wait_for_complete(comp);
-  int r = rbd_aio_get_return_value(comp);
-  ASSERT_EQ(0, r);
-  printf("aio discard: %d~%d = %d\n", (int)off, (int)len, (int)r);
-  rbd_aio_release(comp);
-  *passed = true;
+  ASSERT_EQ(0, rbd_snap_create(image, "snap1"));
+  ASSERT_EQ(1, test_ls_snaps(image, 1, "snap1", size));
+  ASSERT_EQ(0, rbd_resize(image, size2));
+  ASSERT_EQ(0, rbd_snap_create(image, "snap2"));
+  ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1", size, "snap2", size2));
+  ASSERT_EQ(0, rbd_snap_remove(image, "snap1"));
+  ASSERT_EQ(1, test_ls_snaps(image, 1, "snap2", size2));
+  ASSERT_EQ(0, rbd_snap_remove(image, "snap2"));
+  ASSERT_EQ(0, test_ls_snaps(image, 0));
+  
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
 }
 
-void discard_test_data(rbd_image_t image, uint64_t off, size_t len, bool *passed)
+int test_get_snapshot_timestamp(rbd_image_t image, uint64_t snap_id)
 {
-  ssize_t written;
-  written = rbd_discard(image, off, len);
-  printf("discard: %d~%d = %d\n", (int)off, (int)len, (int)written);
-  ASSERT_EQ(len, static_cast<size_t>(written));
-  *passed = true;
+  struct timespec timestamp;
+  EXPECT_EQ(0, rbd_snap_get_timestamp(image, snap_id, &timestamp));
+  EXPECT_LT(0, timestamp.tv_sec);
+  return 0;
 }
 
-void aio_read_test_data_and_poll(rbd_image_t image, int fd, const char *expected,
-                                 uint64_t off, size_t len, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestGetSnapShotTimeStamp)
 {
-  rbd_completion_t comp;
-  char *result = (char *)malloc(len + 1);
+  REQUIRE_FORMAT_V2();
 
-  ASSERT_NE(static_cast<char *>(NULL), result);
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
-  printf("created completion\n");
-  printf("started read\n");
-  if (iohint)
-    rbd_aio_read2(image, off, len, result, comp, iohint);
-  else
-    rbd_aio_read(image, off, len, result, comp);
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-  struct pollfd pfd;
-  pfd.fd = fd;
-  pfd.events = POLLIN;
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 2 << 20;
+  int num_snaps, max_size = 10;
+  rbd_snap_info_t snaps[max_size];
 
-  ASSERT_EQ(1, poll(&pfd, 1, -1));
-  ASSERT_TRUE(pfd.revents & POLLIN);
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
-  rbd_completion_t comps[1];
-  ASSERT_EQ(1, rbd_poll_io_events(image, comps, 1));
-  uint64_t count;
-  ASSERT_EQ(static_cast<ssize_t>(sizeof(count)),
-            read(fd, &count, sizeof(count)));
+  ASSERT_EQ(0, rbd_snap_create(image, "snap1"));
+  num_snaps = rbd_snap_list(image, snaps, &max_size);
+  ASSERT_EQ(1, num_snaps);
+  ASSERT_EQ(0, test_get_snapshot_timestamp(image, snaps[0].id));
+  free((void *)snaps[0].name);
 
-  int r = rbd_aio_get_return_value(comps[0]);
-  ASSERT_TRUE(rbd_aio_is_complete(comps[0]));
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(len, static_cast<size_t>(r));
-  rbd_aio_release(comps[0]);
-  if (memcmp(result, expected, len)) {
-    printf("read: %s\nexpected: %s\n", result, expected);
-    ASSERT_EQ(0, memcmp(result, expected, len));
-  }
-  free(result);
-  *passed = true;
-}
+  ASSERT_EQ(0, rbd_snap_create(image, "snap2"));
+  num_snaps = rbd_snap_list(image, snaps, &max_size);
+  ASSERT_EQ(2, num_snaps);
+  ASSERT_EQ(0, test_get_snapshot_timestamp(image, snaps[0].id));
+  ASSERT_EQ(0, test_get_snapshot_timestamp(image, snaps[1].id));
+  free((void *)snaps[0].name);
+  free((void *)snaps[1].name);
 
-void aio_read_test_data(rbd_image_t image, const char *expected, uint64_t off, size_t len, uint32_t iohint, bool *passed)
-{
-  rbd_completion_t comp;
-  char *result = (char *)malloc(len + 1);
+  ASSERT_EQ(0, rbd_close(image));
 
-  ASSERT_NE(static_cast<char *>(NULL), result);
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
-  printf("created completion\n");
-  if (iohint)
-    rbd_aio_read2(image, off, len, result, comp, iohint);
-  else
-    rbd_aio_read(image, off, len, result, comp);
-  printf("started read\n");
-  rbd_aio_wait_for_complete(comp);
-  int r = rbd_aio_get_return_value(comp);
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(len, static_cast<size_t>(r));
-  rbd_aio_release(comp);
-  if (memcmp(result, expected, len)) {
-    printf("read: %s\nexpected: %s\n", result, expected);
-    ASSERT_EQ(0, memcmp(result, expected, len));
-  }
-  free(result);
-  *passed = true;
+  rados_ioctx_destroy(ioctx);
 }
 
-void read_test_data(rbd_image_t image, const char *expected, uint64_t off, size_t len, uint32_t iohint, bool *passed)
-{
-  ssize_t read;
-  char *result = (char *)malloc(len + 1);
-
-  ASSERT_NE(static_cast<char *>(NULL), result);
-  if (iohint)
-    read = rbd_read2(image, off, len, result, iohint);
-  else
-    read = rbd_read(image, off, len, result);
-  printf("read: %d\n", (int) read);
-  ASSERT_EQ(len, static_cast<size_t>(read));
-  result[len] = '\0';
-  if (memcmp(result, expected, len)) {
-    printf("read: %s\nexpected: %s\n", result, expected);
-    ASSERT_EQ(0, memcmp(result, expected, len));
-  }
-  free(result);
-  *passed = true;
-}
 
-void aio_writesame_test_data(rbd_image_t image, const char *test_data, uint64_t off, uint64_t len,
-                             uint64_t data_len, uint32_t iohint, bool *passed)
+int test_ls_snaps(librbd::Image& image, size_t num_expected, ...)
 {
-  rbd_completion_t comp;
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
-  printf("created completion\n");
   int r;
-  r = rbd_aio_writesame(image, off, len, test_data, data_len, comp, iohint);
-  printf("started writesame\n");
-  if (len % data_len) {
-    ASSERT_EQ(-EINVAL, r);
-    printf("expected fail, finished writesame\n");
-    rbd_aio_release(comp);
-    *passed = true;
-    return;
-  }
+  size_t i, j;
+  va_list ap;
+  vector<librbd::snap_info_t> snaps;
+  r = image.snap_list(snaps);
+  EXPECT_TRUE(r >= 0);
+  cout << "num snaps is: " << snaps.size() << std::endl
+           << "expected: " << num_expected << std::endl;
 
-  rbd_aio_wait_for_complete(comp);
-  r = rbd_aio_get_return_value(comp);
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(0, r);
-  printf("finished writesame\n");
-  rbd_aio_release(comp);
+  for (i = 0; i < snaps.size(); i++) {
+    cout << "snap: " << snaps[i].name << std::endl;
+  }
 
-  //verify data
-  printf("to verify the data\n");
-  ssize_t read;
-  char *result = (char *)malloc(data_len+ 1);
-  ASSERT_NE(static_cast<char *>(NULL), result);
-  uint64_t left = len;
-  while (left > 0) {
-    read = rbd_read(image, off, data_len, result);
-    ASSERT_EQ(data_len, static_cast<size_t>(read));
-    result[data_len] = '\0';
-    if (memcmp(result, test_data, data_len)) {
-      printf("read: %d ~ %d\n", (int) off, (int) read);
-      printf("read: %s\nexpected: %s\n", result, test_data);
-      ASSERT_EQ(0, memcmp(result, test_data, data_len));
+  va_start(ap, num_expected);
+  for (i = num_expected; i > 0; i--) {
+    char *expected = va_arg(ap, char *);
+    uint64_t expected_size = va_arg(ap, uint64_t);
+    int found = 0;
+    for (j = 0; j < snaps.size(); j++) {
+      if (snaps[j].name == "")
+       continue;
+      if (strcmp(snaps[j].name.c_str(), expected) == 0) {
+       cout << "found " << snaps[j].name << " with size " << snaps[j].size
+            << std::endl;
+       EXPECT_EQ(expected_size, snaps[j].size);
+       snaps[j].name = "";
+       found = 1;
+       break;
+      }
     }
-    off += data_len;
-    left -= data_len;
+    EXPECT_TRUE(found);
   }
-  ASSERT_EQ(0U, left);
-  free(result);
-  printf("verified\n");
+  va_end(ap);
 
-  *passed = true;
+  for (i = 0; i < snaps.size(); i++) {
+    EXPECT_EQ("", snaps[i].name);
+  }
+
+  return snaps.size();
 }
 
-void writesame_test_data(rbd_image_t image, const char *test_data, uint64_t off, uint64_t len,
-                         uint64_t data_len, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestCreateLsDeleteSnapPP)
 {
-  ssize_t written;
-  written = rbd_writesame(image, off, len, test_data, data_len, iohint);
-  if (len % data_len) {
-    ASSERT_EQ(-EINVAL, written);
-    printf("expected fail, finished writesame\n");
-    *passed = true;
-    return;
-  }
-  ASSERT_EQ(len, static_cast<size_t>(written));
-  printf("wrote: %d\n", (int) written);
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
-  //verify data
-  printf("to verify the data\n");
-  ssize_t read;
-  char *result = (char *)malloc(data_len+ 1);
-  ASSERT_NE(static_cast<char *>(NULL), result);
-  uint64_t left = len;
-  while (left > 0) {
-    read = rbd_read(image, off, data_len, result);
-    ASSERT_EQ(data_len, static_cast<size_t>(read));
-    result[data_len] = '\0';
-    if (memcmp(result, test_data, data_len)) {
-      printf("read: %d ~ %d\n", (int) off, (int) read);
-      printf("read: %s\nexpected: %s\n", result, test_data);
-      ASSERT_EQ(0, memcmp(result, test_data, data_len));
-    }
-    off += data_len;
-    left -= data_len;
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 2 << 20;
+    uint64_t size2 = 4 << 20;
+    
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+   
+    bool exists;
+    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(0, image.snap_create("snap1"));
+    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
+    ASSERT_TRUE(exists);
+    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap1", size));
+    ASSERT_EQ(0, image.resize(size2));
+    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(0, image.snap_create("snap2"));
+    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
+    ASSERT_TRUE(exists);
+    ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1", size, "snap2", size2));
+    ASSERT_EQ(0, image.snap_remove("snap1"));
+    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap2", size2));
+    ASSERT_EQ(0, image.snap_remove("snap2"));
+    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(0, test_ls_snaps(image, 0));
   }
-  ASSERT_EQ(0U, left);
-  free(result);
-  printf("verified\n");
 
-  *passed = true;
+  ioctx.close();
 }
 
-void aio_compare_and_write_test_data(rbd_image_t image, const char *cmp_data,
-                                     const char *test_data, uint64_t off,
-                                     size_t len, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestGetNameIdSnapPP)
 {
-  rbd_completion_t comp;
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_write_cb, &comp);
-  printf("created completion\n");
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
-  uint64_t mismatch_offset;
-  rbd_aio_compare_and_write(image, off, len, cmp_data, test_data, comp, &mismatch_offset, iohint);
-  printf("started aio compare and write\n");
-  rbd_aio_wait_for_complete(comp);
-  int r = rbd_aio_get_return_value(comp);
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(0, r);
-  printf("finished aio compare and write\n");
-  rbd_aio_release(comp);
-  *passed = true;
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 2 << 20;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    ASSERT_EQ(0, image.snap_create("snap1"));
+    ASSERT_EQ(0, image.snap_create("snap2"));
+    ASSERT_EQ(0, image.snap_create("snap3"));
+    vector<librbd::snap_info_t> snaps;
+    int r = image.snap_list(snaps);
+    EXPECT_TRUE(r >= 0);
+
+    for (size_t i = 0; i < snaps.size(); ++i) {
+      std::string expected_snap_name;
+      image.snap_get_name(snaps[i].id, &expected_snap_name);
+      ASSERT_EQ(expected_snap_name, snaps[i].name);
+    }
+
+    for (size_t i = 0; i < snaps.size(); ++i) {
+      uint64_t expected_snap_id;
+      image.snap_get_id(snaps[i].name, &expected_snap_id);
+      ASSERT_EQ(expected_snap_id, snaps[i].id);
+    }
+
+    ASSERT_EQ(0, image.snap_remove("snap1"));
+    ASSERT_EQ(0, image.snap_remove("snap2"));
+    ASSERT_EQ(0, image.snap_remove("snap3"));
+    ASSERT_EQ(0, test_ls_snaps(image, 0));
+  }
+
+  ioctx.close();
 }
 
-void compare_and_write_test_data(rbd_image_t image, const char *cmp_data,
-                                 const char *test_data, uint64_t off, size_t len,
-                                 uint64_t *mismatch_off, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestCreateLsRenameSnapPP)
 {
-  printf("start compare and write\n");
-  ssize_t written;
-  written = rbd_compare_and_write(image, off, len, cmp_data, test_data, mismatch_off, iohint);
-  printf("compare and  wrote: %d\n", (int) written);
-  ASSERT_EQ(len, static_cast<size_t>(written));
-  *passed = true;
-}
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 2 << 20;
+    uint64_t size2 = 4 << 20;
+    
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+    
+    bool exists;
+    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(0, image.snap_create("snap1"));
+    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
+    ASSERT_TRUE(exists);
+    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap1", size));
+    ASSERT_EQ(0, image.resize(size2));
+    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(0, image.snap_create("snap2"));
+    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
+    ASSERT_TRUE(exists);
+    ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1", size, "snap2", size2));
+    ASSERT_EQ(0, image.snap_rename("snap1","snap1-rename"));
+    ASSERT_EQ(2, test_ls_snaps(image, 2, "snap1-rename", size, "snap2", size2));
+    ASSERT_EQ(0, image.snap_exists2("snap1", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(0, image.snap_exists2("snap1-rename", &exists));
+    ASSERT_TRUE(exists);
+    ASSERT_EQ(0, image.snap_remove("snap1-rename"));
+    ASSERT_EQ(0, image.snap_rename("snap2","snap2-rename"));
+    ASSERT_EQ(1, test_ls_snaps(image, 1, "snap2-rename", size2));
+    ASSERT_EQ(0, image.snap_exists2("snap2", &exists));
+    ASSERT_FALSE(exists);
+    ASSERT_EQ(0, image.snap_exists2("snap2-rename", &exists));
+    ASSERT_TRUE(exists);
+    ASSERT_EQ(0, image.snap_remove("snap2-rename"));
+    ASSERT_EQ(0, test_ls_snaps(image, 0));
+  }
 
+  ioctx.close();
+}
 
 TEST_F(TestLibRBD, TestIO)
 {
   rados_ioctx_t ioctx;
   rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-  bool skip_discard = is_skip_partial_discard_enabled();
-
   rbd_image_t image;
   int order = 0;
   std::string name = get_temp_image_name();
@@ -1950,110 +2102,115 @@ TEST_F(TestLibRBD, TestIO)
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rados_conf_set(_cluster, "rbd_read_from_replica_policy", "balance"));
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
-  
-  char test_data[TEST_IO_SIZE + 1];
-  char zero_data[TEST_IO_SIZE + 1];
-  char mismatch_data[TEST_IO_SIZE + 1];
-  int i;
-  uint64_t mismatch_offset;
 
-  for (i = 0; i < TEST_IO_SIZE; ++i) {
-    test_data[i] = (char) (rand() % (126 - 33) + 33);
-  }
-  test_data[TEST_IO_SIZE] = '\0';
-  memset(zero_data, 0, sizeof(zero_data));
-  memset(mismatch_data, 9, sizeof(mismatch_data));
+  test_io(image);
 
-  for (i = 0; i < 5; ++i)
-    ASSERT_PASSED(write_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+  ASSERT_EQ(0, rbd_close(image));
 
-  for (i = 5; i < 10; ++i)
-    ASSERT_PASSED(aio_write_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+  rados_ioctx_destroy(ioctx);
+}
 
-  for (i = 0; i < 5; ++i)
-    ASSERT_PASSED(compare_and_write_test_data, image, test_data, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE, &mismatch_offset, 0);
+TEST_F(TestLibRBD, TestEncryptionLUKS1)
+{
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
 
-  for (i = 5; i < 10; ++i)
-    ASSERT_PASSED(aio_compare_and_write_test_data, image, test_data, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-  for (i = 0; i < 5; ++i)
-    ASSERT_PASSED(read_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 32 << 20;
 
-  for (i = 5; i < 10; ++i)
-    ASSERT_PASSED(aio_read_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rados_conf_set(
+          _cluster, "rbd_read_from_replica_policy", "balance"));
 
-  // discard 2nd, 4th sections.
-  ASSERT_PASSED(discard_test_data, image, TEST_IO_SIZE, TEST_IO_SIZE);
-  ASSERT_PASSED(aio_discard_test_data, image, TEST_IO_SIZE*3, TEST_IO_SIZE);
+  rbd_image_t image;
+  rbd_encryption_luks1_format_options_t opts = {
+          .alg = RBD_ENCRYPTION_ALGORITHM_AES256,
+          .passphrase = "password",
+          .passphrase_size = 8,
+  };
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
-  ASSERT_PASSED(read_test_data, image, test_data,  0, TEST_IO_SIZE, 0);
-  ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
-               TEST_IO_SIZE, TEST_IO_SIZE, 0);
-  ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*2, TEST_IO_SIZE, 0);
-  ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
-               TEST_IO_SIZE*3, TEST_IO_SIZE, 0);
-  ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*4, TEST_IO_SIZE, 0);
+#ifndef HAVE_LIBCRYPTSETUP
+  ASSERT_EQ(-ENOTSUP, rbd_encryption_format(
+          image, RBD_ENCRYPTION_FORMAT_LUKS1, &opts, sizeof(opts)));
+  ASSERT_EQ(-ENOTSUP, rbd_encryption_load(
+          image, RBD_ENCRYPTION_FORMAT_LUKS1, &opts, sizeof(opts)));
+#else
+  ASSERT_EQ(0, rbd_encryption_format(
+          image, RBD_ENCRYPTION_FORMAT_LUKS1, &opts, sizeof(opts)));
+  ASSERT_EQ(-EEXIST, rbd_encryption_load(
+          image, RBD_ENCRYPTION_FORMAT_LUKS1, &opts, sizeof(opts)));
+
+  test_io(image);
+
+  bool passed;
+  write_test_data(image, "test", 0, 4, 0, &passed);
+  ASSERT_TRUE(passed);
+  ASSERT_EQ(0, rbd_close(image));
 
-  for (i = 0; i < 15; ++i) {
-    if (i % 3 == 2) {
-      ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-      ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-    } else if (i % 3 == 1) {
-      ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-    } else {
-      ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-    }
-  }
-  for (i = 0; i < 15; ++i) {
-    if (i % 3 == 2) {
-      ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-      ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-    } else if (i % 3 == 1) {
-      ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-    } else {
-      ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-    }
-  }
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+  ASSERT_EQ(0, rbd_encryption_load(
+          image, RBD_ENCRYPTION_FORMAT_LUKS1, &opts, sizeof(opts)));
+  read_test_data(image, "test", 0, 4, 0, &passed);
+  ASSERT_TRUE(passed);
+#endif
 
-  rbd_image_info_t info;
-  rbd_completion_t comp;
-  ASSERT_EQ(0, rbd_stat(image, &info, sizeof(info)));
-  // can't read or write starting past end
-  ASSERT_EQ(-EINVAL, rbd_write(image, info.size, 1, test_data));
-  ASSERT_EQ(-EINVAL, rbd_read(image, info.size, 1, test_data));
-  // reading through end returns amount up to end
-  ASSERT_EQ(10, rbd_read(image, info.size - 10, 100, test_data));
-  // writing through end returns amount up to end
-  ASSERT_EQ(10, rbd_write(image, info.size - 10, 100, test_data));
+  ASSERT_EQ(0, rbd_close(image));
+  rados_ioctx_destroy(ioctx);
+}
 
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
-  ASSERT_EQ(0, rbd_aio_write(image, info.size, 1, test_data, comp));
-  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
-  ASSERT_EQ(-EINVAL, rbd_aio_get_return_value(comp));
-  rbd_aio_release(comp);
+TEST_F(TestLibRBD, TestEncryptionLUKS2)
+{
+  REQUIRE(!is_feature_enabled(RBD_FEATURE_JOURNALING));
 
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
-  ASSERT_EQ(0, rbd_aio_read(image, info.size, 1, test_data, comp));
-  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
-  ASSERT_EQ(-EINVAL, rbd_aio_get_return_value(comp));
-  rbd_aio_release(comp);
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-  ASSERT_PASSED(write_test_data, image, zero_data, 0, TEST_IO_SIZE, LIBRADOS_OP_FLAG_FADVISE_NOCACHE);
-  ASSERT_EQ(-EILSEQ, rbd_compare_and_write(image, 0, TEST_IO_SIZE, mismatch_data, mismatch_data, &mismatch_offset, 0));
-  ASSERT_EQ(0U, mismatch_offset);
-  rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
-  ASSERT_EQ(0, rbd_aio_compare_and_write(image, 0, TEST_IO_SIZE, mismatch_data, mismatch_data, comp, &mismatch_offset, 0));
-  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
-  ASSERT_EQ(0U, mismatch_offset);
-  rbd_aio_release(comp);
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 32 << 20;
 
-  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rados_conf_set(
+          _cluster, "rbd_read_from_replica_policy", "balance"));
+
+  rbd_image_t image;
+  rbd_encryption_luks2_format_options_t opts = {
+          .alg = RBD_ENCRYPTION_ALGORITHM_AES256,
+          .passphrase = "password",
+          .passphrase_size = 8,
+  };
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+#ifndef HAVE_LIBCRYPTSETUP
+  ASSERT_EQ(-ENOTSUP, rbd_encryption_format(
+          image, RBD_ENCRYPTION_FORMAT_LUKS2, &opts, sizeof(opts)));
+  ASSERT_EQ(-ENOTSUP, rbd_encryption_load(
+          image, RBD_ENCRYPTION_FORMAT_LUKS2, &opts, sizeof(opts)));
+#else
+  ASSERT_EQ(0, rbd_encryption_format(
+          image, RBD_ENCRYPTION_FORMAT_LUKS2, &opts, sizeof(opts)));
+  ASSERT_EQ(-EEXIST, rbd_encryption_load(
+          image, RBD_ENCRYPTION_FORMAT_LUKS2, &opts, sizeof(opts)));
+
+  test_io(image);
+
+  bool passed;
+  write_test_data(image, "test", 0, 4, 0, &passed);
+  ASSERT_TRUE(passed);
   ASSERT_EQ(0, rbd_close(image));
 
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+  ASSERT_EQ(0, rbd_encryption_load(
+          image, RBD_ENCRYPTION_FORMAT_LUKS2, &opts, sizeof(opts)));
+  read_test_data(image, "test", 0, 4, 0, &passed);
+  ASSERT_TRUE(passed);
+#endif
+
+  ASSERT_EQ(0, rbd_close(image));
   rados_ioctx_destroy(ioctx);
 }
 
@@ -2062,8 +2219,6 @@ TEST_F(TestLibRBD, TestIOWithIOHint)
   rados_ioctx_t ioctx;
   rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-  bool skip_discard = is_skip_partial_discard_enabled();
-
   rbd_image_t image;
   int order = 0;
   std::string name = get_temp_image_name();
@@ -2072,6 +2227,8 @@ TEST_F(TestLibRBD, TestIOWithIOHint)
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
+  bool skip_discard = is_skip_partial_discard_enabled(image);
+
   char test_data[TEST_IO_SIZE + 1];
   char zero_data[TEST_IO_SIZE + 1];
   char mismatch_data[TEST_IO_SIZE + 1];
@@ -2208,8 +2365,6 @@ TEST_F(TestLibRBD, TestDataPoolIO)
 
   std::string data_pool_name = create_pool(true);
 
-  bool skip_discard = is_skip_partial_discard_enabled();
-
   rbd_image_t image;
   std::string name = get_temp_image_name();
   uint64_t size = 2 << 20;
@@ -2236,6 +2391,8 @@ TEST_F(TestLibRBD, TestDataPoolIO)
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
   ASSERT_NE(-1, rbd_get_data_pool_id(image));
 
+  bool skip_discard = is_skip_partial_discard_enabled(image);
+
   char test_data[TEST_IO_SIZE + 1];
   char zero_data[TEST_IO_SIZE + 1];
   int i;
@@ -2684,8 +2841,6 @@ TEST_F(TestLibRBD, TestIOPP)
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
-  bool skip_discard = is_skip_partial_discard_enabled();
-
   {
     librbd::RBD rbd;
     librbd::Image image;
@@ -2696,6 +2851,8 @@ TEST_F(TestLibRBD, TestIOPP)
     ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
     ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+    bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
     char test_data[TEST_IO_SIZE + 1];
     char zero_data[TEST_IO_SIZE + 1];
     int i;
@@ -2931,15 +3088,45 @@ TEST_F(TestLibRBD, TestIOToSnapshot)
   cout << strerror(-r) << std::endl;
   ASSERT_EQ(0, rbd_close(image_at_snap));
 
-  ASSERT_EQ(2, test_ls_snaps(image, 2, "orig", isize, "written", isize));
-  ASSERT_EQ(0, rbd_snap_remove(image, "written"));
-  ASSERT_EQ(1, test_ls_snaps(image, 1, "orig", isize));
+  ASSERT_EQ(2, test_ls_snaps(image, 2, "orig", isize, "written", isize));
+  ASSERT_EQ(0, rbd_snap_remove(image, "written"));
+  ASSERT_EQ(1, test_ls_snaps(image, 1, "orig", isize));
+  ASSERT_EQ(0, rbd_snap_remove(image, "orig"));
+  ASSERT_EQ(0, test_ls_snaps(image, 0));
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, TestSnapshotDeletedIo)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t isize = 2 << 20;
+
+  int r;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), isize, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+  ASSERT_EQ(0, rbd_snap_create(image, "orig"));
+
+  r = rbd_snap_set(image, "orig");
+  ASSERT_EQ(r, 0);
+
   ASSERT_EQ(0, rbd_snap_remove(image, "orig"));
-  ASSERT_EQ(0, test_ls_snaps(image, 0));
+  char test[20];
+  ASSERT_EQ(-ENOENT, rbd_read(image, 20, 20, test));
 
-  ASSERT_PASSED(validate_object_map, image);
-  ASSERT_EQ(0, rbd_close(image));
+  r = rbd_snap_set(image, NULL);
+  ASSERT_EQ(r, 0);
 
+  ASSERT_EQ(0, rbd_close(image));
   rados_ioctx_destroy(ioctx);
 }
 
@@ -3049,7 +3236,7 @@ TEST_F(TestLibRBD, TestClone)
   printf("sizes and overlaps are good between parent and child\n");
 
   // check key/value pairs in child image
-  ASSERT_EQ(0, rbd_metadata_list(child, "", 70, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(child, "key", 70, keys, &keys_len, vals,
                                 &vals_len));
   ASSERT_EQ(sum_key_len, keys_len);
   ASSERT_EQ(sum_value_len, vals_len);
@@ -3183,7 +3370,7 @@ TEST_F(TestLibRBD, TestClone2)
   printf("made and opened clone \"child\"\n");
 
   // check key/value pairs in child image
-  ASSERT_EQ(0, rbd_metadata_list(child, "", 70, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(child, "key", 70, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(sum_key_len, keys_len);
   ASSERT_EQ(sum_value_len, vals_len);
@@ -3949,8 +4136,6 @@ TYPED_TEST(DiffIterateTest, DiffIterate)
   librados::IoCtx ioctx;
   ASSERT_EQ(0, this->_rados.ioctx_create(this->m_pool_name.c_str(), ioctx));
 
-  bool skip_discard = this->is_skip_partial_discard_enabled();
-
   {
     librbd::RBD rbd;
     librbd::Image image;
@@ -3961,6 +4146,8 @@ TYPED_TEST(DiffIterateTest, DiffIterate)
     ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
     ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+    bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
     uint64_t object_size = 0;
     if (this->whole_object) {
       object_size = 1 << order;
@@ -4096,11 +4283,10 @@ TYPED_TEST(DiffIterateTest, DiffIterateDiscard)
 
 TYPED_TEST(DiffIterateTest, DiffIterateStress)
 {
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)this->_rados.cct()));
   librados::IoCtx ioctx;
   ASSERT_EQ(0, this->_rados.ioctx_create(this->m_pool_name.c_str(), ioctx));
 
-  bool skip_discard = this->is_skip_partial_discard_enabled();
-
   librbd::RBD rbd;
   librbd::Image image;
   int order = 0;
@@ -4110,6 +4296,8 @@ TYPED_TEST(DiffIterateTest, DiffIterateStress)
   ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+  bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
   uint64_t object_size = 0;
   if (this->whole_object) {
     object_size = 1 << order;
@@ -4220,8 +4408,6 @@ TYPED_TEST(DiffIterateTest, DiffIterateIgnoreParent)
   librados::IoCtx ioctx;
   ASSERT_EQ(0, this->_rados.ioctx_create(this->m_pool_name.c_str(), ioctx));
 
-  bool skip_discard = this->is_skip_partial_discard_enabled();
-
   librbd::RBD rbd;
   librbd::Image image;
   std::string name = this->get_temp_image_name();
@@ -4231,6 +4417,8 @@ TYPED_TEST(DiffIterateTest, DiffIterateIgnoreParent)
   ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+  bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
   uint64_t object_size = 0;
   if (this->whole_object) {
     object_size = 1 << order;
@@ -4271,8 +4459,6 @@ TYPED_TEST(DiffIterateTest, DiffIterateCallbackError)
   librados::IoCtx ioctx;
   ASSERT_EQ(0, this->_rados.ioctx_create(this->m_pool_name.c_str(), ioctx));
 
-  bool skip_discard = this->is_skip_partial_discard_enabled();
-
   {
     librbd::RBD rbd;
     librbd::Image image;
@@ -4283,6 +4469,8 @@ TYPED_TEST(DiffIterateTest, DiffIterateCallbackError)
     ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
     ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+    bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
     interval_set<uint64_t> exists;
     interval_set<uint64_t> one;
     scribble(image, 10, 102400, skip_discard, &exists, &one);
@@ -4303,8 +4491,6 @@ TYPED_TEST(DiffIterateTest, DiffIterateParentDiscard)
   librados::IoCtx ioctx;
   ASSERT_EQ(0, this->_rados.ioctx_create(this->m_pool_name.c_str(), ioctx));
 
-  bool skip_discard = this->is_skip_partial_discard_enabled();
-
   librbd::RBD rbd;
   librbd::Image image;
   std::string name = this->get_temp_image_name();
@@ -4314,6 +4500,8 @@ TYPED_TEST(DiffIterateTest, DiffIterateParentDiscard)
   ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+  bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
   uint64_t object_size = 0;
   if (this->whole_object) {
     object_size = 1 << order;
@@ -4485,6 +4673,8 @@ TEST_F(TestLibRBD, TestPendingAio)
                                 false, features));
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
+  ASSERT_EQ(0, rbd_invalidate_cache(image));
+
   char test_data[TEST_IO_SIZE];
   for (size_t i = 0; i < TEST_IO_SIZE; ++i) {
     test_data[i] = (char) (rand() % (126 - 33) + 33);
@@ -5420,7 +5610,7 @@ TEST_F(TestLibRBD, Metadata)
   memset_rand(keys, keys_len);
   memset_rand(vals, vals_len);
 
-  ASSERT_EQ(0, rbd_metadata_list(image, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image, "key", 0, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(0U, keys_len);
   ASSERT_EQ(0U, vals_len);
@@ -5437,13 +5627,13 @@ TEST_F(TestLibRBD, Metadata)
   ASSERT_EQ(-ERANGE, rbd_metadata_get(image1, "key1", value, &value_len));
   ASSERT_EQ(value_len, strlen("value1") + 1);
 
-  ASSERT_EQ(-ERANGE, rbd_metadata_list(image1, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(-ERANGE, rbd_metadata_list(image1, "key", 0, keys, &keys_len, vals,
                                        &vals_len));
   keys_len = sizeof(keys);
   vals_len = sizeof(vals);
   memset_rand(keys, keys_len);
   memset_rand(vals, vals_len);
-  ASSERT_EQ(0, rbd_metadata_list(image1, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image1, "key", 0, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len, strlen("key1") + 1 + strlen("key2") + 1);
   ASSERT_EQ(vals_len, strlen("value1") + 1 + strlen("value2") + 1);
@@ -5456,7 +5646,7 @@ TEST_F(TestLibRBD, Metadata)
   ASSERT_EQ(-ENOENT, rbd_metadata_remove(image1, "key3"));
   value_len = sizeof(value);
   ASSERT_EQ(-ENOENT, rbd_metadata_get(image1, "key3", value, &value_len));
-  ASSERT_EQ(0, rbd_metadata_list(image1, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image1, "key", 0, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len, strlen("key2") + 1);
   ASSERT_EQ(vals_len, strlen("value2") + 1);
@@ -5473,32 +5663,28 @@ TEST_F(TestLibRBD, Metadata)
   ASSERT_EQ(0, rbd_snap_protect(image1, "snap1"));
   ASSERT_EQ(0, rbd_snap_set(image1, "snap1"));
 
-  ASSERT_EQ(0, rbd_metadata_set(image1, "key1", "value1"));
-  ASSERT_EQ(0, rbd_metadata_set(image1, "key3", "value3"));
+  ASSERT_EQ(-EROFS, rbd_metadata_set(image1, "key1", "value1"));
+  ASSERT_EQ(-EROFS, rbd_metadata_remove(image1, "key2"));
 
   keys_len = sizeof(keys);
   vals_len = sizeof(vals);
   memset_rand(keys, keys_len);
   memset_rand(vals, vals_len);
-  ASSERT_EQ(0, rbd_metadata_list(image1, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image1, "key", 0, keys, &keys_len, vals,
                                  &vals_len));
-  ASSERT_EQ(keys_len,
-            strlen("key1") + 1 + strlen("key2") + 1 + strlen("key3") + 1);
-  ASSERT_EQ(vals_len,
-            strlen("value1") + 1 + strlen("value2") + 1 + strlen("value3") + 1);
-  ASSERT_STREQ(keys, "key1");
-  ASSERT_STREQ(keys + strlen("key1") + 1, "key2");
-  ASSERT_STREQ(keys + strlen("key1") + 1 + strlen("key2") + 1, "key3");
-  ASSERT_STREQ(vals, "value1");
-  ASSERT_STREQ(vals + strlen("value1") + 1, "value2");
-  ASSERT_STREQ(vals + strlen("value1") + 1 + strlen("value2") + 1, "value3");
+  ASSERT_EQ(keys_len, strlen("key2") + 1);
+  ASSERT_EQ(vals_len, strlen("value2") + 1);
+  ASSERT_STREQ(keys, "key2");
+  ASSERT_STREQ(vals, "value2");
 
   ASSERT_EQ(0, rbd_snap_set(image1, NULL));
+  ASSERT_EQ(0, rbd_metadata_set(image1, "key1", "value1"));
+  ASSERT_EQ(0, rbd_metadata_set(image1, "key3", "value3"));
   keys_len = sizeof(keys);
   vals_len = sizeof(vals);
   memset_rand(keys, keys_len);
   memset_rand(vals, vals_len);
-  ASSERT_EQ(0, rbd_metadata_list(image1, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image1, "key", 0, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len,
             strlen("key1") + 1 + strlen("key2") + 1 + strlen("key3") + 1);
@@ -5526,7 +5712,7 @@ TEST_F(TestLibRBD, Metadata)
   vals_len = sizeof(vals);
   memset_rand(keys, keys_len);
   memset_rand(vals, vals_len);
-  ASSERT_EQ(0, rbd_metadata_list(image2, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image2, "key", 0, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len, strlen("key1") + 1 + strlen("key2") + 1 + strlen("key3") +
             1 + strlen("key4") + 1);
@@ -5537,7 +5723,7 @@ TEST_F(TestLibRBD, Metadata)
   ASSERT_STREQ(vals + strlen("value1") + 1 + strlen("value2") + 1 +
                strlen("value3") + 1, "value4");
 
-  ASSERT_EQ(0, rbd_metadata_list(image1, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image1, "key", 0, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len,
             strlen("key1") + 1 + strlen("key2") + 1 + strlen("key3") + 1);
@@ -5550,19 +5736,19 @@ TEST_F(TestLibRBD, Metadata)
   vals_len = strlen("value1") + 1;
   memset_rand(keys, keys_len);
   memset_rand(vals, vals_len);
-  ASSERT_EQ(0, rbd_metadata_list(image2, "", 1, keys, &keys_len, vals,
+  ASSERT_EQ(0, rbd_metadata_list(image2, "key", 1, keys, &keys_len, vals,
                                  &vals_len));
   ASSERT_EQ(keys_len, strlen("key1") + 1);
   ASSERT_EQ(vals_len, strlen("value1") + 1);
   ASSERT_STREQ(keys, "key1");
   ASSERT_STREQ(vals, "value1");
 
-  ASSERT_EQ(-ERANGE, rbd_metadata_list(image2, "", 2, keys, &keys_len, vals,
+  ASSERT_EQ(-ERANGE, rbd_metadata_list(image2, "key", 2, keys, &keys_len, vals,
                                        &vals_len));
   ASSERT_EQ(keys_len, strlen("key1") + 1 + strlen("key2") + 1);
   ASSERT_EQ(vals_len, strlen("value1") + 1 + strlen("value2") + 1);
 
-  ASSERT_EQ(-ERANGE, rbd_metadata_list(image2, "", 0, keys, &keys_len, vals,
+  ASSERT_EQ(-ERANGE, rbd_metadata_list(image2, "key", 0, keys, &keys_len, vals,
                                        &vals_len));
   ASSERT_EQ(keys_len, strlen("key1") + 1 + strlen("key2") + 1 + strlen("key3") +
             1 + strlen("key4") + 1);
@@ -5605,14 +5791,14 @@ TEST_F(TestLibRBD, MetadataPP)
   librbd::Image image1;
   ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
   map<string, bufferlist> pairs;
-  ASSERT_EQ(0, image1.metadata_list("", 0, &pairs));
+  ASSERT_EQ(0, image1.metadata_list("key", 0, &pairs));
   ASSERT_TRUE(pairs.empty());
 
   ASSERT_EQ(0, image1.metadata_set("key1", "value1"));
   ASSERT_EQ(0, image1.metadata_set("key2", "value2"));
   ASSERT_EQ(0, image1.metadata_get("key1", &value));
   ASSERT_EQ(0, strcmp("value1", value.c_str()));
-  ASSERT_EQ(0, image1.metadata_list("", 0, &pairs));
+  ASSERT_EQ(0, image1.metadata_list("key", 0, &pairs));
   ASSERT_EQ(2U, pairs.size());
   ASSERT_EQ(0, strncmp("value1", pairs["key1"].c_str(), 6));
   ASSERT_EQ(0, strncmp("value2", pairs["key2"].c_str(), 6));
@@ -5621,7 +5807,7 @@ TEST_F(TestLibRBD, MetadataPP)
   ASSERT_EQ(0, image1.metadata_remove("key1"));
   ASSERT_EQ(-ENOENT, image1.metadata_remove("key3"));
   ASSERT_TRUE(image1.metadata_get("key3", &value) < 0);
-  ASSERT_EQ(0, image1.metadata_list("", 0, &pairs));
+  ASSERT_EQ(0, image1.metadata_list("key", 0, &pairs));
   ASSERT_EQ(1U, pairs.size());
   ASSERT_EQ(0, strncmp("value2", pairs["key2"].c_str(), 6));
 
@@ -5636,16 +5822,16 @@ TEST_F(TestLibRBD, MetadataPP)
   ASSERT_EQ(0, image1.snap_set("snap1"));
 
   pairs.clear();
-  ASSERT_EQ(0, image1.metadata_set("key1", "value1"));
-  ASSERT_EQ(0, image1.metadata_set("key3", "value3"));
-  ASSERT_EQ(0, image1.metadata_list("", 0, &pairs));
-  ASSERT_EQ(3U, pairs.size());
-  ASSERT_EQ(0, strncmp("value1", pairs["key1"].c_str(), 6));
+  ASSERT_EQ(-EROFS, image1.metadata_set("key1", "value1"));
+  ASSERT_EQ(-EROFS, image1.metadata_remove("key2"));
+  ASSERT_EQ(0, image1.metadata_list("key", 0, &pairs));
+  ASSERT_EQ(1U, pairs.size());
   ASSERT_EQ(0, strncmp("value2", pairs["key2"].c_str(), 6));
-  ASSERT_EQ(0, strncmp("value3", pairs["key3"].c_str(), 6));
 
   ASSERT_EQ(0, image1.snap_set(NULL));
-  ASSERT_EQ(0, image1.metadata_list("", 0, &pairs));
+  ASSERT_EQ(0, image1.metadata_set("key1", "value1"));
+  ASSERT_EQ(0, image1.metadata_set("key3", "value3"));
+  ASSERT_EQ(0, image1.metadata_list("key", 0, &pairs));
   ASSERT_EQ(3U, pairs.size());
   ASSERT_EQ(0, strncmp("value1", pairs["key1"].c_str(), 6));
   ASSERT_EQ(0, strncmp("value2", pairs["key2"].c_str(), 6));
@@ -5660,10 +5846,10 @@ TEST_F(TestLibRBD, MetadataPP)
   ASSERT_EQ(0, rbd.open(ioctx, image2, cname.c_str(), NULL));
   ASSERT_EQ(0, image2.metadata_set("key4", "value4"));
   pairs.clear();
-  ASSERT_EQ(0, image2.metadata_list("", 0, &pairs));
+  ASSERT_EQ(0, image2.metadata_list("key", 0, &pairs));
   ASSERT_EQ(4U, pairs.size());
   pairs.clear();
-  ASSERT_EQ(0, image1.metadata_list("", 0, &pairs));
+  ASSERT_EQ(0, image1.metadata_list("key", 0, &pairs));
   ASSERT_EQ(3U, pairs.size());
   ASSERT_EQ(-ENOENT, image1.metadata_get("key4", &value));
 }
@@ -5956,8 +6142,6 @@ TEST_F(TestLibRBD, BlockingAIO)
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
-  bool skip_discard = is_skip_partial_discard_enabled();
-
   librbd::RBD rbd;
   std::string name = get_temp_image_name();
   uint64_t size = 1 << 20;
@@ -5975,6 +6159,8 @@ TEST_F(TestLibRBD, BlockingAIO)
   librbd::Image image;
   ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+  bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
   bufferlist bl;
   ASSERT_EQ(0, image.write(0, bl.length(), bl));
 
@@ -6763,11 +6949,11 @@ TEST_F(TestLibRBD, ExclusiveLock)
   ASSERT_FALSE(lock_owner);
 
   int owner_id = -1;
-  mutex lock;
+  std::mutex lock;
   const auto pingpong = [&](int m_id, rbd_image_t &m_image) {
       for (int i = 0; i < 10; i++) {
        {
-         lock_guard<mutex> locker(lock);
+         std::lock_guard<std::mutex> locker(lock);
          if (owner_id == m_id) {
            std::cout << m_id << ": releasing exclusive lock" << std::endl;
            EXPECT_EQ(0, rbd_lock_release(m_image));
@@ -6795,13 +6981,13 @@ TEST_F(TestLibRBD, ExclusiveLock)
        EXPECT_TRUE(lock_owner);
        std::cout << m_id << ": exclusive lock acquired" << std::endl;
        {
-         lock_guard<mutex> locker(lock);
+         std::lock_guard<std::mutex> locker(lock);
          owner_id = m_id;
        }
        usleep(rand() % 50000);
       }
 
-      lock_guard<mutex> locker(lock);
+      std::lock_guard<std::mutex> locker(lock);
       if (owner_id == m_id) {
        EXPECT_EQ(0, rbd_lock_release(m_image));
        int lock_owner;
@@ -6833,28 +7019,29 @@ TEST_F(TestLibRBD, ExclusiveLock)
 TEST_F(TestLibRBD, BreakLock)
 {
   REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
 
   static char buf[10];
 
-  rados_t blacklist_cluster;
-  ASSERT_EQ("", connect_cluster(&blacklist_cluster));
+  rados_t blocklist_cluster;
+  ASSERT_EQ("", connect_cluster(&blocklist_cluster));
 
-  rados_ioctx_t ioctx, blacklist_ioctx;
+  rados_ioctx_t ioctx, blocklist_ioctx;
   ASSERT_EQ(0, rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx));
-  ASSERT_EQ(0, rados_ioctx_create(blacklist_cluster, m_pool_name.c_str(),
-                                  &blacklist_ioctx));
+  ASSERT_EQ(0, rados_ioctx_create(blocklist_cluster, m_pool_name.c_str(),
+                                  &blocklist_ioctx));
 
   std::string name = get_temp_image_name();
   uint64_t size = 2 << 20;
   int order = 0;
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
 
-  rbd_image_t image, blacklist_image;
+  rbd_image_t image, blocklist_image;
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
-  ASSERT_EQ(0, rbd_open(blacklist_ioctx, name.c_str(), &blacklist_image, NULL));
+  ASSERT_EQ(0, rbd_open(blocklist_ioctx, name.c_str(), &blocklist_image, NULL));
 
-  ASSERT_EQ(0, rbd_metadata_set(image, "conf_rbd_blacklist_on_break_lock", "true"));
-  ASSERT_EQ(0, rbd_lock_acquire(blacklist_image, RBD_LOCK_MODE_EXCLUSIVE));
+  ASSERT_EQ(0, rbd_metadata_set(image, "conf_rbd_blocklist_on_break_lock", "true"));
+  ASSERT_EQ(0, rbd_lock_acquire(blocklist_image, RBD_LOCK_MODE_EXCLUSIVE));
 
   rbd_lock_mode_t lock_mode;
   char *lock_owners[1];
@@ -6867,25 +7054,23 @@ TEST_F(TestLibRBD, BreakLock)
 
   ASSERT_EQ(0, rbd_lock_break(image, RBD_LOCK_MODE_EXCLUSIVE, lock_owners[0]));
   ASSERT_EQ(0, rbd_lock_acquire(image, RBD_LOCK_MODE_EXCLUSIVE));
-  EXPECT_EQ(0, rados_wait_for_latest_osdmap(blacklist_cluster));
+  EXPECT_EQ(0, rados_wait_for_latest_osdmap(blocklist_cluster));
 
   ASSERT_EQ((ssize_t)sizeof(buf), rbd_write(image, 0, sizeof(buf), buf));
-  ASSERT_EQ(-EBLACKLISTED, rbd_write(blacklist_image, 0, sizeof(buf), buf));
+  ASSERT_EQ(-EBLOCKLISTED, rbd_write(blocklist_image, 0, sizeof(buf), buf));
 
   ASSERT_EQ(0, rbd_close(image));
-  ASSERT_EQ(0, rbd_close(blacklist_image));
+  ASSERT_EQ(0, rbd_close(blocklist_image));
 
   rbd_lock_get_owners_cleanup(lock_owners, max_lock_owners);
 
   rados_ioctx_destroy(ioctx);
-  rados_ioctx_destroy(blacklist_ioctx);
-  rados_shutdown(blacklist_cluster);
+  rados_ioctx_destroy(blocklist_ioctx);
+  rados_shutdown(blocklist_cluster);
 }
 
 TEST_F(TestLibRBD, DiscardAfterWrite)
 {
-  REQUIRE(!is_skip_partial_discard_enabled());
-
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
@@ -6898,6 +7083,10 @@ TEST_F(TestLibRBD, DiscardAfterWrite)
   librbd::Image image;
   ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
+  if (this->is_skip_partial_discard_enabled(image)) {
+    return;
+  }
+
   // enable writeback cache
   ASSERT_EQ(0, image.flush());
 
@@ -7327,6 +7516,16 @@ TEST_F(TestLibRBD, Migration) {
   ASSERT_EQ(status.state, RBD_IMAGE_MIGRATION_STATE_PREPARED);
   rbd_migration_status_cleanup(&status);
 
+  rbd_image_t image;
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+  char source_spec[2048];
+  size_t source_spec_length = sizeof(source_spec);
+  ASSERT_EQ(0, rbd_get_migration_source_spec(image, source_spec,
+                                             &source_spec_length));
+  json_spirit::mValue json_source_spec;
+  json_spirit::read(source_spec, json_source_spec);
+  EXPECT_EQ(0, rbd_close(image));
+
   ASSERT_EQ(-EBUSY, rbd_remove(ioctx, name.c_str()));
   ASSERT_EQ(-EBUSY, rbd_trash_move(ioctx, name.c_str(), 0));
 
@@ -7349,7 +7548,6 @@ TEST_F(TestLibRBD, Migration) {
 
   ASSERT_EQ(0, rbd_migration_abort(ioctx, name.c_str()));
 
-  rbd_image_t image;
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
   EXPECT_EQ(0, rbd_close(image));
 }
@@ -7390,6 +7588,22 @@ TEST_F(TestLibRBD, MigrationPP) {
   ASSERT_NE(status.dest_image_id, "");
   ASSERT_EQ(status.state, RBD_IMAGE_MIGRATION_STATE_PREPARED);
 
+  librbd::Image image;
+  ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+  std::string source_spec;
+  ASSERT_EQ(0, image.get_migration_source_spec(&source_spec));
+  json_spirit::mValue json_source_spec;
+  json_spirit::read(source_spec, json_source_spec);
+  json_spirit::mObject json_source_spec_obj = json_source_spec.get_obj();
+  ASSERT_EQ("native", json_source_spec_obj["type"].get_str());
+  ASSERT_EQ(ioctx.get_id(), json_source_spec_obj["pool_id"].get_int64());
+  ASSERT_EQ("", json_source_spec_obj["pool_namespace"].get_str());
+  ASSERT_EQ(1, json_source_spec_obj.count("image_name"));
+  if (!old_format) {
+    ASSERT_EQ(1, json_source_spec_obj.count("image_id"));
+  }
+  ASSERT_EQ(0, image.close());
+
   ASSERT_EQ(-EBUSY, rbd.remove(ioctx, name.c_str()));
   ASSERT_EQ(-EBUSY, rbd.trash_move(ioctx, name.c_str(), 0));
 
@@ -7411,7 +7625,6 @@ TEST_F(TestLibRBD, MigrationPP) {
 
   ASSERT_EQ(0, rbd.migration_abort(ioctx, name.c_str()));
 
-  librbd::Image image;
   ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 }
 
@@ -8074,113 +8287,503 @@ TEST_F(TestLibRBD, DISABLED_TestSeqWriteAIOPP)
     }
     comps.clear();
 
-    struct timespec end_time;
-    clock_gettime(CLOCK_REALTIME, &end_time);
-    int duration = end_time.tv_sec * 1000 + end_time.tv_nsec / 1000000 -
-      start_time.tv_sec * 1000 - start_time.tv_nsec / 1000000;
-    std::cout << "duration: " << duration << " msec" << std::endl;
+    struct timespec end_time;
+    clock_gettime(CLOCK_REALTIME, &end_time);
+    int duration = end_time.tv_sec * 1000 + end_time.tv_nsec / 1000000 -
+      start_time.tv_sec * 1000 - start_time.tv_nsec / 1000000;
+    std::cout << "duration: " << duration << " msec" << std::endl;
+
+    for (uint64_t i = 0; i < size / TEST_IO_SIZE; ++i) {
+      char *p = test_data + (TEST_IO_SIZE + 1) * (i % 10);
+      ASSERT_PASSED(read_test_data, image, p, strlen(p) * i, TEST_IO_SIZE, 0);
+    }
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, SnapRemoveWithChildMissing)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
+  ASSERT_EQ(0, rados_conf_set(_cluster, "rbd_default_clone_format", "2"));
+  BOOST_SCOPE_EXIT_ALL(&) {
+    ASSERT_EQ(0, rados_conf_set(_cluster, "rbd_default_clone_format", "auto"));
+  };
+
+  librbd::RBD rbd;
+  rados_ioctx_t ioctx1, ioctx2;
+  string pool_name1 = create_pool(true);
+  rados_ioctx_create(_cluster, pool_name1.c_str(), &ioctx1);
+  ASSERT_EQ(0, rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx2));
+
+  bool old_format;
+  uint64_t features;
+  rbd_image_t parent, child1, child2, child3;
+  int order = 0;
+  char child_id1[4096];
+  char child_id2[4096];
+  char child_id3[4096];
+
+  ASSERT_EQ(0, get_features(&old_format, &features));
+  ASSERT_FALSE(old_format);
+  std::string parent_name = get_temp_image_name();
+  std::string child_name1 = get_temp_image_name();
+  std::string child_name2 = get_temp_image_name();
+  std::string child_name3 = get_temp_image_name();
+  ASSERT_EQ(0, create_image_full(ioctx1, parent_name.c_str(), 4<<20, &order,
+            false, features));
+  ASSERT_EQ(0, rbd_open(ioctx1, parent_name.c_str(), &parent, NULL));
+  ASSERT_EQ(0, rbd_snap_create(parent, "snap1"));
+  ASSERT_EQ(0, rbd_snap_create(parent, "snap2"));
+
+  ASSERT_EQ(0, clone_image(ioctx1, parent, parent_name.c_str(), "snap1",
+                           ioctx2, child_name1.c_str(), features, &order));
+  ASSERT_EQ(0, clone_image(ioctx1, parent, parent_name.c_str(), "snap2",
+                           ioctx1, child_name2.c_str(), features, &order));
+  ASSERT_EQ(0, clone_image(ioctx1, parent, parent_name.c_str(), "snap2",
+                           ioctx2, child_name3.c_str(), features, &order));
+
+  ASSERT_EQ(0, rbd_open(ioctx2, child_name1.c_str(), &child1, NULL));
+  ASSERT_EQ(0, rbd_open(ioctx1, child_name2.c_str(), &child2, NULL));
+  ASSERT_EQ(0, rbd_open(ioctx2, child_name3.c_str(), &child3, NULL));
+  ASSERT_EQ(0, rbd_get_id(child1, child_id1, sizeof(child_id1)));
+  ASSERT_EQ(0, rbd_get_id(child2, child_id2, sizeof(child_id2)));
+  ASSERT_EQ(0, rbd_get_id(child3, child_id3, sizeof(child_id3)));
+  test_list_children2(parent, 3,
+                      child_id1, m_pool_name.c_str(), child_name1.c_str(), false,
+                      child_id2, pool_name1.c_str(), child_name2.c_str(), false,
+                      child_id3, m_pool_name.c_str(), child_name3.c_str(), false);
+
+  size_t max_size = 10;
+  rbd_linked_image_spec_t children[max_size];
+  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
+  ASSERT_EQ(3, static_cast<int>(max_size));
+  rbd_linked_image_spec_list_cleanup(children, max_size);
+
+  ASSERT_EQ(0, rbd_close(child1));
+  ASSERT_EQ(0, rbd_close(child2));
+  ASSERT_EQ(0, rbd_close(child3));
+  rados_ioctx_destroy(ioctx2);
+  ASSERT_EQ(0, rados_pool_delete(_cluster, m_pool_name.c_str()));
+  _pool_names.erase(std::remove(_pool_names.begin(),
+                                _pool_names.end(), m_pool_name),
+                    _pool_names.end());
+  EXPECT_EQ(0, rados_wait_for_latest_osdmap(_cluster));
+
+  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
+  ASSERT_EQ(3, static_cast<int>(max_size));
+  rbd_linked_image_spec_list_cleanup(children, max_size);
+  ASSERT_EQ(0, rbd_snap_remove(parent, "snap1"));
+  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
+  ASSERT_EQ(2, static_cast<int>(max_size));
+  rbd_linked_image_spec_list_cleanup(children, max_size);
+
+  ASSERT_EQ(0, rbd_remove(ioctx1, child_name2.c_str()));
+  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
+  ASSERT_EQ(1, static_cast<int>(max_size));
+  rbd_linked_image_spec_list_cleanup(children, max_size);
+
+  ASSERT_EQ(0, rbd_snap_remove(parent, "snap2"));
+  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
+  ASSERT_EQ(0, static_cast<int>(max_size));
+  rbd_linked_image_spec_list_cleanup(children, max_size);
+  test_list_children2(parent, 0);
+  ASSERT_EQ(0, test_ls_snaps(parent, 0));
+
+  ASSERT_EQ(0, rbd_close(parent));
+  rados_ioctx_destroy(ioctx1);
+}
+
+TEST_F(TestLibRBD, QuiesceWatch)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 2 << 20;
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+
+  rbd_image_t image1, image2;
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image1, NULL));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image2, NULL));
+
+  struct Watcher {
+    static void quiesce_cb(void *arg) {
+      Watcher *watcher = static_cast<Watcher *>(arg);
+      watcher->handle_quiesce();
+    }
+    static void unquiesce_cb(void *arg) {
+      Watcher *watcher = static_cast<Watcher *>(arg);
+      watcher->handle_unquiesce();
+    }
+
+    rbd_image_t &image;
+    uint64_t handle = 0;
+    size_t quiesce_count = 0;
+    size_t unquiesce_count = 0;
+
+    ceph::mutex lock = ceph::make_mutex("lock");
+    ceph::condition_variable cv;
+
+    Watcher(rbd_image_t &image) : image(image) {
+    }
+
+    void handle_quiesce() {
+      ASSERT_EQ(quiesce_count, unquiesce_count);
+      quiesce_count++;
+      rbd_quiesce_complete(image, handle, 0);
+    }
+    void handle_unquiesce() {
+      std::unique_lock locker(lock);
+      unquiesce_count++;
+      ASSERT_EQ(quiesce_count, unquiesce_count);
+      cv.notify_one();
+    }
+    bool wait_for_unquiesce(size_t c) {
+      std::unique_lock locker(lock);
+      return cv.wait_for(locker, seconds(60),
+                         [this, c]() { return unquiesce_count >= c; });
+    }
+  } watcher1(image1), watcher2(image2);
+
+  ASSERT_EQ(0, rbd_quiesce_watch(image1, Watcher::quiesce_cb,
+                                 Watcher::unquiesce_cb, &watcher1,
+                                 &watcher1.handle));
+  ASSERT_EQ(0, rbd_quiesce_watch(image2, Watcher::quiesce_cb,
+                                 Watcher::unquiesce_cb, &watcher2,
+                                 &watcher2.handle));
+
+  ASSERT_EQ(0, rbd_snap_create(image1, "snap1"));
+  ASSERT_EQ(1U, watcher1.quiesce_count);
+  ASSERT_TRUE(watcher1.wait_for_unquiesce(1U));
+  ASSERT_EQ(1U, watcher2.quiesce_count);
+  ASSERT_TRUE(watcher2.wait_for_unquiesce(1U));
+
+  ASSERT_EQ(0, rbd_snap_create(image2, "snap2"));
+  ASSERT_EQ(2U, watcher1.quiesce_count);
+  ASSERT_TRUE(watcher1.wait_for_unquiesce(2U));
+  ASSERT_EQ(2U, watcher2.quiesce_count);
+  ASSERT_TRUE(watcher2.wait_for_unquiesce(2U));
+
+  ASSERT_EQ(0, rbd_quiesce_unwatch(image1, watcher1.handle));
+
+  ASSERT_EQ(0, rbd_snap_create(image1, "snap3"));
+  ASSERT_EQ(2U, watcher1.quiesce_count);
+  ASSERT_EQ(2U, watcher1.unquiesce_count);
+  ASSERT_EQ(3U, watcher2.quiesce_count);
+  ASSERT_TRUE(watcher2.wait_for_unquiesce(3U));
+
+  ASSERT_EQ(0, rbd_quiesce_unwatch(image2, watcher2.handle));
+
+  ASSERT_EQ(0, rbd_snap_remove(image1, "snap1"));
+  ASSERT_EQ(0, rbd_snap_remove(image1, "snap2"));
+  ASSERT_EQ(0, rbd_snap_remove(image1, "snap3"));
+  ASSERT_EQ(0, rbd_close(image1));
+  ASSERT_EQ(0, rbd_close(image2));
+  ASSERT_EQ(0, rbd_remove(ioctx, name.c_str()));
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, QuiesceWatchPP)
+{
+  librbd::RBD rbd;
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+  std::string name = get_temp_image_name();
+  int order = 0;
+  uint64_t size = 2 << 20;
+  ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+
+  {
+    librbd::Image image1, image2;
+    ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
+    ASSERT_EQ(0, rbd.open(ioctx, image2, name.c_str(), NULL));
+
+    struct Watcher : public librbd::QuiesceWatchCtx {
+      librbd::Image &image;
+      uint64_t handle = 0;
+      size_t quiesce_count = 0;
+      size_t unquiesce_count = 0;
+
+      ceph::mutex lock = ceph::make_mutex("lock");
+      ceph::condition_variable cv;
+
+      Watcher(librbd::Image &image) : image(image) {
+      }
+
+      void handle_quiesce() override {
+        ASSERT_EQ(quiesce_count, unquiesce_count);
+        quiesce_count++;
+        image.quiesce_complete(handle, 0);
+      }
+      void handle_unquiesce() override {
+        std::unique_lock locker(lock);
+        unquiesce_count++;
+        ASSERT_EQ(quiesce_count, unquiesce_count);
+        cv.notify_one();
+      }
+      bool wait_for_unquiesce(size_t c) {
+        std::unique_lock locker(lock);
+        return cv.wait_for(locker, seconds(60),
+                           [this, c]() { return unquiesce_count >= c; });
+      }
+    } watcher1(image1), watcher2(image2);
+
+    ASSERT_EQ(0, image1.quiesce_watch(&watcher1, &watcher1.handle));
+    ASSERT_EQ(0, image2.quiesce_watch(&watcher2, &watcher2.handle));
+
+    ASSERT_EQ(0, image1.snap_create("snap1"));
+    ASSERT_EQ(1U, watcher1.quiesce_count);
+    ASSERT_TRUE(watcher1.wait_for_unquiesce(1U));
+    ASSERT_EQ(1U, watcher2.quiesce_count);
+    ASSERT_TRUE(watcher2.wait_for_unquiesce(1U));
+
+    ASSERT_EQ(0, image2.snap_create("snap2"));
+    ASSERT_EQ(2U, watcher1.quiesce_count);
+    ASSERT_TRUE(watcher1.wait_for_unquiesce(2U));
+    ASSERT_EQ(2U, watcher2.quiesce_count);
+    ASSERT_TRUE(watcher2.wait_for_unquiesce(2U));
+
+    ASSERT_EQ(0, image1.quiesce_unwatch(watcher1.handle));
+
+    ASSERT_EQ(0, image1.snap_create("snap3"));
+    ASSERT_EQ(2U, watcher1.quiesce_count);
+    ASSERT_EQ(2U, watcher1.unquiesce_count);
+    ASSERT_EQ(3U, watcher2.quiesce_count);
+    ASSERT_TRUE(watcher2.wait_for_unquiesce(3U));
+
+    ASSERT_EQ(0, image2.quiesce_unwatch(watcher2.handle));
+
+    ASSERT_EQ(0, image1.snap_remove("snap1"));
+    ASSERT_EQ(0, image1.snap_remove("snap2"));
+    ASSERT_EQ(0, image1.snap_remove("snap3"));
+  }
+
+  ASSERT_EQ(0, rbd.remove(ioctx, name.c_str()));
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, QuiesceWatchError)
+{
+  librbd::RBD rbd;
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+  std::string name = get_temp_image_name();
+  int order = 0;
+  uint64_t size = 2 << 20;
+  ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+
+  {
+    librbd::Image image1, image2;
+    ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
+    ASSERT_EQ(0, rbd.open(ioctx, image2, name.c_str(), NULL));
+
+    struct Watcher : public librbd::QuiesceWatchCtx {
+      librbd::Image &image;
+      int r;
+      uint64_t handle;
+      size_t quiesce_count = 0;
+      size_t unquiesce_count = 0;
+
+      ceph::mutex lock = ceph::make_mutex("lock");
+      ceph::condition_variable cv;
+
+      Watcher(librbd::Image &image, int r) : image(image), r(r) {
+      }
+
+      void reset_counters() {
+        quiesce_count = 0;
+        unquiesce_count = 0;
+      }
 
-    for (uint64_t i = 0; i < size / TEST_IO_SIZE; ++i) {
-      char *p = test_data + (TEST_IO_SIZE + 1) * (i % 10);
-      ASSERT_PASSED(read_test_data, image, p, strlen(p) * i, TEST_IO_SIZE, 0);
-    }
+      void handle_quiesce() override {
+        quiesce_count++;
+        image.quiesce_complete(handle, r);
+      }
 
-    ASSERT_PASSED(validate_object_map, image);
-  }
+      void handle_unquiesce() override {
+        std::unique_lock locker(lock);
+        unquiesce_count++;
+        cv.notify_one();
+      }
 
+      bool wait_for_unquiesce() {
+        std::unique_lock locker(lock);
+        return cv.wait_for(locker, seconds(60),
+                           [this]() {
+                             return quiesce_count == unquiesce_count;
+                           });
+      }
+    } watcher10(image1, -EINVAL), watcher11(image1, 0), watcher20(image2, 0);
+
+    ASSERT_EQ(0, image1.quiesce_watch(&watcher10, &watcher10.handle));
+    ASSERT_EQ(0, image1.quiesce_watch(&watcher11, &watcher11.handle));
+    ASSERT_EQ(0, image2.quiesce_watch(&watcher20, &watcher20.handle));
+
+    ASSERT_EQ(-EINVAL, image1.snap_create("snap1"));
+    ASSERT_GT(watcher10.quiesce_count, 0U);
+    ASSERT_EQ(watcher10.unquiesce_count, 0U);
+    ASSERT_GT(watcher11.quiesce_count, 0U);
+    ASSERT_TRUE(watcher11.wait_for_unquiesce());
+    ASSERT_GT(watcher20.quiesce_count, 0U);
+    ASSERT_TRUE(watcher20.wait_for_unquiesce());
+
+    PrintProgress prog_ctx;
+    watcher10.reset_counters();
+    watcher11.reset_counters();
+    watcher20.reset_counters();
+    ASSERT_EQ(0, image2.snap_create2("snap2",
+                                     RBD_SNAP_CREATE_IGNORE_QUIESCE_ERROR,
+                                     prog_ctx));
+    ASSERT_GT(watcher10.quiesce_count, 0U);
+    ASSERT_EQ(watcher10.unquiesce_count, 0U);
+    ASSERT_GT(watcher11.quiesce_count, 0U);
+    ASSERT_TRUE(watcher11.wait_for_unquiesce());
+    ASSERT_GT(watcher20.quiesce_count, 0U);
+    ASSERT_TRUE(watcher20.wait_for_unquiesce());
+
+    ASSERT_EQ(0, image1.quiesce_unwatch(watcher10.handle));
+
+    watcher11.reset_counters();
+    watcher20.reset_counters();
+    ASSERT_EQ(0, image1.snap_create("snap3"));
+    ASSERT_GT(watcher11.quiesce_count, 0U);
+    ASSERT_TRUE(watcher11.wait_for_unquiesce());
+    ASSERT_GT(watcher20.quiesce_count, 0U);
+    ASSERT_TRUE(watcher20.wait_for_unquiesce());
+
+    ASSERT_EQ(0, image1.quiesce_unwatch(watcher11.handle));
+
+    watcher20.reset_counters();
+    ASSERT_EQ(0, image2.snap_create2("snap4", RBD_SNAP_CREATE_SKIP_QUIESCE,
+                                     prog_ctx));
+    ASSERT_EQ(watcher20.quiesce_count, 0U);
+    ASSERT_EQ(watcher20.unquiesce_count, 0U);
+
+    ASSERT_EQ(0, image2.quiesce_unwatch(watcher20.handle));
+
+    ASSERT_EQ(0, image1.snap_remove("snap2"));
+    ASSERT_EQ(0, image1.snap_remove("snap3"));
+    ASSERT_EQ(0, image1.snap_remove("snap4"));
+  }
+
+  ASSERT_EQ(0, rbd.remove(ioctx, name.c_str()));
   ioctx.close();
 }
 
-TEST_F(TestLibRBD, SnapRemoveWithChildMissing)
+TEST_F(TestLibRBD, QuiesceWatchTimeout)
 {
-  REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
-  ASSERT_EQ(0, rados_conf_set(_cluster, "rbd_default_clone_format", "2"));
-  BOOST_SCOPE_EXIT_ALL(&) {
-    ASSERT_EQ(0, rados_conf_set(_cluster, "rbd_default_clone_format", "auto"));
-  };
+  REQUIRE(!is_librados_test_stub(_rados));
 
-  librbd::RBD rbd;
-  rados_ioctx_t ioctx1, ioctx2;
-  string pool_name1 = create_pool(true);
-  rados_ioctx_create(_cluster, pool_name1.c_str(), &ioctx1);
-  ASSERT_EQ(0, rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx2));
+  ASSERT_EQ(0, _rados.conf_set("rbd_quiesce_notification_attempts", "2"));
 
-  bool old_format;
-  uint64_t features;
-  rbd_image_t parent, child1, child2, child3;
+  librbd::RBD rbd;
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+  std::string name = get_temp_image_name();
   int order = 0;
-  char child_id1[4096];
-  char child_id2[4096];
-  char child_id3[4096];
+  uint64_t size = 2 << 20;
+  ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
 
-  ASSERT_EQ(0, get_features(&old_format, &features));
-  ASSERT_FALSE(old_format);
-  std::string parent_name = get_temp_image_name();
-  std::string child_name1 = get_temp_image_name();
-  std::string child_name2 = get_temp_image_name();
-  std::string child_name3 = get_temp_image_name();
-  ASSERT_EQ(0, create_image_full(ioctx1, parent_name.c_str(), 4<<20, &order,
-            false, features));
-  ASSERT_EQ(0, rbd_open(ioctx1, parent_name.c_str(), &parent, NULL));
-  ASSERT_EQ(0, rbd_snap_create(parent, "snap1"));
-  ASSERT_EQ(0, rbd_snap_create(parent, "snap2"));
+  {
+    librbd::Image image;
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
-  ASSERT_EQ(0, clone_image(ioctx1, parent, parent_name.c_str(), "snap1",
-                           ioctx2, child_name1.c_str(), features, &order));
-  ASSERT_EQ(0, clone_image(ioctx1, parent, parent_name.c_str(), "snap2",
-                           ioctx1, child_name2.c_str(), features, &order));
-  ASSERT_EQ(0, clone_image(ioctx1, parent, parent_name.c_str(), "snap2",
-                           ioctx2, child_name3.c_str(), features, &order));
+    struct Watcher : public librbd::QuiesceWatchCtx {
+      librbd::Image &image;
+      std::mutex m_lock;
+      std::condition_variable m_cond;
+      size_t quiesce_count = 0;
+      size_t unquiesce_count = 0;
 
-  ASSERT_EQ(0, rbd_open(ioctx2, child_name1.c_str(), &child1, NULL));
-  ASSERT_EQ(0, rbd_open(ioctx1, child_name2.c_str(), &child2, NULL));
-  ASSERT_EQ(0, rbd_open(ioctx2, child_name3.c_str(), &child3, NULL));
-  ASSERT_EQ(0, rbd_get_id(child1, child_id1, sizeof(child_id1)));
-  ASSERT_EQ(0, rbd_get_id(child2, child_id2, sizeof(child_id2)));
-  ASSERT_EQ(0, rbd_get_id(child3, child_id3, sizeof(child_id3)));
-  test_list_children2(parent, 3,
-                      child_id1, m_pool_name.c_str(), child_name1.c_str(), false,
-                      child_id2, pool_name1.c_str(), child_name2.c_str(), false,
-                      child_id3, m_pool_name.c_str(), child_name3.c_str(), false);
+      Watcher(librbd::Image &image) : image(image) {
+      }
 
-  size_t max_size = 10;
-  rbd_linked_image_spec_t children[max_size];
-  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
-  ASSERT_EQ(3, static_cast<int>(max_size));
-  rbd_linked_image_spec_list_cleanup(children, max_size);
+      void handle_quiesce() override {
+        std::lock_guard<std::mutex> locker(m_lock);
+        quiesce_count++;
+        m_cond.notify_one();
+      }
 
-  ASSERT_EQ(0, rbd_close(child1));
-  ASSERT_EQ(0, rbd_close(child2));
-  ASSERT_EQ(0, rbd_close(child3));
-  rados_ioctx_destroy(ioctx2);
-  ASSERT_EQ(0, rados_pool_delete(_cluster, m_pool_name.c_str()));
-  _pool_names.erase(std::remove(_pool_names.begin(),
-                                _pool_names.end(), m_pool_name),
-                    _pool_names.end());
-  EXPECT_EQ(0, rados_wait_for_latest_osdmap(_cluster));
+      void handle_unquiesce() override {
+        std::lock_guard<std::mutex> locker(m_lock);
+        unquiesce_count++;
+        m_cond.notify_one();
+      }
 
-  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
-  ASSERT_EQ(3, static_cast<int>(max_size));
-  rbd_linked_image_spec_list_cleanup(children, max_size);
-  ASSERT_EQ(0, rbd_snap_remove(parent, "snap1"));
-  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
-  ASSERT_EQ(2, static_cast<int>(max_size));
-  rbd_linked_image_spec_list_cleanup(children, max_size);
+      void wait_for_quiesce() {
+        std::unique_lock<std::mutex> locker(m_lock);
+        ASSERT_TRUE(m_cond.wait_for(locker, seconds(60),
+                                    [this] {
+                                      return quiesce_count >= 1;
+                                    }));
+      }
 
-  ASSERT_EQ(0, rbd_remove(ioctx1, child_name2.c_str()));
-  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
-  ASSERT_EQ(1, static_cast<int>(max_size));
-  rbd_linked_image_spec_list_cleanup(children, max_size);
+      void wait_for_unquiesce() {
+        std::unique_lock<std::mutex> locker(m_lock);
+        ASSERT_TRUE(m_cond.wait_for(locker, seconds(60),
+                                    [this] {
+                                      return quiesce_count == unquiesce_count;
+                                    }));
+        quiesce_count = unquiesce_count = 0;
+      }
+    } watcher(image);
+    uint64_t handle;
 
-  ASSERT_EQ(0, rbd_snap_remove(parent, "snap2"));
-  ASSERT_EQ(0, rbd_list_children3(parent, children, &max_size));
-  ASSERT_EQ(0, static_cast<int>(max_size));
-  rbd_linked_image_spec_list_cleanup(children, max_size);
-  test_list_children2(parent, 0);
-  ASSERT_EQ(0, test_ls_snaps(parent, 0));
+    ASSERT_EQ(0, image.quiesce_watch(&watcher, &handle));
 
-  ASSERT_EQ(0, rbd_close(parent));
-  rados_ioctx_destroy(ioctx1);
+    std::cerr << "test quiesce is not long enough to time out" << std::endl;
+
+    thread quiesce1([&image, &watcher, handle]() {
+      watcher.wait_for_quiesce();
+      sleep(8);
+      image.quiesce_complete(handle, 0);
+    });
+
+    ASSERT_EQ(0, image.snap_create("snap1"));
+    quiesce1.join();
+    ASSERT_GE(watcher.quiesce_count, 1U);
+    watcher.wait_for_unquiesce();
+
+    std::cerr << "test quiesce is timed out" << std::endl;
+
+    bool timed_out = false;
+    thread quiesce2([&image, &watcher, handle, &timed_out]() {
+      watcher.wait_for_quiesce();
+      for (int i = 0; !timed_out && i < 60; i++) {
+        std::cerr << "waiting for timed out ... " << i << std::endl;
+        sleep(1);
+      }
+      image.quiesce_complete(handle, 0);
+    });
+
+    ASSERT_EQ(-ETIMEDOUT, image.snap_create("snap2"));
+    timed_out = true;
+    quiesce2.join();
+    ASSERT_GE(watcher.quiesce_count, 1U);
+    watcher.wait_for_unquiesce();
+
+    thread quiesce3([&image, handle, &watcher]() {
+      watcher.wait_for_quiesce();
+      image.quiesce_complete(handle, 0);
+    });
+
+    std::cerr << "test retry succeeds" << std::endl;
+
+    ASSERT_EQ(0, image.snap_create("snap2"));
+    quiesce3.join();
+    ASSERT_GE(watcher.quiesce_count, 1U);
+    watcher.wait_for_unquiesce();
+
+    ASSERT_EQ(0, image.snap_remove("snap1"));
+    ASSERT_EQ(0, image.snap_remove("snap2"));
+  }
+
+  ASSERT_EQ(0, rbd.remove(ioctx, name.c_str()));
+  ioctx.close();
 }
 
 TEST_F(TestLibRBD, WriteZeroes) {
@@ -8242,6 +8845,241 @@ TEST_F(TestLibRBD, WriteZeroes) {
   ASSERT_EQ(0, image.close());
 }
 
+TEST_F(TestLibRBD, WriteZeroesThickProvision) {
+  librbd::RBD rbd;
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+  std::string name = get_temp_image_name();
+  int order = 0;
+  uint64_t size = 2 << 20;
+  ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+
+  librbd::Image image;
+  ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+  interval_set<uint64_t> diff;
+  ASSERT_EQ(0, image.diff_iterate2(nullptr, 0, size, false, false,
+                                   iterate_cb, (void *)&diff));
+  auto expected_diff = interval_set<uint64_t>{{}};
+  ASSERT_EQ(expected_diff, diff);
+
+  // writes unaligned zeroes as a prepend
+  ASSERT_EQ(128, image.write_zeroes(
+              0, 128, RBD_WRITE_ZEROES_FLAG_THICK_PROVISION, 0));
+  diff.clear();
+  ASSERT_EQ(0, image.diff_iterate2(nullptr, 0, size, false, false,
+                                   iterate_cb, (void *)&diff));
+  expected_diff = interval_set<uint64_t>{{{0, 128}}};
+  ASSERT_EQ(expected_diff, diff);
+
+  ASSERT_EQ(512, image.write_zeroes(
+              384, 512, RBD_WRITE_ZEROES_FLAG_THICK_PROVISION, 0));
+  diff.clear();
+  ASSERT_EQ(0, image.diff_iterate2(nullptr, 0, size, false, false,
+                                   iterate_cb, (void *)&diff));
+  expected_diff = interval_set<uint64_t>{{{0, 896}}};
+  ASSERT_EQ(expected_diff, diff);
+
+  // prepend with write-same
+  ASSERT_EQ(640, image.write_zeroes(
+              896, 640, RBD_WRITE_ZEROES_FLAG_THICK_PROVISION, 0));
+  diff.clear();
+  ASSERT_EQ(0, image.diff_iterate2(nullptr, 0, size, false, false,
+                                   iterate_cb, (void *)&diff));
+  expected_diff = interval_set<uint64_t>{{{0, 1536}}};
+  ASSERT_EQ(expected_diff, diff);
+
+  // write-same with append
+  ASSERT_EQ(640, image.write_zeroes(
+              1536, 640, RBD_WRITE_ZEROES_FLAG_THICK_PROVISION, 0));
+  diff.clear();
+  ASSERT_EQ(0, image.diff_iterate2(nullptr, 0, size, false, false,
+                                   iterate_cb, (void *)&diff));
+  expected_diff = interval_set<uint64_t>{{{0, 2176}}};
+  ASSERT_EQ(expected_diff, diff);
+
+  // prepend + write-same + append
+  ASSERT_EQ(768, image.write_zeroes(
+              2176, 768, RBD_WRITE_ZEROES_FLAG_THICK_PROVISION, 0));
+  diff.clear();
+  ASSERT_EQ(0, image.diff_iterate2(nullptr, 0, size, false, false,
+                                   iterate_cb, (void *)&diff));
+  expected_diff = interval_set<uint64_t>{{{0, 2944}}};
+
+  // write-same
+  ASSERT_EQ(1024, image.write_zeroes(
+              3072, 1024, RBD_WRITE_ZEROES_FLAG_THICK_PROVISION, 0));
+  diff.clear();
+  ASSERT_EQ(0, image.diff_iterate2(nullptr, 0, size, false, false,
+                                   iterate_cb, (void *)&diff));
+  expected_diff = interval_set<uint64_t>{{{0, 4096}}};
+
+  bufferlist expected_bl;
+  expected_bl.append_zero(size);
+
+  bufferlist read_bl;
+  EXPECT_EQ(size, image.read(0, size, read_bl));
+  EXPECT_EQ(expected_bl, read_bl);
+
+  ASSERT_EQ(0, image.close());
+}
+
+TEST_F(TestLibRBD, ConcurentOperations)
+{
+  REQUIRE_FEATURE(RBD_FEATURE_EXCLUSIVE_LOCK);
+
+  librbd::RBD rbd;
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+  std::string name = get_temp_image_name();
+  int order = 0;
+  uint64_t size = 2 << 20;
+  ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+
+  // Test creating/removing many snapshots simultaneously
+
+  std::vector<librbd::Image> images(10);
+  std::vector<librbd::RBD::AioCompletion *> comps;
+
+  for (auto &image : images) {
+    auto comp = new librbd::RBD::AioCompletion(NULL, NULL);
+    ASSERT_EQ(0, rbd.aio_open(ioctx, image, name.c_str(), NULL, comp));
+    comps.push_back(comp);
+  }
+
+  for (auto &comp : comps) {
+    ASSERT_EQ(0, comp->wait_for_complete());
+    ASSERT_EQ(1, comp->is_complete());
+    ASSERT_EQ(0, comp->get_return_value());
+    comp->release();
+  }
+  comps.clear();
+
+  std::vector<std::thread> threads;
+  int i = 0;
+  for (auto &image : images) {
+    std::string snap_name = "snap" + stringify(i++);
+    threads.emplace_back([&image, snap_name]() {
+      int r = image.snap_create(snap_name.c_str());
+      ceph_assert(r == 0);
+    });
+  }
+
+  for (auto &t : threads) {
+    t.join();
+  }
+  threads.clear();
+
+  i = 0;
+  for (auto &image : images) {
+    std::string snap_name = "snap" + stringify(i++);
+    threads.emplace_back([&image, snap_name](){
+      int r = image.snap_remove(snap_name.c_str());
+      ceph_assert(r == 0);
+    });
+  }
+
+  for (auto &t : threads) {
+    t.join();
+  }
+  threads.clear();
+
+  for (auto &image : images) {
+    auto comp = new librbd::RBD::AioCompletion(NULL, NULL);
+    ASSERT_EQ(0, image.aio_close(comp));
+    comps.push_back(comp);
+  }
+
+  for (auto &comp : comps) {
+    ASSERT_EQ(0, comp->wait_for_complete());
+    ASSERT_EQ(1, comp->is_complete());
+    ASSERT_EQ(0, comp->get_return_value());
+    comp->release();
+  }
+  comps.clear();
+
+  // Test shutdown
+  {
+    librbd::Image image1, image2, image3;
+    ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
+    ASSERT_EQ(0, rbd.open(ioctx, image2, name.c_str(), NULL));
+    ASSERT_EQ(0, rbd.open(ioctx, image3, name.c_str(), NULL));
+
+    ASSERT_EQ(0, image1.lock_acquire(RBD_LOCK_MODE_EXCLUSIVE));
+
+    struct Watcher : public librbd::QuiesceWatchCtx {
+      size_t count = 0;
+
+      ceph::mutex lock = ceph::make_mutex("lock");
+      ceph::condition_variable cv;
+
+      void handle_quiesce() override {
+        std::unique_lock locker(lock);
+        count++;
+        cv.notify_one();
+      }
+
+      void handle_unquiesce() override {
+      }
+
+      bool wait_for_quiesce(size_t c) {
+        std::unique_lock locker(lock);
+        return cv.wait_for(locker, seconds(60),
+                           [this, c]() { return count >= c; });
+      }
+    } watcher;
+    uint64_t handle;
+    ASSERT_EQ(0, image2.quiesce_watch(&watcher, &handle));
+
+    auto close1_comp = new librbd::RBD::AioCompletion(NULL, NULL);
+
+    std::thread create_snap1([&image1, close1_comp]() {
+      int r = image1.snap_create("snap1");
+      ceph_assert(r == 0);
+      r = image1.aio_close(close1_comp);
+      ceph_assert(r == 0);
+    });
+
+    ASSERT_TRUE(watcher.wait_for_quiesce(1));
+
+    std::thread create_snap2([&image2]() {
+      int r = image2.snap_create("snap2");
+      ceph_assert(r == 0);
+    });
+
+    std::thread create_snap3([&image3]() {
+      int r = image3.snap_create("snap3");
+      ceph_assert(r == 0);
+    });
+
+    image2.quiesce_complete(handle, 0);
+    create_snap1.join();
+
+    ASSERT_TRUE(watcher.wait_for_quiesce(2));
+    image2.quiesce_complete(handle, 0);
+
+    ASSERT_TRUE(watcher.wait_for_quiesce(3));
+    image2.quiesce_complete(handle, 0);
+
+    ASSERT_EQ(0, close1_comp->wait_for_complete());
+    ASSERT_EQ(1, close1_comp->is_complete());
+    ASSERT_EQ(0, close1_comp->get_return_value());
+    close1_comp->release();
+
+    create_snap2.join();
+    create_snap3.join();
+
+    ASSERT_EQ(0, image2.quiesce_unwatch(handle));
+    ASSERT_EQ(0, image2.snap_remove("snap1"));
+    ASSERT_EQ(0, image2.snap_remove("snap2"));
+    ASSERT_EQ(0, image2.snap_remove("snap3"));
+  }
+
+  ASSERT_EQ(0, rbd.remove(ioctx, name.c_str()));
+  ioctx.close();
+}
+
+
 // poorman's ceph_assert()
 namespace ceph {
   void __ceph_assert_fail(const char *assertion, const char *file, int line,