1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "rgw_notify.h"
6 #include <boost/algorithm/hex.hpp>
7 #include "rgw_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_perf_counters.h"
10 #include "common/dout.h"
12 #define dout_subsys ceph_subsys_rgw
14 namespace rgw::notify
{
16 // populate record from request
17 void populate_record_from_request(const req_state
*s
,
18 const ceph::real_time
& mtime
,
19 const std::string
& etag
,
21 rgw_pubsub_s3_record
& record
) {
22 record
.eventVersion
= "2.1";
23 record
.eventSource
= "aws:s3";
24 record
.eventTime
= mtime
;
25 record
.eventName
= to_string(event_type
);
26 record
.userIdentity
= s
->user
->user_id
.id
; // user that triggered the change
27 record
.sourceIPAddress
= ""; // IP address of client that triggered the change: TODO
28 record
.x_amz_request_id
= s
->req_id
; // request ID of the original change
29 record
.x_amz_id_2
= s
->host_id
; // RGW on which the change was made
30 record
.s3SchemaVersion
= "1.0";
31 // configurationId is filled from subscription configuration
32 record
.bucket_name
= s
->bucket_name
;
33 record
.bucket_ownerIdentity
= s
->bucket_owner
.get_id().id
;
34 record
.bucket_arn
= to_string(rgw::ARN(s
->bucket
));
35 record
.object_key
= s
->object
.name
;
36 record
.object_size
= s
->obj_size
;
37 record
.object_etag
= etag
;
38 record
.object_versionId
= s
->object
.instance
;
39 // use timestamp as per key sequence id (hex encoded)
40 const utime_t
ts(real_clock::now());
41 boost::algorithm::hex((const char*)&ts
, (const char*)&ts
+ sizeof(utime_t
),
42 std::back_inserter(record
.object_sequencer
));
43 // event ID is rgw extension (not in the S3 spec), used for acking the event
44 // same format is used in both S3 compliant and Ceph specific events
45 // not used in case of push-only mode
47 record
.bucket_id
= s
->bucket
.bucket_id
;
49 record
.x_meta_map
= s
->info
.x_meta_map
;
52 bool match(const rgw_pubsub_topic_filter
& filter
, const req_state
* s
, EventType event
) {
53 if (!::match(filter
.events
, event
)) {
56 if (!::match(filter
.s3_filter
.key_filter
, s
->object
.name
)) {
59 if (!::match(filter
.s3_filter
.metadata_filter
, s
->info
.x_meta_map
)) {
65 int publish(const req_state
* s
,
66 const ceph::real_time
& mtime
,
67 const std::string
& etag
,
70 RGWUserPubSub
ps_user(store
, s
->user
->user_id
);
71 RGWUserPubSub::Bucket
ps_bucket(&ps_user
, s
->bucket
);
72 rgw_pubsub_bucket_topics bucket_topics
;
73 auto rc
= ps_bucket
.get_topics(&bucket_topics
);
75 // failed to fetch bucket topics
78 rgw_pubsub_s3_record record
;
79 populate_record_from_request(s
, mtime
, etag
, event_type
, record
);
80 bool event_handled
= false;
81 bool event_should_be_handled
= false;
82 for (const auto& bucket_topic
: bucket_topics
.topics
) {
83 const rgw_pubsub_topic_filter
& topic_filter
= bucket_topic
.second
;
84 const rgw_pubsub_topic
& topic_cfg
= topic_filter
.topic
;
85 if (!match(topic_filter
, s
, event_type
)) {
86 // topic does not apply to req_state
89 event_should_be_handled
= true;
90 record
.configurationId
= topic_filter
.s3_id
;
91 ldout(s
->cct
, 20) << "notification: '" << topic_filter
.s3_id
<<
92 "' on topic: '" << topic_cfg
.dest
.arn_topic
<<
93 "' and bucket: '" << s
->bucket
.name
<<
94 "' (unique topic: '" << topic_cfg
.name
<<
95 "') apply to event of type: '" << to_string(event_type
) << "'" << dendl
;
97 // TODO add endpoint LRU cache
98 const auto push_endpoint
= RGWPubSubEndpoint::create(topic_cfg
.dest
.push_endpoint
,
99 topic_cfg
.dest
.arn_topic
,
100 RGWHTTPArgs(topic_cfg
.dest
.push_endpoint_args
),
102 const std::string push_endpoint_str
= push_endpoint
->to_str();
103 ldout(s
->cct
, 20) << "push endpoint created: " << push_endpoint_str
<< dendl
;
104 auto rc
= push_endpoint
->send_to_completion_async(s
->cct
, record
, s
->yield
);
106 // bail out on first error
107 // TODO: add conf for bail out policy
108 ldout(s
->cct
, 1) << "push to endpoint " << push_endpoint_str
<< " failed, with error: " << rc
<< dendl
;
109 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
112 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_ok
);
113 ldout(s
->cct
, 20) << "successfull push to endpoint " << push_endpoint_str
<< dendl
;
114 event_handled
= true;
115 } catch (const RGWPubSubEndpoint::configuration_error
& e
) {
116 ldout(s
->cct
, 1) << "ERROR: failed to create push endpoint: "
117 << topic_cfg
.dest
.push_endpoint
<< " due to: " << e
.what() << dendl
;
118 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_push_failed
);
123 if (event_should_be_handled
) {
124 // not counting events with no notifications or events that are filtered
125 // counting a single event, regardless of the number of notifications it sends
126 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_triggered
);
127 if (!event_handled
) {
128 // all notifications for this event failed
129 if (perfcounter
) perfcounter
->inc(l_rgw_pubsub_event_lost
);