]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_reshard.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / rgw / rgw_reshard.cc
index 12ba93bd9d86e285de07aed5fb6b91239e0bf2b6..cfbada535cfc681a686589217d8a7f05be12b506 100644 (file)
@@ -345,7 +345,7 @@ int RGWBucketReshard::cancel()
 class BucketInfoReshardUpdate
 {
   RGWRados *store;
-  RGWBucketInfo bucket_info;
+  RGWBucketInfo& bucket_info;
   std::map<string, bufferlist> bucket_attrs;
 
   bool in_progress{false};
@@ -527,16 +527,16 @@ int RGWBucketReshard::do_reshard(int num_shards,
 
   BucketReshardManager target_shards_mgr(store, new_bucket_info, num_target_shards);
 
-  verbose = verbose && (formatter != nullptr);
+  bool verbose_json_out = verbose && (formatter != nullptr) && (out != nullptr);
 
-  if (verbose) {
+  if (verbose_json_out) {
     formatter->open_array_section("entries");
   }
 
   uint64_t total_entries = 0;
 
-  if (!verbose) {
-    cout << "total entries:";
+  if (!verbose_json_out && out) {
+    (*out) << "total entries:";
   }
 
   const int num_source_shards =
@@ -555,7 +555,7 @@ int RGWBucketReshard::do_reshard(int num_shards,
 
       for (auto iter = entries.begin(); iter != entries.end(); ++iter) {
        rgw_cls_bi_entry& entry = *iter;
-       if (verbose) {
+       if (verbose_json_out) {
          formatter->open_object_section("entry");
 
          encode_json("shard_id", i, formatter);
@@ -573,6 +573,11 @@ int RGWBucketReshard::do_reshard(int num_shards,
        bool account = entry.get_info(&cls_key, &category, &stats);
        rgw_obj_key key(cls_key);
        rgw_obj obj(new_bucket_info.bucket, key);
+       RGWMPObj mp;
+       if (key.ns == RGW_OBJ_NS_MULTIPART && mp.from_meta(key.name)) {
+         // place the multipart .meta object on the same shard as its head object
+         obj.index_hash_source = mp.get_key();
+       }
        int ret = store->get_target_shard_id(new_bucket_info, obj.get_hash_object(), &target_shard_id);
        if (ret < 0) {
          lderr(store->ctx()) << "ERROR: get_target_shard_id() returned ret=" << ret << dendl;
@@ -604,11 +609,9 @@ int RGWBucketReshard::do_reshard(int num_shards,
          }
        }
 
-       if (verbose) {
+       if (verbose_json_out) {
          formatter->close_section();
-         if (out) {
-           formatter->flush(*out);
-         }
+         formatter->flush(*out);
        } else if (out && !(total_entries % 1000)) {
          (*out) << " " << total_entries;
        }
@@ -616,11 +619,9 @@ int RGWBucketReshard::do_reshard(int num_shards,
     }
   }
 
-  if (verbose) {
+  if (verbose_json_out) {
     formatter->close_section();
-    if (out) {
-      formatter->flush(*out);
-    }
+    formatter->flush(*out);
   } else if (out) {
     (*out) << " " << total_entries << std::endl;
   }
@@ -982,12 +983,12 @@ int RGWReshard::process_single_logshard(int logshard_num)
   RGWBucketReshardLock logshard_lock(store, logshard_oid, false);
 
   int ret = logshard_lock.lock();
-  if (ret == -EBUSY) { /* already locked by another processor */
+  if (ret < 0) { 
     ldout(store->ctx(), 5) << __func__ << "(): failed to acquire lock on " <<
-      logshard_oid << dendl;
+      logshard_oid << ", ret = " << ret <<dendl;
     return ret;
   }
-
+  
   do {
     std::list<cls_rgw_reshard_entry> entries;
     ret = list(logshard_num, marker, max_entries, entries, &truncated);
@@ -1011,35 +1012,57 @@ int RGWReshard::process_single_logshard(int logshard_num)
        ret = store->get_bucket_info(obj_ctx, entry.tenant, entry.bucket_name,
                                     bucket_info, nullptr, &attrs);
        if (ret < 0) {
-         ldout(cct, 0) <<  __func__ << ": Error in get_bucket_info: " <<
-           cpp_strerror(-ret) << dendl;
-         return -ret;
+         ldout(cct, 0) <<  __func__ <<
+           ": Error in get_bucket_info for bucket " << entry.bucket_name <<
+           ": " << cpp_strerror(-ret) << dendl;
+         if (ret != -ENOENT) {
+           // any error other than ENOENT will abort
+           return ret;
+         }
+
+         // we've encountered a reshard queue entry for an apparently
+         // non-existent bucket; let's try to recover by cleaning up
+         ldout(cct, 0) <<  __func__ <<
+           ": removing reshard queue entry for non-existent bucket " <<
+           entry.bucket_name << dendl;
+
+         ret = remove(entry);
+         if (ret < 0) {
+           ldout(cct, 0) << __func__ <<
+             ": Error removing non-existent bucket " <<
+             entry.bucket_name << " from resharding queue: " <<
+             cpp_strerror(-ret) << dendl;
+           return ret;
+         }
+
+         // we cleaned up, move on to the next entry
+         goto finished_entry;
        }
 
        RGWBucketReshard br(store, bucket_info, attrs, nullptr);
-
-       Formatter* formatter = new JSONFormatter(false);
-       auto formatter_ptr = std::unique_ptr<Formatter>(formatter);
-       ret = br.execute(entry.new_num_shards, max_entries, true, nullptr,
-                        formatter, this);
+       ret = br.execute(entry.new_num_shards, max_entries, false, nullptr,
+                        nullptr, this);
        if (ret < 0) {
-         ldout (store->ctx(), 0) <<  __func__ <<
-           "ERROR in reshard_bucket " << entry.bucket_name << ":" <<
+         ldout(store->ctx(), 0) <<  __func__ <<
+           ": Error during resharding bucket " << entry.bucket_name << ":" <<
            cpp_strerror(-ret)<< dendl;
          return ret;
        }
 
-       ldout (store->ctx(), 20) <<  " removing entry" << entry.bucket_name <<
+       ldout(store->ctx(), 20) << __func__ <<
+         " removing reshard queue entry for bucket " << entry.bucket_name <<
          dendl;
 
        ret = remove(entry);
        if (ret < 0) {
-         ldout(cct, 0)<< __func__ << ":Error removing bucket " <<
-           entry.bucket_name << " for resharding queue: " <<
+         ldout(cct, 0) << __func__ << ": Error removing bucket " <<
+           entry.bucket_name << " from resharding queue: " <<
            cpp_strerror(-ret) << dendl;
          return ret;
        }
-      }
+      } // if new instance id is empty
+
+    finished_entry:
 
       Clock::time_point now = Clock::now();
       if (logshard_lock.should_renew(now)) {
@@ -1050,7 +1073,7 @@ int RGWReshard::process_single_logshard(int logshard_num)
       }
 
       entry.get_key(&marker);
-    }
+    } // entry for loop
   } while (truncated);
 
   logshard_lock.unlock();