]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/librbd/test_librbd.cc
import ceph quincy 17.2.6
[ceph.git] / ceph / src / test / librbd / test_librbd.cc
index 61b0d7ebd8e82e89c2997a281c005e7b71b01d81..29d4a06ffd16a77f5a02aaf81a1c9417a16761a6 100644 (file)
@@ -712,13 +712,16 @@ public:
 
     ASSERT_PASSED(write_test_data, image, zero_data, 0, TEST_IO_SIZE,
                   LIBRADOS_OP_FLAG_FADVISE_NOCACHE);
+    mismatch_offset = 123;
     ASSERT_EQ(-EILSEQ, rbd_compare_and_write(image, 0, TEST_IO_SIZE,
               mismatch_data, mismatch_data, &mismatch_offset, 0));
     ASSERT_EQ(0U, mismatch_offset);
     rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
+    mismatch_offset = 123;
     ASSERT_EQ(0, rbd_aio_compare_and_write(image, 0, TEST_IO_SIZE, mismatch_data,
               mismatch_data, comp, &mismatch_offset, 0));
     ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+    ASSERT_EQ(-EILSEQ, rbd_aio_get_return_value(comp));
     ASSERT_EQ(0U, mismatch_offset);
     rbd_aio_release(comp);
 
@@ -2458,13 +2461,16 @@ TEST_F(TestLibRBD, TestIOWithIOHint)
   rbd_aio_release(comp);
 
   ASSERT_PASSED(write_test_data, image, zero_data, 0, TEST_IO_SIZE, LIBRADOS_OP_FLAG_FADVISE_NOCACHE);
+  mismatch_offset = 123;
   ASSERT_EQ(-EILSEQ, rbd_compare_and_write(image, 0, TEST_IO_SIZE, mismatch_data, mismatch_data,
                                            &mismatch_offset, LIBRADOS_OP_FLAG_FADVISE_DONTNEED));
   ASSERT_EQ(0U, mismatch_offset);
   rbd_aio_create_completion(NULL, (rbd_callback_t) simple_read_cb, &comp);
+  mismatch_offset = 123;
   ASSERT_EQ(0, rbd_aio_compare_and_write(image, 0, TEST_IO_SIZE, mismatch_data, mismatch_data,
                                          comp, &mismatch_offset, LIBRADOS_OP_FLAG_FADVISE_DONTNEED));
   ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(-EILSEQ, rbd_aio_get_return_value(comp));
   ASSERT_EQ(0U, mismatch_offset);
   rbd_aio_release(comp);
 
@@ -2574,7 +2580,7 @@ TEST_F(TestLibRBD, TestDataPoolIO)
   rados_ioctx_destroy(ioctx);
 }
 
-TEST_F(TestLibRBD, TestScatterGatherIO)
+TEST_F(TestLibRBD, TestCompareAndWriteMismatch)
 {
   rados_ioctx_t ioctx;
   rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
@@ -2582,65 +2588,83 @@ TEST_F(TestLibRBD, TestScatterGatherIO)
   rbd_image_t image;
   int order = 0;
   std::string name = get_temp_image_name();
-  uint64_t size = 20 << 20;
+  uint64_t size = 20 << 20; /* 20MiB */
+  off_t off = 512;
 
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
-  std::string write_buffer("This is a test");
-  // These iovecs should produce a length overflow
-  struct iovec bad_iovs[] = {
-    {.iov_base = &write_buffer[0], .iov_len = 5},
-    {.iov_base = NULL, .iov_len = std::numeric_limits<size_t>::max()}
-  };
-  struct iovec write_iovs[] = {
-    {.iov_base = &write_buffer[0],  .iov_len = 5},
-    {.iov_base = &write_buffer[5],  .iov_len = 3},
-    {.iov_base = &write_buffer[8],  .iov_len = 2},
-    {.iov_base = &write_buffer[10], .iov_len = 4}
-  };
+  // We only support to compare and write the same amount of (len) bytes
+  std::string cmp_buffer("This is a test");
+  std::string write_buffer("Write this !!!");
+  std::string mismatch_buffer("This will fail");
+  std::string read_buffer(cmp_buffer.length(), '1');
+
+  ssize_t written = rbd_write(image, off, cmp_buffer.length(),
+                              cmp_buffer.data());
+  ASSERT_EQ(cmp_buffer.length(), written);
+
+  // Compare should fail because of mismatch
+  uint64_t mismatch_off = 0;
+  written = rbd_compare_and_write(image, off, write_buffer.length(),
+                                  mismatch_buffer.data(), write_buffer.data(),
+                                  &mismatch_off, 0);
+  ASSERT_EQ(-EILSEQ, written);
+  ASSERT_EQ(5U, mismatch_off);
+
+  // check nothing was written
+  ssize_t read = rbd_read(image, off, read_buffer.length(), read_buffer.data());
+  ASSERT_EQ(read_buffer.length(), read);
+  ASSERT_EQ(cmp_buffer, read_buffer);
 
-  rbd_completion_t comp;
-  rbd_aio_create_completion(NULL, NULL, &comp);
-  ASSERT_EQ(-EINVAL, rbd_aio_writev(image, write_iovs, 0, 0, comp));
-  ASSERT_EQ(-EINVAL, rbd_aio_writev(image, bad_iovs, 2, 0, comp));
-  ASSERT_EQ(0, rbd_aio_writev(image, write_iovs,
-                              sizeof(write_iovs) / sizeof(struct iovec),
-                              1<<order, comp));
-  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
-  ASSERT_EQ(0, rbd_aio_get_return_value(comp));
-  rbd_aio_release(comp);
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
 
-  std::string read_buffer(write_buffer.size(), '1');
-  struct iovec read_iovs[] = {
-    {.iov_base = &read_buffer[0],  .iov_len = 4},
-    {.iov_base = &read_buffer[8],  .iov_len = 4},
-    {.iov_base = &read_buffer[12], .iov_len = 2}
-  };
+  rados_ioctx_destroy(ioctx);
+}
 
-  rbd_aio_create_completion(NULL, NULL, &comp);
-  ASSERT_EQ(-EINVAL, rbd_aio_readv(image, read_iovs, 0, 0, comp));
-  ASSERT_EQ(-EINVAL, rbd_aio_readv(image, bad_iovs, 2, 0, comp));
-  ASSERT_EQ(0, rbd_aio_readv(image, read_iovs,
-                             sizeof(read_iovs) / sizeof(struct iovec),
-                             1<<order, comp));
-  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
-  ASSERT_EQ(10, rbd_aio_get_return_value(comp));
-  rbd_aio_release(comp);
-  ASSERT_EQ("This1111 is a ", read_buffer);
+TEST_F(TestLibRBD, TestAioCompareAndWriteMismatch)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-  std::string linear_buffer(write_buffer.size(), '1');
-  struct iovec linear_iovs[] = {
-    {.iov_base = &linear_buffer[4], .iov_len = 4}
-  };
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+  off_t off = 512;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  // We only support to compare and write the same amount of (len) bytes
+  std::string cmp_buffer("This is a test");
+  std::string write_buffer("Write this !!!");
+  std::string mismatch_buffer("This will fail");
+  std::string read_buffer(cmp_buffer.length(), '1');
+
+  ssize_t written = rbd_write(image, off, cmp_buffer.length(),
+                              cmp_buffer.data());
+  ASSERT_EQ(cmp_buffer.length(), written);
+
+  // Compare should fail because of mismatch
+  rbd_completion_t comp;
   rbd_aio_create_completion(NULL, NULL, &comp);
-  ASSERT_EQ(0, rbd_aio_readv(image, linear_iovs,
-                             sizeof(linear_iovs) / sizeof(struct iovec),
-                             1<<order, comp));
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_write(image, off, write_buffer.length(),
+                                      mismatch_buffer.data(),
+                                      write_buffer.data(), comp,
+                                      &mismatch_off, 0);
+  ASSERT_EQ(0, ret);
   ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
-  ASSERT_EQ(4, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(-EILSEQ, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(5U, mismatch_off);
   rbd_aio_release(comp);
-  ASSERT_EQ("1111This111111", linear_buffer);
+
+  // check nothing was written
+  ssize_t read = rbd_read(image, off, read_buffer.length(), read_buffer.data());
+  ASSERT_EQ(read_buffer.length(), read);
+  ASSERT_EQ(cmp_buffer, read_buffer);
 
   ASSERT_PASSED(validate_object_map, image);
   ASSERT_EQ(0, rbd_close(image));
@@ -2648,7 +2672,7 @@ TEST_F(TestLibRBD, TestScatterGatherIO)
   rados_ioctx_destroy(ioctx);
 }
 
-TEST_F(TestLibRBD, TestEmptyDiscard)
+TEST_F(TestLibRBD, TestCompareAndWriteSuccess)
 {
   rados_ioctx_t ioctx;
   rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
@@ -2656,14 +2680,35 @@ TEST_F(TestLibRBD, TestEmptyDiscard)
   rbd_image_t image;
   int order = 0;
   std::string name = get_temp_image_name();
-  uint64_t size = 20 << 20;
+  uint64_t size = 20 << 20; /* 20MiB */
+  off_t off = 512;
 
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
   ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
-  ASSERT_PASSED(aio_discard_test_data, image, 0, 1*1024*1024);
-  ASSERT_PASSED(aio_discard_test_data, image, 0, 4*1024*1024);
-  ASSERT_PASSED(aio_discard_test_data, image, 3*1024*1024, 1*1024*1024);
+  // We only support to compare and write the same amount of (len) bytes
+  std::string cmp_buffer("This is a test");
+  std::string write_buffer("Write this !!!");
+  std::string read_buffer(cmp_buffer.length(), '1');
+
+  ssize_t written = rbd_write(image, off, cmp_buffer.length(),
+                              cmp_buffer.data());
+  ASSERT_EQ(cmp_buffer.length(), written);
+
+  /*
+   * we compare against the written buffer (cmp_buffer) and write the buffer
+   * We expect: len bytes written
+   */
+  uint64_t mismatch_off = 0;
+  written = rbd_compare_and_write(image, off, write_buffer.length(),
+                                  cmp_buffer.data(), write_buffer.data(),
+                                  &mismatch_off, 0);
+  ASSERT_EQ(write_buffer.length(), written);
+  ASSERT_EQ(0U, mismatch_off);
+
+  ssize_t read = rbd_read(image, off, read_buffer.length(), read_buffer.data());
+  ASSERT_EQ(read_buffer.length(), read);
+  ASSERT_EQ(write_buffer, read_buffer);
 
   ASSERT_PASSED(validate_object_map, image);
   ASSERT_EQ(0, rbd_close(image));
@@ -2671,113 +2716,761 @@ TEST_F(TestLibRBD, TestEmptyDiscard)
   rados_ioctx_destroy(ioctx);
 }
 
-TEST_F(TestLibRBD, TestFUA)
+TEST_F(TestLibRBD, TestAioCompareAndWriteSuccess)
 {
   rados_ioctx_t ioctx;
   rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
 
-  rbd_image_t image_write;
-  rbd_image_t image_read;
+  rbd_image_t image;
   int order = 0;
   std::string name = get_temp_image_name();
-  uint64_t size = 2 << 20;
+  uint64_t size = 20 << 20; /* 20MiB */
+  off_t off = 512;
 
   ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
-  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image_write, NULL));
-  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image_read, NULL));
-
-  // enable writeback cache
-  rbd_flush(image_write);
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
 
-  char test_data[TEST_IO_SIZE + 1];
-  int i;
+  // We only support to compare and write the same amount of (len) bytes
+  std::string cmp_buffer("This is a test");
+  std::string write_buffer("Write this !!!");
+  std::string read_buffer(cmp_buffer.length(), '1');
 
-  for (i = 0; i < TEST_IO_SIZE; ++i) {
-    test_data[i] = (char) (rand() % (126 - 33) + 33);
-  }
-  test_data[TEST_IO_SIZE] = '\0';
-  for (i = 0; i < 5; ++i)
-    ASSERT_PASSED(write_test_data, image_write, test_data,
-                  TEST_IO_SIZE * i, TEST_IO_SIZE, LIBRADOS_OP_FLAG_FADVISE_FUA);
+  ssize_t written = rbd_write(image, off, cmp_buffer.length(),
+                              cmp_buffer.data());
+  ASSERT_EQ(cmp_buffer.length(), written);
 
-  for (i = 0; i < 5; ++i)
-    ASSERT_PASSED(read_test_data, image_read, test_data,
-                  TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+  /*
+   * we compare against the written buffer (cmp_buffer) and write the buffer
+   * We expect: len bytes written
+   */
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_write(image, off, write_buffer.length(),
+                                      cmp_buffer.data(), write_buffer.data(),
+                                      comp, &mismatch_off, 0);
+  ASSERT_EQ(0, ret);
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(0, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(0U, mismatch_off);
+  rbd_aio_release(comp);
 
-  for (i = 5; i < 10; ++i)
-    ASSERT_PASSED(aio_write_test_data, image_write, test_data,
-                  TEST_IO_SIZE * i, TEST_IO_SIZE, LIBRADOS_OP_FLAG_FADVISE_FUA);
+  ssize_t read = rbd_read(image, off, read_buffer.length(), read_buffer.data());
+  ASSERT_EQ(read_buffer.length(), read);
+  ASSERT_EQ(write_buffer, read_buffer);
 
-  for (i = 5; i < 10; ++i)
-    ASSERT_PASSED(aio_read_test_data, image_read, test_data,
-                  TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
 
-  ASSERT_PASSED(validate_object_map, image_write);
-  ASSERT_PASSED(validate_object_map, image_read);
-  ASSERT_EQ(0, rbd_close(image_write));
-  ASSERT_EQ(0, rbd_close(image_read));
-  ASSERT_EQ(0, rbd_remove(ioctx, name.c_str()));
   rados_ioctx_destroy(ioctx);
 }
 
-void simple_write_cb_pp(librbd::completion_t cb, void *arg)
+
+TEST_F(TestLibRBD, TestCompareAndWriteStripeUnitUnaligned)
 {
-  cout << "write completion cb called!" << std::endl;
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
+
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  // large write test => we allow stripe unit size writes (aligned)
+  uint64_t stripe_unit;
+  rbd_get_stripe_unit(image, &stripe_unit);
+  std::string large_write_buffer(stripe_unit, '2');
+  std::string large_cmp_buffer(stripe_unit * 2, '4');
+
+  ssize_t written = rbd_write(image, stripe_unit, large_cmp_buffer.length(),
+                              large_cmp_buffer.data());
+  ASSERT_EQ(large_cmp_buffer.length(), written);
+
+  /*
+    * compare and write at offset stripe_unit + 1 and stripe unit size
+    * Expect fail because access exceeds stripe (unaligned)
+    */
+  uint64_t mismatch_off = 0;
+  written = rbd_compare_and_write(image, stripe_unit + 1, stripe_unit,
+                                  large_cmp_buffer.data(),
+                                  large_write_buffer.data(),
+                                  &mismatch_off, 0);
+  ASSERT_EQ(-EINVAL, written);
+  ASSERT_EQ(0U, mismatch_off);
+
+  // check nothing has been written
+  std::string large_read_buffer(large_cmp_buffer.length(), '5');
+  ssize_t read = rbd_read(image, stripe_unit, large_read_buffer.length(),
+                          large_read_buffer.data());
+  ASSERT_EQ(large_read_buffer.length(), read);
+  auto buffer_mismatch = std::mismatch(large_cmp_buffer.begin(),
+                                       large_cmp_buffer.end(),
+                                       large_read_buffer.begin());
+  ASSERT_EQ(large_read_buffer.end(), buffer_mismatch.second);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
 }
 
-void simple_read_cb_pp(librbd::completion_t cb, void *arg)
+TEST_F(TestLibRBD, TestAioCompareAndWriteStripeUnitUnaligned)
 {
-  cout << "read completion cb called!" << std::endl;
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
+
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  // large write test => we allow stripe unit size writes (aligned)
+  uint64_t stripe_unit;
+  rbd_get_stripe_unit(image, &stripe_unit);
+  std::string large_write_buffer(stripe_unit, '2');
+  std::string large_cmp_buffer(stripe_unit * 2, '4');
+
+  ssize_t written = rbd_write(image, stripe_unit, large_cmp_buffer.length(),
+                              large_cmp_buffer.data());
+  ASSERT_EQ(large_cmp_buffer.length(), written);
+
+  /*
+    * compare and write at offset stripe_unit + 1 and stripe unit size
+    * Expect fail because access spans stripe unit boundary (unaligned)
+    */
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_write(image, stripe_unit + 1, stripe_unit,
+                                      large_cmp_buffer.data(),
+                                      large_write_buffer.data(),
+                                      comp, &mismatch_off, 0);
+  ASSERT_EQ(0, ret);
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(-EINVAL, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(0U, mismatch_off);
+  rbd_aio_release(comp);
+
+  // check nothing has been written
+  std::string large_read_buffer(large_cmp_buffer.length(), '5');
+  ssize_t read = rbd_read(image, stripe_unit, large_read_buffer.length(),
+                          large_read_buffer.data());
+  ASSERT_EQ(large_read_buffer.length(), read);
+  auto buffer_mismatch = std::mismatch(large_cmp_buffer.begin(),
+                                       large_cmp_buffer.end(),
+                                       large_read_buffer.begin());
+  ASSERT_EQ(large_read_buffer.end(), buffer_mismatch.second);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
 }
 
-void aio_write_test_data(librbd::Image& image, const char *test_data,
-                        off_t off, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestCompareAndWriteTooLarge)
 {
-  ceph::bufferlist bl;
-  bl.append(test_data, strlen(test_data));
-  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
-  printf("created completion\n");
-  if (iohint)
-    image.aio_write2(off, strlen(test_data), bl, comp, iohint);
-  else
-    image.aio_write(off, strlen(test_data), bl, comp);
-  printf("started write\n");
-  comp->wait_for_complete();
-  int r = comp->get_return_value();
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(0, r);
-  printf("finished write\n");
-  comp->release();
-  *passed = true;
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
+
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  // large write test => we allow stripe unit size writes (aligned)
+  uint64_t stripe_unit;
+  rbd_get_stripe_unit(image, &stripe_unit);
+  std::string large_write_buffer(stripe_unit * 2, '2');
+  std::string large_cmp_buffer(large_write_buffer.length(), '4');
+
+  ssize_t written = rbd_write(image, stripe_unit, large_cmp_buffer.length(),
+                              large_cmp_buffer.data());
+  ASSERT_EQ(large_cmp_buffer.length(), written);
+
+  /*
+    * compare and write at offset stripe_unit and stripe unit size + 1
+    * Expect fail because access is larger than stripe unit size
+    */
+  uint64_t mismatch_off = 0;
+  written = rbd_compare_and_write(image, stripe_unit, stripe_unit + 1,
+                                  large_cmp_buffer.data(),
+                                  large_write_buffer.data(),
+                                  &mismatch_off, 0);
+  ASSERT_EQ(-EINVAL, written);
+  ASSERT_EQ(0U, mismatch_off);
+
+  // check nothing has been written
+  std::string large_read_buffer(large_cmp_buffer.length(), '5');
+  ssize_t read = rbd_read(image, stripe_unit, large_read_buffer.length(),
+                          large_read_buffer.data());
+  ASSERT_EQ(large_read_buffer.length(), read);
+  auto buffer_mismatch = std::mismatch(large_cmp_buffer.begin(),
+                                       large_cmp_buffer.end(),
+                                       large_read_buffer.begin());
+  ASSERT_EQ(large_read_buffer.end(), buffer_mismatch.second);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
 }
 
-void aio_discard_test_data(librbd::Image& image, off_t off, size_t len, bool *passed)
+TEST_F(TestLibRBD, TestAioCompareAndWriteTooLarge)
 {
-  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
-  image.aio_discard(off, len, comp);
-  comp->wait_for_complete();
-  int r = comp->get_return_value();
-  ASSERT_EQ(0, r);
-  comp->release();
-  *passed = true;
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
+
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  // large write test => we allow stripe unit size writes (aligned)
+  uint64_t stripe_unit;
+  rbd_get_stripe_unit(image, &stripe_unit);
+  std::string large_write_buffer(stripe_unit * 2, '2');
+  std::string large_cmp_buffer(large_write_buffer.length(), '4');
+
+  ssize_t written = rbd_write(image, stripe_unit, large_cmp_buffer.length(),
+                              large_cmp_buffer.data());
+  ASSERT_EQ(large_cmp_buffer.length(), written);
+
+  /*
+    * compare and write at offset stripe_unit and stripe unit size + 1
+    * Expect fail because access is larger than stripe unit size
+    */
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_write(image, stripe_unit, stripe_unit + 1,
+                                      large_cmp_buffer.data(),
+                                      large_write_buffer.data(),
+                                      comp, &mismatch_off, 0);
+  ASSERT_EQ(0, ret);
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(-EINVAL, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(0U, mismatch_off);
+  rbd_aio_release(comp);
+
+  // check nothing has been written
+  std::string large_read_buffer(large_cmp_buffer.length(), '5');
+  ssize_t read = rbd_read(image, stripe_unit, large_read_buffer.length(),
+                          large_read_buffer.data());
+  ASSERT_EQ(large_read_buffer.length(), read);
+  auto buffer_mismatch = std::mismatch(large_cmp_buffer.begin(),
+                                       large_cmp_buffer.end(),
+                                       large_read_buffer.begin());
+  ASSERT_EQ(large_read_buffer.end(), buffer_mismatch.second);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
 }
 
-void write_test_data(librbd::Image& image, const char *test_data, off_t off, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestCompareAndWriteStripeUnitSuccess)
 {
-  size_t written;
-  size_t len = strlen(test_data);
-  ceph::bufferlist bl;
-  bl.append(test_data, len);
-  if (iohint)
-    written = image.write2(off, len, bl, iohint);
-  else
-    written = image.write(off, len, bl);
-  printf("wrote: %u\n", (unsigned int) written);
-  ASSERT_EQ(bl.length(), written);
-  *passed = true;
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  // large write test => we allow stripe unit size writes (aligned)
+  uint64_t stripe_unit;
+  rbd_get_stripe_unit(image, &stripe_unit);
+  std::string large_write_buffer(stripe_unit, '2');
+  std::string large_cmp_buffer(stripe_unit * 2, '4');
+
+  ssize_t written = rbd_write(image, stripe_unit, large_cmp_buffer.length(),
+                              large_cmp_buffer.data());
+  ASSERT_EQ(large_cmp_buffer.length(), written);
+
+  // aligned stripe unit size access => expect success
+  uint64_t mismatch_off = 0;
+  written = rbd_compare_and_write(image, stripe_unit, stripe_unit,
+                                  large_cmp_buffer.data(),
+                                  large_write_buffer.data(),
+                                  &mismatch_off, 0);
+  ASSERT_EQ(stripe_unit, written);
+  ASSERT_EQ(0U, mismatch_off);
+
+  // check stripe_unit bytes of large_write_buffer were written
+  std::string large_read_buffer(large_cmp_buffer.length(), '5');
+  ssize_t read = rbd_read(image, stripe_unit, large_read_buffer.length(),
+                          large_read_buffer.data());
+  ASSERT_EQ(large_read_buffer.length(), read);
+  auto buffer_mismatch = std::mismatch(large_read_buffer.begin(),
+                                       large_read_buffer.begin() + stripe_unit,
+                                       large_write_buffer.begin());
+  ASSERT_EQ(large_write_buffer.end(), buffer_mismatch.second);
+  // check data beyond stripe_unit size was not overwritten
+  buffer_mismatch = std::mismatch(large_read_buffer.begin() + stripe_unit,
+                                  large_read_buffer.end(),
+                                  large_cmp_buffer.begin());
+  ASSERT_EQ(large_cmp_buffer.begin() + stripe_unit, buffer_mismatch.second);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
 }
 
-void discard_test_data(librbd::Image& image, off_t off, size_t len, bool *passed)
+TEST_F(TestLibRBD, TestAioCompareAndWriteStripeUnitSuccess)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  // large write test => we allow stripe unit size writes (aligned)
+  uint64_t stripe_unit;
+  rbd_get_stripe_unit(image, &stripe_unit);
+  std::string large_write_buffer(stripe_unit, '2');
+  std::string large_cmp_buffer(stripe_unit * 2, '4');
+
+  ssize_t written = rbd_write(image, stripe_unit, large_cmp_buffer.length(),
+                              large_cmp_buffer.data());
+  ASSERT_EQ(large_cmp_buffer.length(), written);
+
+  // aligned stripe unit size access => expect success
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_write(image, stripe_unit, stripe_unit,
+                                      large_cmp_buffer.data(),
+                                      large_write_buffer.data(),
+                                      comp, &mismatch_off, 0);
+  ASSERT_EQ(0, ret);
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(0, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(0U, mismatch_off);
+  rbd_aio_release(comp);
+
+  // check stripe_unit bytes of large_write_buffer were written
+  std::string large_read_buffer(large_cmp_buffer.length(), '5');
+  ssize_t read = rbd_read(image, stripe_unit, large_read_buffer.length(),
+                          large_read_buffer.data());
+  ASSERT_EQ(large_read_buffer.length(), read);
+  auto buffer_mismatch = std::mismatch(large_read_buffer.begin(),
+                                       large_read_buffer.begin() + stripe_unit,
+                                       large_write_buffer.begin());
+  ASSERT_EQ(large_write_buffer.end(), buffer_mismatch.second);
+  // check data beyond stripe_unit size was not overwritten
+  buffer_mismatch = std::mismatch(large_read_buffer.begin() + stripe_unit,
+                                  large_read_buffer.end(),
+                                  large_cmp_buffer.begin());
+  ASSERT_EQ(large_cmp_buffer.begin() + stripe_unit, buffer_mismatch.second);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteVIovecLenDiffers)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+  off_t off = 512;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  std::string cmp_buffer("This is a test");
+  size_t cmp_len = cmp_buffer.length();
+
+  std::string write_buffer("Write this !!!");
+  struct iovec write_iovs[] = {
+    {.iov_base = &write_buffer[0], .iov_len = 6},
+    {.iov_base = &write_buffer[6], .iov_len = 5},
+    {.iov_base = &write_buffer[11], .iov_len = 3}
+  };
+
+  ASSERT_EQ(cmp_len, rbd_write(image, off, cmp_len, cmp_buffer.data()));
+
+  // should fail because compare iovec len cannot be different to write iovec len
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_writev(image, off,
+                                       write_iovs /* cmp_iovs */, 1,
+                                       write_iovs, std::size(write_iovs),
+                                       comp, &mismatch_off, 0);
+  ASSERT_EQ(-EINVAL, ret);
+  ASSERT_EQ(0U, mismatch_off);
+  rbd_aio_release(comp);
+
+  // check nothing was written
+  std::string read_buffer(cmp_buffer.length(), '1');
+  ssize_t read = rbd_read(image, off, read_buffer.length(), read_buffer.data());
+  ASSERT_EQ(read_buffer.length(), read);
+  ASSERT_EQ(cmp_buffer, read_buffer);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteVMismatch)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+  off_t off = 512;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  std::string cmp_buffer("This is a test");
+  int cmp_len = cmp_buffer.length();
+
+  std::string write_buffer("Write this !!!");
+  struct iovec write_iovs[] = {
+    {.iov_base = &write_buffer[0], .iov_len = 6},
+    {.iov_base = &write_buffer[6], .iov_len = 5},
+    {.iov_base = &write_buffer[11], .iov_len = 3}
+  };
+
+  std::string mismatch_buffer("This will fail");
+  struct iovec mismatch_iovs[] = {
+    {.iov_base = &mismatch_buffer[0], .iov_len = 5},
+    {.iov_base = &mismatch_buffer[5], .iov_len = 5},
+    {.iov_base = &mismatch_buffer[10], .iov_len = 4}
+  };
+
+  ASSERT_EQ(cmp_len, rbd_write(image, off, cmp_len, cmp_buffer.data()));
+
+  // this should execute the compare but fail because of mismatch
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_writev(image, off,
+                                       mismatch_iovs /* cmp_iovs */,
+                                       std::size(mismatch_iovs),
+                                       write_iovs, std::size(write_iovs),
+                                       comp, &mismatch_off, 0);
+  ASSERT_EQ(0, ret);
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(-EILSEQ, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(5U, mismatch_off);
+  rbd_aio_release(comp);
+
+  // check nothing was written
+  std::string read_buffer(cmp_buffer.length(), '1');
+  ssize_t read = rbd_read(image, off, read_buffer.length(), read_buffer.data());
+  ASSERT_EQ(read_buffer.length(), read);
+  ASSERT_EQ(cmp_buffer, read_buffer);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteVSuccess)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20; /* 20MiB */
+  off_t off = 512;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  std::string cmp_buffer("This is a test");
+  struct iovec cmp_iovs[] = {
+    {.iov_base = &cmp_buffer[0], .iov_len = 5},
+    {.iov_base = &cmp_buffer[5], .iov_len = 3},
+    {.iov_base = &cmp_buffer[8], .iov_len = 2},
+    {.iov_base = &cmp_buffer[10], .iov_len = 4}
+  };
+  size_t cmp_len = cmp_buffer.length();
+
+  std::string write_buffer("Write this !!!");
+  struct iovec write_iovs[] = {
+    {.iov_base = &write_buffer[0], .iov_len = 6},
+    {.iov_base = &write_buffer[6], .iov_len = 5},
+    {.iov_base = &write_buffer[11], .iov_len = 3}
+  };
+
+  ASSERT_EQ(cmp_len, rbd_write(image, off, cmp_len, cmp_buffer.data()));
+
+  // compare against the buffer written before => should succeed
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  uint64_t mismatch_off = 0;
+  int ret = rbd_aio_compare_and_writev(image, off,
+                                       cmp_iovs, std::size(cmp_iovs),
+                                       write_iovs, std::size(write_iovs),
+                                       comp, &mismatch_off, 0);
+  ASSERT_EQ(0, ret);
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(0, rbd_aio_get_return_value(comp));
+  ASSERT_EQ(0U, mismatch_off);
+  rbd_aio_release(comp);
+
+  // check data was successfully written
+  std::string read_buffer(cmp_buffer.length(), '1');
+  ssize_t read = rbd_read(image, off, read_buffer.length(), read_buffer.data());
+  ASSERT_EQ(read_buffer.length(), read);
+  ASSERT_EQ(write_buffer, read_buffer);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, TestScatterGatherIO)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  std::string write_buffer("This is a test");
+  // These iovecs should produce a length overflow
+  struct iovec bad_iovs[] = {
+    {.iov_base = &write_buffer[0], .iov_len = 5},
+    {.iov_base = NULL, .iov_len = std::numeric_limits<size_t>::max()}
+  };
+  struct iovec write_iovs[] = {
+    {.iov_base = &write_buffer[0],  .iov_len = 5},
+    {.iov_base = &write_buffer[5],  .iov_len = 3},
+    {.iov_base = &write_buffer[8],  .iov_len = 2},
+    {.iov_base = &write_buffer[10], .iov_len = 4}
+  };
+
+  rbd_completion_t comp;
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  ASSERT_EQ(-EINVAL, rbd_aio_writev(image, write_iovs, 0, 0, comp));
+  ASSERT_EQ(-EINVAL, rbd_aio_writev(image, bad_iovs, 2, 0, comp));
+  ASSERT_EQ(0, rbd_aio_writev(image, write_iovs,
+                              sizeof(write_iovs) / sizeof(struct iovec),
+                              1<<order, comp));
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(0, rbd_aio_get_return_value(comp));
+  rbd_aio_release(comp);
+
+  std::string read_buffer(write_buffer.size(), '1');
+  struct iovec read_iovs[] = {
+    {.iov_base = &read_buffer[0],  .iov_len = 4},
+    {.iov_base = &read_buffer[8],  .iov_len = 4},
+    {.iov_base = &read_buffer[12], .iov_len = 2}
+  };
+
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  ASSERT_EQ(-EINVAL, rbd_aio_readv(image, read_iovs, 0, 0, comp));
+  ASSERT_EQ(-EINVAL, rbd_aio_readv(image, bad_iovs, 2, 0, comp));
+  ASSERT_EQ(0, rbd_aio_readv(image, read_iovs,
+                             sizeof(read_iovs) / sizeof(struct iovec),
+                             1<<order, comp));
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(10, rbd_aio_get_return_value(comp));
+  rbd_aio_release(comp);
+  ASSERT_EQ("This1111 is a ", read_buffer);
+
+  std::string linear_buffer(write_buffer.size(), '1');
+  struct iovec linear_iovs[] = {
+    {.iov_base = &linear_buffer[4], .iov_len = 4}
+  };
+  rbd_aio_create_completion(NULL, NULL, &comp);
+  ASSERT_EQ(0, rbd_aio_readv(image, linear_iovs,
+                             sizeof(linear_iovs) / sizeof(struct iovec),
+                             1<<order, comp));
+  ASSERT_EQ(0, rbd_aio_wait_for_complete(comp));
+  ASSERT_EQ(4, rbd_aio_get_return_value(comp));
+  rbd_aio_release(comp);
+  ASSERT_EQ("1111This111111", linear_buffer);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, TestEmptyDiscard)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 20 << 20;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+  ASSERT_PASSED(aio_discard_test_data, image, 0, 1*1024*1024);
+  ASSERT_PASSED(aio_discard_test_data, image, 0, 4*1024*1024);
+  ASSERT_PASSED(aio_discard_test_data, image, 3*1024*1024, 1*1024*1024);
+
+  ASSERT_PASSED(validate_object_map, image);
+  ASSERT_EQ(0, rbd_close(image));
+
+  rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, TestFUA)
+{
+  rados_ioctx_t ioctx;
+  rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+  rbd_image_t image_write;
+  rbd_image_t image_read;
+  int order = 0;
+  std::string name = get_temp_image_name();
+  uint64_t size = 2 << 20;
+
+  ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image_write, NULL));
+  ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image_read, NULL));
+
+  // enable writeback cache
+  rbd_flush(image_write);
+
+  char test_data[TEST_IO_SIZE + 1];
+  int i;
+
+  for (i = 0; i < TEST_IO_SIZE; ++i) {
+    test_data[i] = (char) (rand() % (126 - 33) + 33);
+  }
+  test_data[TEST_IO_SIZE] = '\0';
+  for (i = 0; i < 5; ++i)
+    ASSERT_PASSED(write_test_data, image_write, test_data,
+                  TEST_IO_SIZE * i, TEST_IO_SIZE, LIBRADOS_OP_FLAG_FADVISE_FUA);
+
+  for (i = 0; i < 5; ++i)
+    ASSERT_PASSED(read_test_data, image_read, test_data,
+                  TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+
+  for (i = 5; i < 10; ++i)
+    ASSERT_PASSED(aio_write_test_data, image_write, test_data,
+                  TEST_IO_SIZE * i, TEST_IO_SIZE, LIBRADOS_OP_FLAG_FADVISE_FUA);
+
+  for (i = 5; i < 10; ++i)
+    ASSERT_PASSED(aio_read_test_data, image_read, test_data,
+                  TEST_IO_SIZE * i, TEST_IO_SIZE, 0);
+
+  ASSERT_PASSED(validate_object_map, image_write);
+  ASSERT_PASSED(validate_object_map, image_read);
+  ASSERT_EQ(0, rbd_close(image_write));
+  ASSERT_EQ(0, rbd_close(image_read));
+  ASSERT_EQ(0, rbd_remove(ioctx, name.c_str()));
+  rados_ioctx_destroy(ioctx);
+}
+
+void simple_write_cb_pp(librbd::completion_t cb, void *arg)
+{
+  cout << "write completion cb called!" << std::endl;
+}
+
+void simple_read_cb_pp(librbd::completion_t cb, void *arg)
+{
+  cout << "read completion cb called!" << std::endl;
+}
+
+void aio_write_test_data(librbd::Image& image, const char *test_data,
+                        off_t off, uint32_t iohint, bool *passed)
+{
+  ceph::bufferlist bl;
+  bl.append(test_data, strlen(test_data));
+  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
+  printf("created completion\n");
+  if (iohint)
+    image.aio_write2(off, strlen(test_data), bl, comp, iohint);
+  else
+    image.aio_write(off, strlen(test_data), bl, comp);
+  printf("started write\n");
+  comp->wait_for_complete();
+  int r = comp->get_return_value();
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(0, r);
+  printf("finished write\n");
+  comp->release();
+  *passed = true;
+}
+
+void aio_discard_test_data(librbd::Image& image, off_t off, size_t len, bool *passed)
+{
+  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
+  image.aio_discard(off, len, comp);
+  comp->wait_for_complete();
+  int r = comp->get_return_value();
+  ASSERT_EQ(0, r);
+  comp->release();
+  *passed = true;
+}
+
+void write_test_data(librbd::Image& image, const char *test_data, off_t off, uint32_t iohint, bool *passed)
+{
+  size_t written;
+  size_t len = strlen(test_data);
+  ceph::bufferlist bl;
+  bl.append(test_data, len);
+  if (iohint)
+    written = image.write2(off, len, bl, iohint);
+  else
+    written = image.write(off, len, bl);
+  printf("wrote: %u\n", (unsigned int) written);
+  ASSERT_EQ(bl.length(), written);
+  *passed = true;
+}
+
+void discard_test_data(librbd::Image& image, off_t off, size_t len, bool *passed)
 {
   size_t written;
   written = image.discard(off, len);
@@ -2786,178 +3479,1139 @@ void discard_test_data(librbd::Image& image, off_t off, size_t len, bool *passed
   *passed = true;
 }
 
-void aio_read_test_data(librbd::Image& image, const char *expected, off_t off, size_t expected_len, uint32_t iohint, bool *passed)
+void aio_read_test_data(librbd::Image& image, const char *expected, off_t off, size_t expected_len, uint32_t iohint, bool *passed)
+{
+  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_read_cb_pp);
+  ceph::bufferlist bl;
+  printf("created completion\n");
+  if (iohint)
+    image.aio_read2(off, expected_len, bl, comp, iohint);
+  else
+    image.aio_read(off, expected_len, bl, comp);
+  printf("started read\n");
+  comp->wait_for_complete();
+  int r = comp->get_return_value();
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(TEST_IO_SIZE, r);
+  ASSERT_EQ(0, memcmp(expected, bl.c_str(), TEST_IO_SIZE));
+  printf("finished read\n");
+  comp->release();
+  *passed = true;
+}
+
+void read_test_data(librbd::Image& image, const char *expected, off_t off, size_t expected_len, uint32_t iohint, bool *passed)
+{
+  int read;
+  size_t len = expected_len;
+  ceph::bufferlist bl;
+  if (iohint)
+    read = image.read2(off, len, bl, iohint);
+  else
+    read = image.read(off, len, bl);
+  ASSERT_TRUE(read >= 0);
+  std::string bl_str(bl.c_str(), read);
+
+  printf("read: %u\n", (unsigned int) read);
+  int result = memcmp(bl_str.c_str(), expected, expected_len);
+  if (result != 0) {
+    printf("read: %s\nexpected: %s\n", bl_str.c_str(), expected);
+    ASSERT_EQ(0, result);
+  }
+  *passed = true;
+}
+
+void aio_writesame_test_data(librbd::Image& image, const char *test_data, off_t off,
+                             size_t len, size_t data_len, uint32_t iohint, bool *passed)
+{
+  ceph::bufferlist bl;
+  bl.append(test_data, data_len);
+  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
+  printf("created completion\n");
+  int r;
+  r = image.aio_writesame(off, len, bl, comp, iohint);
+  printf("started writesame\n");
+  if (len % data_len) {
+    ASSERT_EQ(-EINVAL, r);
+    printf("expected fail, finished writesame\n");
+    comp->release();
+    *passed = true;
+    return;
+  }
+
+  comp->wait_for_complete();
+  r = comp->get_return_value();
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(0, r);
+  printf("finished writesame\n");
+  comp->release();
+
+  //verify data
+  printf("to verify the data\n");
+  int read;
+  uint64_t left = len;
+  while (left > 0) {
+    ceph::bufferlist bl;
+    read = image.read(off, data_len, bl);
+    ASSERT_EQ(data_len, static_cast<size_t>(read));
+    std::string bl_str(bl.c_str(), read);
+    int result = memcmp(bl_str.c_str(), test_data, data_len);
+    if (result !=0 ) {
+      printf("read: %u ~ %u\n", (unsigned int) off, (unsigned int) read);
+      printf("read: %s\nexpected: %s\n", bl_str.c_str(), test_data);
+      ASSERT_EQ(0, result);
+    }
+    off += data_len;
+    left -= data_len;
+  }
+  ASSERT_EQ(0U, left);
+  printf("verified\n");
+
+  *passed = true;
+}
+
+void writesame_test_data(librbd::Image& image, const char *test_data, off_t off,
+                         ssize_t len, size_t data_len, uint32_t iohint,
+                         bool *passed)
+{
+  ssize_t written;
+  ceph::bufferlist bl;
+  bl.append(test_data, data_len);
+  written = image.writesame(off, len, bl, iohint);
+  if (len % data_len) {
+    ASSERT_EQ(-EINVAL, written);
+    printf("expected fail, finished writesame\n");
+    *passed = true;
+    return;
+  }
+  ASSERT_EQ(len, written);
+  printf("wrote: %u\n", (unsigned int) written);
+  *passed = true;
+
+  //verify data
+  printf("to verify the data\n");
+  int read;
+  uint64_t left = len;
+  while (left > 0) {
+    ceph::bufferlist bl;
+    read = image.read(off, data_len, bl);
+    ASSERT_EQ(data_len, static_cast<size_t>(read));
+    std::string bl_str(bl.c_str(), read);
+    int result = memcmp(bl_str.c_str(), test_data, data_len);
+    if (result !=0 ) {
+      printf("read: %u ~ %u\n", (unsigned int) off, (unsigned int) read);
+      printf("read: %s\nexpected: %s\n", bl_str.c_str(), test_data);
+      ASSERT_EQ(0, result);
+    }
+    off += data_len;
+    left -= data_len;
+  }
+  ASSERT_EQ(0U, left);
+  printf("verified\n");
+
+  *passed = true;
+}
+
+void aio_compare_and_write_test_data(librbd::Image& image, const char *cmp_data,
+                                     const char *test_data, off_t off, ssize_t len,
+                                     uint32_t iohint, bool *passed)
+{
+  ceph::bufferlist cmp_bl;
+  cmp_bl.append(cmp_data, strlen(cmp_data));
+  ceph::bufferlist test_bl;
+  test_bl.append(test_data, strlen(test_data));
+  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
+  printf("created completion\n");
+
+  uint64_t mismatch_offset;
+  image.aio_compare_and_write(off, len, cmp_bl, test_bl, comp, &mismatch_offset, iohint);
+  printf("started aio compare and write\n");
+  comp->wait_for_complete();
+  int r = comp->get_return_value();
+  printf("return value is: %d\n", r);
+  ASSERT_EQ(0, r);
+  printf("finished aio compare and write\n");
+  comp->release();
+  *passed = true;
+}
+
+void compare_and_write_test_data(librbd::Image& image, const char *cmp_data, const char *test_data,
+                                 off_t off, ssize_t len, uint64_t *mismatch_off, uint32_t iohint, bool *passed)
+{
+  size_t written;
+  ceph::bufferlist cmp_bl;
+  cmp_bl.append(cmp_data, strlen(cmp_data));
+  ceph::bufferlist test_bl;
+  test_bl.append(test_data, strlen(test_data));
+  printf("start compare and write\n");
+  written = image.compare_and_write(off, len, cmp_bl, test_bl, mismatch_off, iohint);
+  printf("compare and  wrote: %d\n", (int) written);
+  ASSERT_EQ(len, static_cast<ssize_t>(written));
+  *passed = true;
+}
+
+TEST_F(TestLibRBD, TestIOPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 2 << 20;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    bool skip_discard = this->is_skip_partial_discard_enabled(image);
+
+    char test_data[TEST_IO_SIZE + 1];
+    char zero_data[TEST_IO_SIZE + 1];
+    int i;
+    uint64_t mismatch_offset;
+
+    for (i = 0; i < TEST_IO_SIZE; ++i) {
+      test_data[i] = (char) (rand() % (126 - 33) + 33);
+    }
+    test_data[TEST_IO_SIZE] = '\0';
+    memset(zero_data, 0, sizeof(zero_data));
+
+    for (i = 0; i < 5; ++i)
+      ASSERT_PASSED(write_test_data, image, test_data, strlen(test_data) * i, 0);
+
+    for (i = 5; i < 10; ++i)
+      ASSERT_PASSED(aio_write_test_data, image, test_data, strlen(test_data) * i, 0);
+
+    for (i = 0; i < 5; ++i)
+      ASSERT_PASSED(compare_and_write_test_data, image, test_data, test_data, TEST_IO_SIZE * i,
+        TEST_IO_SIZE, &mismatch_offset, 0);
+
+    for (i = 5; i < 10; ++i)
+      ASSERT_PASSED(aio_compare_and_write_test_data, image, test_data, test_data, TEST_IO_SIZE * i,
+        TEST_IO_SIZE, 0);
+
+    for (i = 0; i < 5; ++i)
+      ASSERT_PASSED(read_test_data, image, test_data, strlen(test_data) * i, TEST_IO_SIZE, 0);
+
+    for (i = 5; i < 10; ++i)
+      ASSERT_PASSED(aio_read_test_data, image, test_data, strlen(test_data) * i, TEST_IO_SIZE, 0);
+
+    // discard 2nd, 4th sections.
+    ASSERT_PASSED(discard_test_data, image, TEST_IO_SIZE, TEST_IO_SIZE);
+    ASSERT_PASSED(aio_discard_test_data, image, TEST_IO_SIZE*3, TEST_IO_SIZE);
+
+    ASSERT_PASSED(read_test_data, image, test_data,  0, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
+                 TEST_IO_SIZE, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*2, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
+                 TEST_IO_SIZE*3, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*4, TEST_IO_SIZE, 0);
+
+    for (i = 0; i < 15; ++i) {
+      if (i % 3 == 2) {
+        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
+      } else if (i % 3 == 1) {
+        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      } else {
+        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      }
+    }
+    for (i = 0; i < 15; ++i) {
+      if (i % 3 == 2) {
+        ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
+      } else if (i % 3 == 1) {
+        ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      } else {
+        ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+        ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
+      }
+    }
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+static void compare_written(librbd::Image& image, off_t off, size_t len,
+                            const std::string& buffer, bool *passed)
+{
+  bufferlist read_bl;
+  ssize_t read = image.read(off, len, read_bl);
+  ASSERT_EQ(len, read);
+  std::string read_buffer(read_bl.c_str(), read);
+  ASSERT_EQ(buffer.substr(0, len), read_buffer);
+  *passed = true;
+}
+
+TEST_F(TestLibRBD, TestCompareAndWriteCompareTooSmallPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    std::string small_buffer("Too small");
+    ceph::bufferlist small_bl;
+    small_bl.append(&small_buffer[0], 4);
+    small_bl.append(&small_buffer[4], 4);
+
+    // should fail because compare bufferlist cannot be smaller than len
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(off, cmp_bl.length(),
+                                      small_bl, /* cmp_bl */
+                                      write_bl,
+                                      &mismatch_off, 0);
+    ASSERT_EQ(-EINVAL, written);
+    ASSERT_EQ(0U, mismatch_off);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteCompareTooSmallPP)
 {
-  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_read_cb_pp);
-  ceph::bufferlist bl;
-  printf("created completion\n");
-  if (iohint)
-    image.aio_read2(off, expected_len, bl, comp, iohint);
-  else
-    image.aio_read(off, expected_len, bl, comp);
-  printf("started read\n");
-  comp->wait_for_complete();
-  int r = comp->get_return_value();
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(TEST_IO_SIZE, r);
-  ASSERT_EQ(0, memcmp(expected, bl.c_str(), TEST_IO_SIZE));
-  printf("finished read\n");
-  comp->release();
-  *passed = true;
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    std::string small_buffer("Too small");
+    ceph::bufferlist small_bl;
+    small_bl.append(&small_buffer[0], 4);
+    small_bl.append(&small_buffer[4], 4);
+
+    // should fail because compare bufferlist cannot be smaller than len
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(off, cmp_bl.length(),
+                                          small_bl, /* cmp_bl */
+                                          write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(-EINVAL, ret);
+    ASSERT_EQ(0U, mismatch_off);
+    comp->release();
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
 }
 
-void read_test_data(librbd::Image& image, const char *expected, off_t off, size_t expected_len, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestCompareAndWriteWriteTooSmallPP)
 {
-  int read;
-  size_t len = expected_len;
-  ceph::bufferlist bl;
-  if (iohint)
-    read = image.read2(off, len, bl, iohint);
-  else
-    read = image.read(off, len, bl);
-  ASSERT_TRUE(read >= 0);
-  std::string bl_str(bl.c_str(), read);
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
-  printf("read: %u\n", (unsigned int) read);
-  int result = memcmp(bl_str.c_str(), expected, expected_len);
-  if (result != 0) {
-    printf("read: %s\nexpected: %s\n", bl_str.c_str(), expected);
-    ASSERT_EQ(0, result);
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    std::string small_buffer("Too small");
+    ceph::bufferlist small_bl;
+    small_bl.append(&small_buffer[0], 4);
+    small_bl.append(&small_buffer[4], 4);
+
+    // should fail because write bufferlist cannot be smaller than len
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(off, cmp_bl.length(),
+                                      cmp_bl,
+                                      small_bl, /* write_bl */
+                                      &mismatch_off, 0);
+    ASSERT_EQ(-EINVAL, written);
+    ASSERT_EQ(0U, mismatch_off);
+
+    ASSERT_PASSED(validate_object_map, image);
   }
-  *passed = true;
+
+  ioctx.close();
 }
 
-void aio_writesame_test_data(librbd::Image& image, const char *test_data, off_t off,
-                             size_t len, size_t data_len, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestAioCompareAndWriteWriteTooSmallPP)
 {
-  ceph::bufferlist bl;
-  bl.append(test_data, data_len);
-  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
-  printf("created completion\n");
-  int r;
-  r = image.aio_writesame(off, len, bl, comp, iohint);
-  printf("started writesame\n");
-  if (len % data_len) {
-    ASSERT_EQ(-EINVAL, r);
-    printf("expected fail, finished writesame\n");
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    std::string small_buffer("Too small");
+    ceph::bufferlist small_bl;
+    small_bl.append(&small_buffer[0], 4);
+    small_bl.append(&small_buffer[4], 4);
+
+    // should fail because write bufferlist cannot be smaller than len
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(off, cmp_bl.length(),
+                                          cmp_bl,
+                                          small_bl, /* write_bl */
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(-EINVAL, ret);
+    ASSERT_EQ(0U, mismatch_off);
+    comp->release();
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestCompareAndWriteMismatchPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    std::string mismatch_buffer("This will fail");
+    ceph::bufferlist mismatch_bl;
+    mismatch_bl.append(&mismatch_buffer[0], 5);
+    mismatch_bl.append(&mismatch_buffer[5], 5);
+    mismatch_bl.append(&mismatch_buffer[10], 4);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    // this should execute the compare but fail because of mismatch
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(off, write_bl.length(),
+                                      mismatch_bl, /* cmp_bl */
+                                      write_bl,
+                                      &mismatch_off, 0);
+    ASSERT_EQ(-EILSEQ, written);
+    ASSERT_EQ(5U, mismatch_off);
+
+    // check that nothing was written
+    ASSERT_PASSED(compare_written, image, off, cmp_bl.length(), cmp_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteMismatchPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    std::string mismatch_buffer("This will fail");
+    ceph::bufferlist mismatch_bl;
+    mismatch_bl.append(&mismatch_buffer[0], 5);
+    mismatch_bl.append(&mismatch_buffer[5], 5);
+    mismatch_bl.append(&mismatch_buffer[10], 4);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    // this should execute the compare but fail because of mismatch
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(off, write_bl.length(),
+                                          mismatch_bl, /* cmp_bl */
+                                          write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(0, ret);
+    comp->wait_for_complete();
+    ssize_t aio_ret = comp->get_return_value();
+    ASSERT_EQ(-EILSEQ, aio_ret);
+    ASSERT_EQ(5U, mismatch_off);
+    comp->release();
+
+    // check that nothing was written
+    ASSERT_PASSED(compare_written, image, off, cmp_bl.length(), cmp_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestCompareAndWriteMismatchBufferlistGreaterLenPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    std::string mismatch_buffer("This will fail");
+    ceph::bufferlist mismatch_bl;
+    mismatch_bl.append(&mismatch_buffer[0], 5);
+    mismatch_bl.append(&mismatch_buffer[5], 5);
+    mismatch_bl.append(&mismatch_buffer[10], 4);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    /*
+     * we allow cmp_bl and write_bl to be greater than len so this
+     * should execute the compare but fail because of mismatch
+     */
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(off, cmp_bl.length() - 1,
+                                      mismatch_bl, /* cmp_bl */
+                                      write_bl,
+                                      &mismatch_off, 0);
+    ASSERT_EQ(-EILSEQ, written);
+    ASSERT_EQ(5U, mismatch_off);
+
+    // check that nothing was written
+    ASSERT_PASSED(compare_written, image, off, cmp_bl.length(), cmp_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteMismatchBufferlistGreaterLenPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    std::string mismatch_buffer("This will fail");
+    ceph::bufferlist mismatch_bl;
+    mismatch_bl.append(&mismatch_buffer[0], 5);
+    mismatch_bl.append(&mismatch_buffer[5], 5);
+    mismatch_bl.append(&mismatch_buffer[10], 4);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    /*
+     * we allow cmp_bl and write_bl to be greater than len so this
+     * should execute the compare but fail because of mismatch
+     */
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(off, cmp_bl.length() - 1,
+                                          mismatch_bl, /* cmp_bl */
+                                          write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(0, ret);
+    comp->wait_for_complete();
+    ssize_t aio_ret = comp->get_return_value();
+    ASSERT_EQ(-EILSEQ, aio_ret);
+    ASSERT_EQ(5U, mismatch_off);
+    comp->release();
+
+    // check that nothing was written
+    ASSERT_PASSED(compare_written, image, off, cmp_bl.length(), cmp_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestCompareAndWriteSuccessPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    // compare against the buffer written before => should succeed
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(off, cmp_bl.length(),
+                                      cmp_bl,
+                                      write_bl,
+                                      &mismatch_off, 0);
+    ASSERT_EQ(write_bl.length(), written);
+    ASSERT_EQ(0U, mismatch_off);
+
+    // check write_bl was written
+    ASSERT_PASSED(compare_written, image, off, write_bl.length(), write_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteSuccessPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    // compare against the buffer written before => should succeed
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(off, write_bl.length(),
+                                          cmp_bl,
+                                          write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(0, ret);
+    comp->wait_for_complete();
+    ssize_t aio_ret = comp->get_return_value();
+    ASSERT_EQ(0, aio_ret);
+    ASSERT_EQ(0U, mismatch_off);
+    comp->release();
+
+    // check write_bl was written
+    ASSERT_PASSED(compare_written, image, off, write_bl.length(), write_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestCompareAndWriteSuccessBufferlistGreaterLenPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    std::string mismatch_buffer("This will fail");
+    ceph::bufferlist mismatch_bl;
+    mismatch_bl.append(&mismatch_buffer[0], 5);
+    mismatch_bl.append(&mismatch_buffer[5], 5);
+    mismatch_bl.append(&mismatch_buffer[10], 4);
+
+    /*
+     * Test len < cmp_bl & write_bl => should succeed but only compare
+     * len bytes resp. only write len bytes
+     */
+    ssize_t written = image.write(off, mismatch_bl.length(), mismatch_bl);
+    ASSERT_EQ(mismatch_bl.length(), written);
+
+    size_t len_m1 = cmp_bl.length() - 1;
+    written = image.write(off, len_m1, cmp_bl);
+    ASSERT_EQ(len_m1, written);
+    // the content of the image at off should now be "This is a tesl"
+
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(off, len_m1,
+                                      cmp_bl,
+                                      write_bl,
+                                      &mismatch_off, 0);
+    ASSERT_EQ(len_m1, written);
+    ASSERT_EQ(0U, mismatch_off);
+
+    // check that only write_bl.length() - 1 bytes were written
+    ASSERT_PASSED(compare_written, image, off, len_m1, write_buffer);
+    ASSERT_PASSED(compare_written, image, off + len_m1, 1, std::string("l"));
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
+}
+
+TEST_F(TestLibRBD, TestAioCompareAndWriteSuccessBufferlistGreaterLenPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+    off_t off = 512;
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    std::string cmp_buffer("This is a test");
+    ceph::bufferlist cmp_bl;
+    cmp_bl.append(&cmp_buffer[0], 5);
+    cmp_bl.append(&cmp_buffer[5], 3);
+    cmp_bl.append(&cmp_buffer[8], 2);
+    cmp_bl.append(&cmp_buffer[10], 4);
+
+    std::string write_buffer("Write this !!!");
+    ceph::bufferlist write_bl;
+    write_bl.append(&write_buffer[0], 6);
+    write_bl.append(&write_buffer[6], 5);
+    write_bl.append(&write_buffer[11], 3);
+
+    std::string mismatch_buffer("This will fail");
+    ceph::bufferlist mismatch_bl;
+    mismatch_bl.append(&mismatch_buffer[0], 5);
+    mismatch_bl.append(&mismatch_buffer[5], 5);
+    mismatch_bl.append(&mismatch_buffer[10], 4);
+
+    ssize_t written = image.write(off, cmp_bl.length(), cmp_bl);
+    ASSERT_EQ(cmp_bl.length(), written);
+
+    /*
+     * Test len < cmp_bl & write_bl => should succeed but only compare
+     * len bytes resp. only write len bytes
+     */
+    written = image.write(off, mismatch_bl.length(), mismatch_bl);
+    ASSERT_EQ(mismatch_bl.length(), written);
+
+    size_t len_m1 = cmp_bl.length() - 1;
+    written = image.write(off, len_m1, cmp_bl);
+    ASSERT_EQ(len_m1, written);
+    // the content of the image at off should now be "This is a tesl"
+
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(off, len_m1,
+                                          cmp_bl,
+                                          write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(0, ret);
+    comp->wait_for_complete();
+    ssize_t aio_ret = comp->get_return_value();
+    ASSERT_EQ(0, aio_ret);
+    ASSERT_EQ(0U, mismatch_off);
     comp->release();
-    *passed = true;
-    return;
-  }
 
-  comp->wait_for_complete();
-  r = comp->get_return_value();
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(0, r);
-  printf("finished writesame\n");
-  comp->release();
+    // check that only write_bl.length() - 1 bytes were written
+    ASSERT_PASSED(compare_written, image, off, len_m1, write_buffer);
+    ASSERT_PASSED(compare_written, image, off + len_m1, 1, std::string("l"));
 
-  //verify data
-  printf("to verify the data\n");
-  int read;
-  uint64_t left = len;
-  while (left > 0) {
-    ceph::bufferlist bl;
-    read = image.read(off, data_len, bl);
-    ASSERT_EQ(data_len, static_cast<size_t>(read));
-    std::string bl_str(bl.c_str(), read);
-    int result = memcmp(bl_str.c_str(), test_data, data_len);
-    if (result !=0 ) {
-      printf("read: %u ~ %u\n", (unsigned int) off, (unsigned int) read);
-      printf("read: %s\nexpected: %s\n", bl_str.c_str(), test_data);
-      ASSERT_EQ(0, result);
-    }
-    off += data_len;
-    left -= data_len;
+    ASSERT_PASSED(validate_object_map, image);
   }
-  ASSERT_EQ(0U, left);
-  printf("verified\n");
 
-  *passed = true;
+  ioctx.close();
 }
 
-void writesame_test_data(librbd::Image& image, const char *test_data, off_t off,
-                         ssize_t len, size_t data_len, uint32_t iohint,
-                         bool *passed)
+TEST_F(TestLibRBD, TestCompareAndWriteStripeUnitUnalignedPP)
 {
-  ssize_t written;
-  ceph::bufferlist bl;
-  bl.append(test_data, data_len);
-  written = image.writesame(off, len, bl, iohint);
-  if (len % data_len) {
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
+
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    // large write test => we allow stripe unit size writes (aligned)
+    uint64_t stripe_unit = image.get_stripe_unit();
+    std::string large_write_buffer(stripe_unit, '2');
+    ceph::bufferlist large_write_bl;
+    large_write_bl.append(large_write_buffer.data(),
+                          large_write_buffer.length());
+
+    std::string large_cmp_buffer(stripe_unit * 2, '3');
+    ceph::bufferlist large_cmp_bl;
+    large_cmp_bl.append(large_cmp_buffer.data(), large_cmp_buffer.length());
+
+    ssize_t written = image.write(stripe_unit, large_cmp_bl.length(),
+                                  large_cmp_bl);
+    ASSERT_EQ(large_cmp_bl.length(), written);
+
+    /*
+     * compare and write at offset stripe_unit + 1 and stripe unit size
+     * Expect fail because access exceeds stripe
+     */
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(stripe_unit + 1, stripe_unit,
+                                      large_cmp_bl,
+                                      large_write_bl,
+                                      &mismatch_off, 0);
     ASSERT_EQ(-EINVAL, written);
-    printf("expected fail, finished writesame\n");
-    *passed = true;
-    return;
-  }
-  ASSERT_EQ(len, written);
-  printf("wrote: %u\n", (unsigned int) written);
-  *passed = true;
+    ASSERT_EQ(0U, mismatch_off);
 
-  //verify data
-  printf("to verify the data\n");
-  int read;
-  uint64_t left = len;
-  while (left > 0) {
-    ceph::bufferlist bl;
-    read = image.read(off, data_len, bl);
-    ASSERT_EQ(data_len, static_cast<size_t>(read));
-    std::string bl_str(bl.c_str(), read);
-    int result = memcmp(bl_str.c_str(), test_data, data_len);
-    if (result !=0 ) {
-      printf("read: %u ~ %u\n", (unsigned int) off, (unsigned int) read);
-      printf("read: %s\nexpected: %s\n", bl_str.c_str(), test_data);
-      ASSERT_EQ(0, result);
-    }
-    off += data_len;
-    left -= data_len;
+    // check nothing has been written
+    ASSERT_PASSED(compare_written, image, stripe_unit, large_cmp_bl.length(),
+                  large_cmp_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
   }
-  ASSERT_EQ(0U, left);
-  printf("verified\n");
 
-  *passed = true;
+  ioctx.close();
 }
 
-void aio_compare_and_write_test_data(librbd::Image& image, const char *cmp_data,
-                                     const char *test_data, off_t off, ssize_t len,
-                                     uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestAioCompareAndWriteStripeUnitUnalignedPP)
 {
-  ceph::bufferlist cmp_bl;
-  cmp_bl.append(cmp_data, strlen(cmp_data));
-  ceph::bufferlist test_bl;
-  test_bl.append(test_data, strlen(test_data));
-  librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL, (librbd::callback_t) simple_write_cb_pp);
-  printf("created completion\n");
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
 
-  uint64_t mismatch_offset;
-  image.aio_compare_and_write(off, len, cmp_bl, test_bl, comp, &mismatch_offset, iohint);
-  printf("started aio compare and write\n");
-  comp->wait_for_complete();
-  int r = comp->get_return_value();
-  printf("return value is: %d\n", r);
-  ASSERT_EQ(0, r);
-  printf("finished aio compare and write\n");
-  comp->release();
-  *passed = true;
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    // large write test => we allow stripe unit size writes (aligned)
+    uint64_t stripe_unit = image.get_stripe_unit();
+    std::string large_write_buffer(stripe_unit, '2');
+    ceph::bufferlist large_write_bl;
+    large_write_bl.append(large_write_buffer.data(),
+                          large_write_buffer.length());
+
+    std::string large_cmp_buffer(stripe_unit * 2, '3');
+    ceph::bufferlist large_cmp_bl;
+    large_cmp_bl.append(large_cmp_buffer.data(), large_cmp_buffer.length());
+
+    ssize_t written = image.write(stripe_unit, large_cmp_bl.length(),
+                                  large_cmp_bl);
+    ASSERT_EQ(large_cmp_bl.length(), written);
+
+    /*
+     * compare and write at offset stripe_unit + 1 and stripe unit size
+     * Expect fail because access exceeds stripe
+     */
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(stripe_unit + 1, stripe_unit,
+                                          large_cmp_bl,
+                                          large_write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(0, ret);
+    comp->wait_for_complete();
+    ssize_t aio_ret = comp->get_return_value();
+    ASSERT_EQ(-EINVAL, aio_ret);
+    ASSERT_EQ(0U, mismatch_off);
+    comp->release();
+
+    // check nothing has been written
+    ASSERT_PASSED(compare_written, image, stripe_unit, large_cmp_bl.length(),
+                  large_cmp_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
 }
 
-void compare_and_write_test_data(librbd::Image& image, const char *cmp_data, const char *test_data,
-                                 off_t off, ssize_t len, uint64_t *mismatch_off, uint32_t iohint, bool *passed)
+TEST_F(TestLibRBD, TestCompareAndWriteTooLargePP)
 {
-  size_t written;
-  ceph::bufferlist cmp_bl;
-  cmp_bl.append(cmp_data, strlen(cmp_data));
-  ceph::bufferlist test_bl;
-  test_bl.append(test_data, strlen(test_data));
-  printf("start compare and write\n");
-  written = image.compare_and_write(off, len, cmp_bl, test_bl, mismatch_off, iohint);
-  printf("compare and  wrote: %d\n", (int) written);
-  ASSERT_EQ(len, static_cast<ssize_t>(written));
-  *passed = true;
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
+
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    // large write test => we allow stripe unit size writes (aligned)
+    uint64_t stripe_unit = image.get_stripe_unit();
+    std::string large_write_buffer(stripe_unit * 2, '2');
+    ceph::bufferlist large_write_bl;
+    large_write_bl.append(large_write_buffer.data(),
+                          large_write_buffer.length());
+
+    std::string large_cmp_buffer(stripe_unit * 2, '3');
+    ceph::bufferlist large_cmp_bl;
+    large_cmp_bl.append(large_cmp_buffer.data(), large_cmp_buffer.length());
+
+    ssize_t written = image.write(stripe_unit, large_cmp_bl.length(),
+                                  large_cmp_bl);
+    ASSERT_EQ(large_cmp_bl.length(), written);
+
+    /*
+     * compare and write at offset stripe_unit and stripe unit size + 1
+     * Expect fail because access is larger than stripe unit size
+     */
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(stripe_unit, stripe_unit + 1,
+                                      large_cmp_bl,
+                                      large_write_bl,
+                                      &mismatch_off, 0);
+    ASSERT_EQ(-EINVAL, written);
+    ASSERT_EQ(0U, mismatch_off);
+
+    // check nothing has been written
+    ASSERT_PASSED(compare_written, image, stripe_unit, large_cmp_bl.length(),
+                  large_cmp_buffer);
+
+    ASSERT_PASSED(validate_object_map, image);
+  }
+
+  ioctx.close();
 }
 
-TEST_F(TestLibRBD, TestIOPP)
+TEST_F(TestLibRBD, TestAioCompareAndWriteTooLargePP)
 {
+  REQUIRE(!is_rbd_pwl_enabled((CephContext *)_rados.cct()));
+
   librados::IoCtx ioctx;
   ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
@@ -2966,80 +4620,156 @@ TEST_F(TestLibRBD, TestIOPP)
     librbd::Image image;
     int order = 0;
     std::string name = get_temp_image_name();
-    uint64_t size = 2 << 20;
+    uint64_t size = 20 << 20; /* 20MiB */
 
     ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
     ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
-    bool skip_discard = this->is_skip_partial_discard_enabled(image);
+    // large write test => we allow stripe unit size writes (aligned)
+    uint64_t stripe_unit = image.get_stripe_unit();
+    std::string large_write_buffer(stripe_unit * 2, '2');
+    ceph::bufferlist large_write_bl;
+    large_write_bl.append(large_write_buffer.data(),
+                          large_write_buffer.length());
+
+    std::string large_cmp_buffer(stripe_unit * 2, '3');
+    ceph::bufferlist large_cmp_bl;
+    large_cmp_bl.append(large_cmp_buffer.data(), large_cmp_buffer.length());
+
+    ssize_t written = image.write(stripe_unit, large_cmp_bl.length(),
+                                  large_cmp_bl);
+    ASSERT_EQ(large_cmp_bl.length(), written);
+
+    /*
+     * compare and write at offset stripe_unit and stripe unit size + 1
+     * Expect fail because access is larger than stripe unit size
+     */
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(stripe_unit, stripe_unit + 1,
+                                          large_cmp_bl,
+                                          large_write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(0, ret);
+    comp->wait_for_complete();
+    ssize_t aio_ret = comp->get_return_value();
+    ASSERT_EQ(-EINVAL, aio_ret);
+    ASSERT_EQ(0U, mismatch_off);
+    comp->release();
 
-    char test_data[TEST_IO_SIZE + 1];
-    char zero_data[TEST_IO_SIZE + 1];
-    int i;
-    uint64_t mismatch_offset;
+    // check nothing has been written
+    ASSERT_PASSED(compare_written, image, stripe_unit, large_cmp_bl.length(),
+                  large_cmp_buffer);
 
-    for (i = 0; i < TEST_IO_SIZE; ++i) {
-      test_data[i] = (char) (rand() % (126 - 33) + 33);
-    }
-    test_data[TEST_IO_SIZE] = '\0';
-    memset(zero_data, 0, sizeof(zero_data));
+    ASSERT_PASSED(validate_object_map, image);
+  }
 
-    for (i = 0; i < 5; ++i)
-      ASSERT_PASSED(write_test_data, image, test_data, strlen(test_data) * i, 0);
+  ioctx.close();
+}
 
-    for (i = 5; i < 10; ++i)
-      ASSERT_PASSED(aio_write_test_data, image, test_data, strlen(test_data) * i, 0);
+TEST_F(TestLibRBD, TestCompareAndWriteStripeUnitSuccessPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
-    for (i = 0; i < 5; ++i)
-      ASSERT_PASSED(compare_and_write_test_data, image, test_data, test_data, TEST_IO_SIZE * i,
-        TEST_IO_SIZE, &mismatch_offset, 0);
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
 
-    for (i = 5; i < 10; ++i)
-      ASSERT_PASSED(aio_compare_and_write_test_data, image, test_data, test_data, TEST_IO_SIZE * i,
-        TEST_IO_SIZE, 0);
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
 
-    for (i = 0; i < 5; ++i)
-      ASSERT_PASSED(read_test_data, image, test_data, strlen(test_data) * i, TEST_IO_SIZE, 0);
+    // large write test => we allow stripe unit size writes (aligned)
+    uint64_t stripe_unit = image.get_stripe_unit();
+    std::string large_write_buffer(stripe_unit * 2, '2');
+    ceph::bufferlist large_write_bl;
+    large_write_bl.append(large_write_buffer.data(),
+                          large_write_buffer.length());
+
+    std::string large_cmp_buffer(stripe_unit * 2, '3');
+    ceph::bufferlist large_cmp_bl;
+    large_cmp_bl.append(large_cmp_buffer.data(), large_cmp_buffer.length());
+
+    ssize_t written = image.write(stripe_unit, large_cmp_bl.length(),
+                                  large_cmp_bl);
+    ASSERT_EQ(large_cmp_bl.length(), written);
+
+    // aligned stripe unit size access => expect success
+    uint64_t mismatch_off = 0;
+    written = image.compare_and_write(stripe_unit, stripe_unit,
+                                      large_cmp_bl,
+                                      large_write_bl,
+                                      &mismatch_off, 0);
+    ASSERT_EQ(stripe_unit, written);
+    ASSERT_EQ(0U, mismatch_off);
+
+    // check large_write_bl was written and nothing beyond
+    ASSERT_PASSED(compare_written, image, stripe_unit, stripe_unit,
+                  large_write_buffer);
+    ASSERT_PASSED(compare_written, image, stripe_unit * 2, stripe_unit,
+                  large_cmp_buffer);
 
-    for (i = 5; i < 10; ++i)
-      ASSERT_PASSED(aio_read_test_data, image, test_data, strlen(test_data) * i, TEST_IO_SIZE, 0);
+    ASSERT_PASSED(validate_object_map, image);
+  }
 
-    // discard 2nd, 4th sections.
-    ASSERT_PASSED(discard_test_data, image, TEST_IO_SIZE, TEST_IO_SIZE);
-    ASSERT_PASSED(aio_discard_test_data, image, TEST_IO_SIZE*3, TEST_IO_SIZE);
+  ioctx.close();
+}
 
-    ASSERT_PASSED(read_test_data, image, test_data,  0, TEST_IO_SIZE, 0);
-    ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
-                 TEST_IO_SIZE, TEST_IO_SIZE, 0);
-    ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*2, TEST_IO_SIZE, 0);
-    ASSERT_PASSED(read_test_data, image, skip_discard ? test_data : zero_data,
-                 TEST_IO_SIZE*3, TEST_IO_SIZE, 0);
-    ASSERT_PASSED(read_test_data, image, test_data,  TEST_IO_SIZE*4, TEST_IO_SIZE, 0);
+TEST_F(TestLibRBD, TestAioCompareAndWriteStripeUnitSuccessPP)
+{
+  librados::IoCtx ioctx;
+  ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
 
-    for (i = 0; i < 15; ++i) {
-      if (i % 3 == 2) {
-        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-      } else if (i % 3 == 1) {
-        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      } else {
-        ASSERT_PASSED(writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-        ASSERT_PASSED(writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      }
-    }
-    for (i = 0; i < 15; ++i) {
-      if (i % 3 == 2) {
-        ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-        ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32 + i, TEST_IO_SIZE, 0);
-      } else if (i % 3 == 1) {
-        ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-        ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE + i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      } else {
-        ASSERT_PASSED(aio_writesame_test_data, image, test_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-        ASSERT_PASSED(aio_writesame_test_data, image, zero_data, TEST_IO_SIZE * i, TEST_IO_SIZE * i * 32, TEST_IO_SIZE, 0);
-      }
-    }
+  {
+    librbd::RBD rbd;
+    librbd::Image image;
+    int order = 0;
+    std::string name = get_temp_image_name();
+    uint64_t size = 20 << 20; /* 20MiB */
+
+    ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+    ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+    // large write test => we allow stripe unit size writes (aligned)
+    uint64_t stripe_unit = image.get_stripe_unit();
+    std::string large_write_buffer(stripe_unit * 2, '2');
+    ceph::bufferlist large_write_bl;
+    large_write_bl.append(large_write_buffer.data(),
+                          large_write_buffer.length());
+
+    std::string large_cmp_buffer(stripe_unit * 2, '3');
+    ceph::bufferlist large_cmp_bl;
+    large_cmp_bl.append(large_cmp_buffer.data(),
+                        large_cmp_buffer.length());
+
+    ssize_t written = image.write(stripe_unit, large_cmp_bl.length(),
+                                  large_cmp_bl);
+    ASSERT_EQ(large_cmp_bl.length(), written);
+
+    // aligned stripe unit size access => expect success
+    librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(
+        NULL, (librbd::callback_t) simple_write_cb_pp);
+    uint64_t mismatch_off = 0;
+    int ret = image.aio_compare_and_write(stripe_unit, stripe_unit,
+                                          large_cmp_bl,
+                                          large_write_bl,
+                                          comp, &mismatch_off, 0);
+    ASSERT_EQ(0, ret);
+    comp->wait_for_complete();
+    ssize_t aio_ret = comp->get_return_value();
+    ASSERT_EQ(0, aio_ret);
+    ASSERT_EQ(0U, mismatch_off);
+    comp->release();
+
+    // check large_write_bl was written and nothing beyond
+    ASSERT_PASSED(compare_written, image, stripe_unit, stripe_unit,
+                  large_write_buffer);
+    ASSERT_PASSED(compare_written, image, stripe_unit * 2, stripe_unit,
+                  large_cmp_buffer);
 
     ASSERT_PASSED(validate_object_map, image);
   }
@@ -5127,14 +6857,14 @@ void compare_and_write_copyup(librados::IoCtx &ioctx, bool deep_copyup,
   }
 
   bufferlist cmp_bl;
-  cmp_bl.append(std::string(96, '1'));
+  cmp_bl.append(std::string(512, '1'));
   bufferlist write_bl;
   write_bl.append(std::string(512, '2'));
-  uint64_t mismatch_off;
+  uint64_t mismatch_off = 0;
   ASSERT_EQ((ssize_t)write_bl.length(),
             clone_image.compare_and_write(512, write_bl.length(), cmp_bl,
                                           write_bl, &mismatch_off, 0));
-
+  ASSERT_EQ(0U, mismatch_off);
   bufferlist read_bl;
   ASSERT_EQ(4096, clone_image.read(0, 4096, read_bl));
 
@@ -5191,10 +6921,10 @@ void compare_and_write_copyup_mismatch(librados::IoCtx &ioctx,
 
   bufferlist cmp_bl;
   cmp_bl.append(std::string(48, '1'));
-  cmp_bl.append(std::string(48, '3'));
+  cmp_bl.append(std::string(464, '3'));
   bufferlist write_bl;
   write_bl.append(std::string(512, '2'));
-  uint64_t mismatch_off;
+  uint64_t mismatch_off = 0;
   ASSERT_EQ(-EILSEQ,
             clone_image.compare_and_write(512, write_bl.length(), cmp_bl,
                                           write_bl, &mismatch_off, 0));