]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_notify.h" | |
5 | #include <memory> | |
6 | #include <boost/algorithm/hex.hpp> | |
7 | #include "rgw_pubsub.h" | |
8 | #include "rgw_pubsub_push.h" | |
9 | #include "rgw_perf_counters.h" | |
10 | #include "common/dout.h" | |
11 | ||
12 | #define dout_subsys ceph_subsys_rgw | |
13 | ||
14 | namespace rgw::notify { | |
15 | ||
16 | // populate record from request | |
17 | void populate_record_from_request(const req_state *s, | |
9f95a23c TL |
18 | const rgw_obj_key& key, |
19 | uint64_t size, | |
eafe8130 TL |
20 | const ceph::real_time& mtime, |
21 | const std::string& etag, | |
22 | EventType event_type, | |
23 | rgw_pubsub_s3_record& record) { | |
eafe8130 TL |
24 | record.eventTime = mtime; |
25 | record.eventName = to_string(event_type); | |
9f95a23c | 26 | record.userIdentity = s->user->get_id().id; // user that triggered the change |
eafe8130 TL |
27 | record.x_amz_request_id = s->req_id; // request ID of the original change |
28 | record.x_amz_id_2 = s->host_id; // RGW on which the change was made | |
9f95a23c | 29 | // configurationId is filled from notification configuration |
eafe8130 TL |
30 | record.bucket_name = s->bucket_name; |
31 | record.bucket_ownerIdentity = s->bucket_owner.get_id().id; | |
32 | record.bucket_arn = to_string(rgw::ARN(s->bucket)); | |
9f95a23c TL |
33 | record.object_key = key.name; |
34 | record.object_size = size; | |
eafe8130 | 35 | record.object_etag = etag; |
9f95a23c | 36 | record.object_versionId = key.instance; |
eafe8130 TL |
37 | // use timestamp as per key sequence id (hex encoded) |
38 | const utime_t ts(real_clock::now()); | |
39 | boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), | |
40 | std::back_inserter(record.object_sequencer)); | |
92f5a8d4 | 41 | set_event_id(record.id, etag, ts); |
eafe8130 TL |
42 | record.bucket_id = s->bucket.bucket_id; |
43 | // pass meta data | |
44 | record.x_meta_map = s->info.x_meta_map; | |
9f95a23c TL |
45 | // pass tags |
46 | record.tags = s->tagset.get_tags(); | |
47 | // opaque data will be filled from topic configuration | |
eafe8130 TL |
48 | } |
49 | ||
50 | bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) { | |
51 | if (!::match(filter.events, event)) { | |
52 | return false; | |
53 | } | |
54 | if (!::match(filter.s3_filter.key_filter, s->object.name)) { | |
55 | return false; | |
56 | } | |
57 | if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) { | |
58 | return false; | |
59 | } | |
9f95a23c TL |
60 | if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) { |
61 | return false; | |
62 | } | |
eafe8130 TL |
63 | return true; |
64 | } | |
65 | ||
66 | int publish(const req_state* s, | |
9f95a23c TL |
67 | const rgw_obj_key& key, |
68 | uint64_t size, | |
eafe8130 TL |
69 | const ceph::real_time& mtime, |
70 | const std::string& etag, | |
71 | EventType event_type, | |
9f95a23c TL |
72 | rgw::sal::RGWRadosStore* store) { |
73 | RGWUserPubSub ps_user(store, s->user->get_id()); | |
eafe8130 TL |
74 | RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket); |
75 | rgw_pubsub_bucket_topics bucket_topics; | |
76 | auto rc = ps_bucket.get_topics(&bucket_topics); | |
77 | if (rc < 0) { | |
78 | // failed to fetch bucket topics | |
79 | return rc; | |
80 | } | |
81 | rgw_pubsub_s3_record record; | |
9f95a23c | 82 | populate_record_from_request(s, key, size, mtime, etag, event_type, record); |
eafe8130 TL |
83 | bool event_handled = false; |
84 | bool event_should_be_handled = false; | |
85 | for (const auto& bucket_topic : bucket_topics.topics) { | |
86 | const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; | |
87 | const rgw_pubsub_topic& topic_cfg = topic_filter.topic; | |
88 | if (!match(topic_filter, s, event_type)) { | |
89 | // topic does not apply to req_state | |
90 | continue; | |
91 | } | |
92 | event_should_be_handled = true; | |
93 | record.configurationId = topic_filter.s3_id; | |
9f95a23c | 94 | record.opaque_data = topic_cfg.opaque_data; |
eafe8130 TL |
95 | ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << |
96 | "' on topic: '" << topic_cfg.dest.arn_topic << | |
97 | "' and bucket: '" << s->bucket.name << | |
98 | "' (unique topic: '" << topic_cfg.name << | |
99 | "') apply to event of type: '" << to_string(event_type) << "'" << dendl; | |
100 | try { | |
101 | // TODO add endpoint LRU cache | |
102 | const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint, | |
103 | topic_cfg.dest.arn_topic, | |
104 | RGWHTTPArgs(topic_cfg.dest.push_endpoint_args), | |
105 | s->cct); | |
106 | const std::string push_endpoint_str = push_endpoint->to_str(); | |
107 | ldout(s->cct, 20) << "push endpoint created: " << push_endpoint_str << dendl; | |
108 | auto rc = push_endpoint->send_to_completion_async(s->cct, record, s->yield); | |
109 | if (rc < 0) { | |
110 | // bail out on first error | |
111 | // TODO: add conf for bail out policy | |
112 | ldout(s->cct, 1) << "push to endpoint " << push_endpoint_str << " failed, with error: " << rc << dendl; | |
113 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); | |
114 | return rc; | |
115 | } | |
116 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); | |
117 | ldout(s->cct, 20) << "successfull push to endpoint " << push_endpoint_str << dendl; | |
118 | event_handled = true; | |
119 | } catch (const RGWPubSubEndpoint::configuration_error& e) { | |
120 | ldout(s->cct, 1) << "ERROR: failed to create push endpoint: " | |
121 | << topic_cfg.dest.push_endpoint << " due to: " << e.what() << dendl; | |
122 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); | |
123 | return -EINVAL; | |
124 | } | |
125 | } | |
126 | ||
127 | if (event_should_be_handled) { | |
128 | // not counting events with no notifications or events that are filtered | |
129 | // counting a single event, regardless of the number of notifications it sends | |
130 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered); | |
131 | if (!event_handled) { | |
132 | // all notifications for this event failed | |
133 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost); | |
134 | } | |
135 | } | |
136 | ||
137 | return 0; | |
138 | } | |
139 | ||
140 | } | |
141 |