1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "rgw_notify.h"
6 #include <boost/algorithm/hex.hpp>
7 #include "rgw_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_perf_counters.h"
10 #include "common/dout.h"
12 #define dout_subsys ceph_subsys_rgw
14 namespace rgw::notify
{
16 // populate record from request
17 void populate_record_from_request(const req_state
*s
,
18 const rgw_obj_key
& key
,
20 const ceph::real_time
& mtime
,
21 const std::string
& etag
,
23 rgw_pubsub_s3_record
& record
) {
24 record
.eventTime
= mtime
;
25 record
.eventName
= to_string(event_type
);
26 record
.userIdentity
= s
->user
->get_id().id
; // user that triggered the change
27 record
.x_amz_request_id
= s
->req_id
; // request ID of the original change
28 record
.x_amz_id_2
= s
->host_id
; // RGW on which the change was made
29 // configurationId is filled from notification configuration
30 record
.bucket_name
= s
->bucket_name
;
31 record
.bucket_ownerIdentity
= s
->bucket_owner
.get_id().id
;
32 record
.bucket_arn
= to_string(rgw::ARN(s
->bucket
));
33 record
.object_key
= key
.name
;
34 record
.object_size
= size
;
35 record
.object_etag
= etag
;
36 record
.object_versionId
= key
.instance
;
37 // use timestamp as per key sequence id (hex encoded)
38 const utime_t
ts(real_clock::now());
39 boost::algorithm::hex((const char*)&ts
, (const char*)&ts
+ sizeof(utime_t
),
40 std::back_inserter(record
.object_sequencer
));
41 set_event_id(record
.id
, etag
, ts
);
42 record
.bucket_id
= s
->bucket
.bucket_id
;
44 record
.x_meta_map
= s
->info
.x_meta_map
;
46 record
.tags
= s
->tagset
.get_tags();
47 // opaque data will be filled from topic configuration
50 bool match(const rgw_pubsub_topic_filter
& filter
, const req_state
* s
, EventType event
) {
51 if (!::match(filter
.events
, event
)) {
54 if (!::match(filter
.s3_filter
.key_filter
, s
->object
.name
)) {
57 if (!::match(filter
.s3_filter
.metadata_filter
, s
->info
.x_meta_map
)) {
60 if (!::match(filter
.s3_filter
.tag_filter
, s
->tagset
.get_tags())) {
66 int publish(const req_state
* s
,
67 const rgw_obj_key
& key
,
69 const ceph::real_time
& mtime
,
70 const std::string
& etag
,
72 rgw::sal::RGWRadosStore
* store
) {
73 RGWUserPubSub
ps_user(store
, s
->user
->get_id());
74 RGWUserPubSub::Bucket
ps_bucket(&ps_user
, s
->bucket
);
75 rgw_pubsub_bucket_topics bucket_topics
;
76 auto rc
= ps_bucket
.get_topics(&bucket_topics
);
78 // failed to fetch bucket topics
81 rgw_pubsub_s3_record record
;
82 populate_record_from_request(s
, key
, size
, mtime
, etag
, event_type
, record
);
83 bool event_handled
= false;
84 bool event_should_be_handled
= false;
85 for (const auto& bucket_topic
: bucket_topics
.topics
) {
86 const rgw_pubsub_topic_filter
& topic_filter
= bucket_topic
.second
;
87 const rgw_pubsub_topic
& topic_cfg
= topic_filter
.topic
;
88 if (!match(topic_filter
, s
, event_type
)) {
89 // topic does not apply to req_state
92 event_should_be_handled
= true;
93 record
.configurationId
= topic_filter
.s3_id
;
94 record
.opaque_data
= topic_cfg
.opaque_data
;
95 ldout(s
->cct
, 20) << "notification: '" << topic_filter
.s3_id
<<
96 "' on topic: '" << topic_cfg
.dest
.arn_topic
<<
97 "' and bucket: '" << s
->bucket
.name
<<
98 "' (unique topic: '" << topic_cfg
.name
<<
99 "') apply to event of type: '" << to_string(event_type
) << "'" << dendl
;
101 // TODO add endpoint LRU cache
102 const auto push_endpoint
= RGWPubSubEndpoint::create(topic_cfg
.dest
.push_endpoint
,
103 topic_cfg
.dest
.arn_topic
,
104 RGWHTTPArgs(topic_cfg
.dest
.push_endpoint_args
),
106 const std::string push_endpoint_str
= push_endpoint
->to_str();
107 ldout(s
->cct
, 20) << "push endpoint created: " << push_endpoint_str
<< dendl
;
108 auto rc
= push_endpoint
->send_to_completion_async(s
->cct
, record
, s
->yield
);
110 // bail out on first error
111 // TODO: add conf for bail out policy
112 ldout(s
->cct
, 1) << "push to endpoint " << push_endpoint_str
<< " failed, with error: " << rc
<< dendl
;
113 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
116 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
117 ldout(s
->cct
, 20) << "successfull push to endpoint " << push_endpoint_str
<< dendl
;
118 event_handled
= true;
119 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
120 ldout(s
->cct
, 1) << "ERROR: failed to create push endpoint: "
121 << topic_cfg
.dest
.push_endpoint
<< " due to: " << e
.what() << dendl
;
122 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
127 if (event_should_be_handled
) {
128 // not counting events with no notifications or events that are filtered
129 // counting a single event, regardless of the number of notifications it sends
130 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_triggered
);
131 if (!event_handled
) {
132 // all notifications for this event failed
133 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_lost
);