]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_notify.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / rgw / rgw_notify.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_notify.h"
5 #include <memory>
6 #include <boost/algorithm/hex.hpp>
7 #include "rgw_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_perf_counters.h"
10 #include "common/dout.h"
11
12 #define dout_subsys ceph_subsys_rgw
13
14 namespace rgw::notify {
15
16 // populate record from request
17 void populate_record_from_request(const req_state *s,
18 const rgw_obj_key& key,
19 uint64_t size,
20 const ceph::real_time& mtime,
21 const std::string& etag,
22 EventType event_type,
23 rgw_pubsub_s3_record& record) {
24 record.eventTime = mtime;
25 record.eventName = to_string(event_type);
26 record.userIdentity = s->user->get_id().id; // user that triggered the change
27 record.x_amz_request_id = s->req_id; // request ID of the original change
28 record.x_amz_id_2 = s->host_id; // RGW on which the change was made
29 // configurationId is filled from notification configuration
30 record.bucket_name = s->bucket_name;
31 record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
32 record.bucket_arn = to_string(rgw::ARN(s->bucket));
33 record.object_key = key.name;
34 record.object_size = size;
35 record.object_etag = etag;
36 record.object_versionId = key.instance;
37 // use timestamp as per key sequence id (hex encoded)
38 const utime_t ts(real_clock::now());
39 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
40 std::back_inserter(record.object_sequencer));
41 set_event_id(record.id, etag, ts);
42 record.bucket_id = s->bucket.bucket_id;
43 // pass meta data
44 record.x_meta_map = s->info.x_meta_map;
45 // pass tags
46 record.tags = s->tagset.get_tags();
47 // opaque data will be filled from topic configuration
48 }
49
50 bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
51 if (!::match(filter.events, event)) {
52 return false;
53 }
54 if (!::match(filter.s3_filter.key_filter, s->object.name)) {
55 return false;
56 }
57 if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
58 return false;
59 }
60 if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
61 return false;
62 }
63 return true;
64 }
65
66 int publish(const req_state* s,
67 const rgw_obj_key& key,
68 uint64_t size,
69 const ceph::real_time& mtime,
70 const std::string& etag,
71 EventType event_type,
72 rgw::sal::RGWRadosStore* store) {
73 RGWUserPubSub ps_user(store, s->user->get_id());
74 RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket);
75 rgw_pubsub_bucket_topics bucket_topics;
76 auto rc = ps_bucket.get_topics(&bucket_topics);
77 if (rc < 0) {
78 // failed to fetch bucket topics
79 return rc;
80 }
81 rgw_pubsub_s3_record record;
82 populate_record_from_request(s, key, size, mtime, etag, event_type, record);
83 bool event_handled = false;
84 bool event_should_be_handled = false;
85 for (const auto& bucket_topic : bucket_topics.topics) {
86 const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
87 const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
88 if (!match(topic_filter, s, event_type)) {
89 // topic does not apply to req_state
90 continue;
91 }
92 event_should_be_handled = true;
93 record.configurationId = topic_filter.s3_id;
94 record.opaque_data = topic_cfg.opaque_data;
95 ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id <<
96 "' on topic: '" << topic_cfg.dest.arn_topic <<
97 "' and bucket: '" << s->bucket.name <<
98 "' (unique topic: '" << topic_cfg.name <<
99 "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
100 try {
101 // TODO add endpoint LRU cache
102 const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint,
103 topic_cfg.dest.arn_topic,
104 RGWHTTPArgs(topic_cfg.dest.push_endpoint_args),
105 s->cct);
106 const std::string push_endpoint_str = push_endpoint->to_str();
107 ldout(s->cct, 20) << "push endpoint created: " << push_endpoint_str << dendl;
108 auto rc = push_endpoint->send_to_completion_async(s->cct, record, s->yield);
109 if (rc < 0) {
110 // bail out on first error
111 // TODO: add conf for bail out policy
112 ldout(s->cct, 1) << "push to endpoint " << push_endpoint_str << " failed, with error: " << rc << dendl;
113 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
114 return rc;
115 }
116 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
117 ldout(s->cct, 20) << "successfull push to endpoint " << push_endpoint_str << dendl;
118 event_handled = true;
119 } catch (const RGWPubSubEndpoint::configuration_error& e) {
120 ldout(s->cct, 1) << "ERROR: failed to create push endpoint: "
121 << topic_cfg.dest.push_endpoint << " due to: " << e.what() << dendl;
122 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
123 return -EINVAL;
124 }
125 }
126
127 if (event_should_be_handled) {
128 // not counting events with no notifications or events that are filtered
129 // counting a single event, regardless of the number of notifications it sends
130 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
131 if (!event_handled) {
132 // all notifications for this event failed
133 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
134 }
135 }
136
137 return 0;
138 }
139
140 }
141