]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_rados.cc
update sources to 12.2.10
[ceph.git] / ceph / src / rgw / rgw_rados.cc
index 876147417a68d2044c6d41aa0e3d2858ba36d087..f123514e470e29f3677cf63d4f394202cabfad7c 100644 (file)
@@ -3239,10 +3239,9 @@ class RGWDataSyncProcessorThread : public RGWSyncProcessorThread
   }
 public:
   RGWDataSyncProcessorThread(RGWRados *_store, RGWAsyncRadosProcessor *async_rados,
-                             const string& _source_zone,
-                             rgw::BucketChangeObserver *observer)
+                             const string& _source_zone)
     : RGWSyncProcessorThread(_store, "data-sync"),
-      sync(_store, async_rados, _source_zone, observer),
+      sync(_store, async_rados, _source_zone),
       initialized(false) {}
 
   void wakeup_sync_shards(map<int, set<string> >& shard_ids) {
@@ -3602,7 +3601,6 @@ public:
     for (int i = 0; i < num_shards; ++i) {
       Mutex::Locker l(*locks[i]);
       for (auto c : completions[i]) {
-        Mutex::Locker cl(c->lock);
         c->stop();
       }
     }
@@ -4601,12 +4599,12 @@ int RGWRados::init_complete()
       ldout(cct, 0) << "ERROR: failed to start bucket trim manager" << dendl;
       return ret;
     }
+    data_log->set_observer(&*bucket_trim);
 
     Mutex::Locker dl(data_sync_thread_lock);
     for (auto iter : zone_data_sync_from_map) {
       ldout(cct, 5) << "starting data sync thread for zone " << iter.first << dendl;
-      auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first,
-                                                    &*bucket_trim);
+      auto *thread = new RGWDataSyncProcessorThread(this, async_rados, iter.first);
       ret = thread->init();
       if (ret < 0) {
         ldout(cct, 0) << "ERROR: failed to initialize data sync thread" << dendl;
@@ -7123,7 +7121,8 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
     orig_size = state->accounted_size;
   }
 
-  bool versioned_target = (meta.olh_epoch > 0 || !obj.key.instance.empty());
+  bool versioned_target = (meta.olh_epoch && *meta.olh_epoch > 0) ||
+                          !obj.key.instance.empty();
 
   bool versioned_op = (target->versioning_enabled() || is_olh || versioned_target);
 
@@ -7170,8 +7169,8 @@ int RGWRados::Object::Write::_do_write_meta(uint64_t size, uint64_t accounted_si
   target->invalidate_state();
   state = NULL;
 
-  if (versioned_op) {
-    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false, meta.zones_trace);
+  if (versioned_op && meta.olh_epoch) {
+    r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, *meta.olh_epoch, real_time(), false, meta.zones_trace);
     if (r < 0) {
       return r;
     }
@@ -7893,7 +7892,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                bool copy_if_newer,
                map<string, bufferlist>& attrs,
                RGWObjCategory category,
-               uint64_t olh_epoch,
+               boost::optional<uint64_t> olh_epoch,
               real_time delete_at,
                string *version_id,
                string *ptag,
@@ -7917,7 +7916,9 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   if (version_id && *version_id != "null") {
     processor.set_version_id(*version_id);
   }
-  processor.set_olh_epoch(olh_epoch);
+  if (olh_epoch) {
+    processor.set_olh_epoch(*olh_epoch);
+  }
   int ret = processor.prepare(this, NULL);
   if (ret < 0) {
     return ret;
@@ -8117,7 +8118,16 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   return 0;
 set_err_state:
   if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
-    ret = 0;
+    // we may have already fetched during sync of OP_ADD, but were waiting
+    // for OP_LINK_OLH to call set_olh() with a real olh_epoch
+    if (olh_epoch && *olh_epoch > 0) {
+      constexpr bool log_data_change = true;
+      ret = set_olh(obj_ctx, dest_bucket_info, dest_obj, false, nullptr,
+                    *olh_epoch, real_time(), false, zones_trace, log_data_change);
+    } else {
+      // we already have the latest copy
+      ret = 0;
+    }
   }
   if (opstate) {
     RGWOpState::OpState state;
@@ -9040,6 +9050,8 @@ int RGWRados::Object::Delete::delete_obj()
       }
 
       result.version_id = marker.key.instance;
+      if (result.version_id.empty())
+        result.version_id = "null";
       result.delete_marker = true;
 
       struct rgw_bucket_dir_entry_meta meta;
@@ -11216,7 +11228,8 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
                                     const string& op_tag,
                                     struct rgw_bucket_dir_entry_meta *meta,
                                     uint64_t olh_epoch,
-                                    real_time unmod_since, bool high_precision_time, rgw_zone_set *_zones_trace)
+                                    real_time unmod_since, bool high_precision_time,
+                                    rgw_zone_set *_zones_trace, bool log_data_change)
 {
   rgw_rados_ref ref;
   int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
@@ -11246,6 +11259,10 @@ int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjStat
     return r;
   }
 
+  if (log_data_change && bucket_info.datasync_flag_enabled()) {
+    data_log->add_entry(bs.bucket, bs.shard_id);
+  }
+
   return 0;
 }
 
@@ -11538,7 +11555,8 @@ int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBuc
 }
 
 int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
-                      uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace)
+                      uint64_t olh_epoch, real_time unmod_since, bool high_precision_time,
+                      rgw_zone_set *zones_trace, bool log_data_change)
 {
   string op_tag;
 
@@ -11569,7 +11587,9 @@ int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const r
       }
       return ret;
     }
-    ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time, zones_trace);
+    ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker,
+                                op_tag, meta, olh_epoch, unmod_since, high_precision_time,
+                                zones_trace, log_data_change);
     if (ret < 0) {
       ldout(cct, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
       if (ret == -ECANCELED) {