]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_notify.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_notify.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_notify.h"
5 #include <memory>
6 #include <boost/algorithm/hex.hpp>
7 #include "rgw_pubsub.h"
8 #include "rgw_pubsub_push.h"
9 #include "rgw_perf_counters.h"
10 #include "common/dout.h"
11
12 #define dout_subsys ceph_subsys_rgw
13
14 namespace rgw::notify {
15
16 // populate record from request
17 void populate_record_from_request(const req_state *s,
18 const ceph::real_time& mtime,
19 const std::string& etag,
20 EventType event_type,
21 rgw_pubsub_s3_record& record) {
22 record.eventVersion = "2.1";
23 record.eventSource = "aws:s3";
24 record.eventTime = mtime;
25 record.eventName = to_string(event_type);
26 record.userIdentity = s->user->user_id.id; // user that triggered the change
27 record.sourceIPAddress = ""; // IP address of client that triggered the change: TODO
28 record.x_amz_request_id = s->req_id; // request ID of the original change
29 record.x_amz_id_2 = s->host_id; // RGW on which the change was made
30 record.s3SchemaVersion = "1.0";
31 // configurationId is filled from subscription configuration
32 record.bucket_name = s->bucket_name;
33 record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
34 record.bucket_arn = to_string(rgw::ARN(s->bucket));
35 record.object_key = s->object.name;
36 record.object_size = s->obj_size;
37 record.object_etag = etag;
38 record.object_versionId = s->object.instance;
39 // use timestamp as per key sequence id (hex encoded)
40 const utime_t ts(real_clock::now());
41 boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
42 std::back_inserter(record.object_sequencer));
43 // event ID is rgw extension (not in the S3 spec), used for acking the event
44 // same format is used in both S3 compliant and Ceph specific events
45 // not used in case of push-only mode
46 record.id = "";
47 record.bucket_id = s->bucket.bucket_id;
48 // pass meta data
49 record.x_meta_map = s->info.x_meta_map;
50 }
51
52 bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) {
53 if (!::match(filter.events, event)) {
54 return false;
55 }
56 if (!::match(filter.s3_filter.key_filter, s->object.name)) {
57 return false;
58 }
59 if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
60 return false;
61 }
62 return true;
63 }
64
65 int publish(const req_state* s,
66 const ceph::real_time& mtime,
67 const std::string& etag,
68 EventType event_type,
69 RGWRados* store) {
70 RGWUserPubSub ps_user(store, s->user->user_id);
71 RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket);
72 rgw_pubsub_bucket_topics bucket_topics;
73 auto rc = ps_bucket.get_topics(&bucket_topics);
74 if (rc < 0) {
75 // failed to fetch bucket topics
76 return rc;
77 }
78 rgw_pubsub_s3_record record;
79 populate_record_from_request(s, mtime, etag, event_type, record);
80 bool event_handled = false;
81 bool event_should_be_handled = false;
82 for (const auto& bucket_topic : bucket_topics.topics) {
83 const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
84 const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
85 if (!match(topic_filter, s, event_type)) {
86 // topic does not apply to req_state
87 continue;
88 }
89 event_should_be_handled = true;
90 record.configurationId = topic_filter.s3_id;
91 ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id <<
92 "' on topic: '" << topic_cfg.dest.arn_topic <<
93 "' and bucket: '" << s->bucket.name <<
94 "' (unique topic: '" << topic_cfg.name <<
95 "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
96 try {
97 // TODO add endpoint LRU cache
98 const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint,
99 topic_cfg.dest.arn_topic,
100 RGWHTTPArgs(topic_cfg.dest.push_endpoint_args),
101 s->cct);
102 const std::string push_endpoint_str = push_endpoint->to_str();
103 ldout(s->cct, 20) << "push endpoint created: " << push_endpoint_str << dendl;
104 auto rc = push_endpoint->send_to_completion_async(s->cct, record, s->yield);
105 if (rc < 0) {
106 // bail out on first error
107 // TODO: add conf for bail out policy
108 ldout(s->cct, 1) << "push to endpoint " << push_endpoint_str << " failed, with error: " << rc << dendl;
109 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
110 return rc;
111 }
112 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
113 ldout(s->cct, 20) << "successfull push to endpoint " << push_endpoint_str << dendl;
114 event_handled = true;
115 } catch (const RGWPubSubEndpoint::configuration_error& e) {
116 ldout(s->cct, 1) << "ERROR: failed to create push endpoint: "
117 << topic_cfg.dest.push_endpoint << " due to: " << e.what() << dendl;
118 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
119 return -EINVAL;
120 }
121 }
122
123 if (event_should_be_handled) {
124 // not counting events with no notifications or events that are filtered
125 // counting a single event, regardless of the number of notifications it sends
126 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered);
127 if (!event_handled) {
128 // all notifications for this event failed
129 if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost);
130 }
131 }
132
133 return 0;
134 }
135
136 }
137