]>
Commit | Line | Data |
---|---|---|
eafe8130 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "rgw_notify.h" | |
5 | #include <memory> | |
6 | #include <boost/algorithm/hex.hpp> | |
7 | #include "rgw_pubsub.h" | |
8 | #include "rgw_pubsub_push.h" | |
9 | #include "rgw_perf_counters.h" | |
10 | #include "common/dout.h" | |
11 | ||
12 | #define dout_subsys ceph_subsys_rgw | |
13 | ||
14 | namespace rgw::notify { | |
15 | ||
16 | // populate record from request | |
17 | void populate_record_from_request(const req_state *s, | |
18 | const ceph::real_time& mtime, | |
19 | const std::string& etag, | |
20 | EventType event_type, | |
21 | rgw_pubsub_s3_record& record) { | |
22 | record.eventVersion = "2.1"; | |
23 | record.eventSource = "aws:s3"; | |
24 | record.eventTime = mtime; | |
25 | record.eventName = to_string(event_type); | |
26 | record.userIdentity = s->user->user_id.id; // user that triggered the change | |
27 | record.sourceIPAddress = ""; // IP address of client that triggered the change: TODO | |
28 | record.x_amz_request_id = s->req_id; // request ID of the original change | |
29 | record.x_amz_id_2 = s->host_id; // RGW on which the change was made | |
30 | record.s3SchemaVersion = "1.0"; | |
31 | // configurationId is filled from subscription configuration | |
32 | record.bucket_name = s->bucket_name; | |
33 | record.bucket_ownerIdentity = s->bucket_owner.get_id().id; | |
34 | record.bucket_arn = to_string(rgw::ARN(s->bucket)); | |
35 | record.object_key = s->object.name; | |
36 | record.object_size = s->obj_size; | |
37 | record.object_etag = etag; | |
38 | record.object_versionId = s->object.instance; | |
39 | // use timestamp as per key sequence id (hex encoded) | |
40 | const utime_t ts(real_clock::now()); | |
41 | boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), | |
42 | std::back_inserter(record.object_sequencer)); | |
43 | // event ID is rgw extension (not in the S3 spec), used for acking the event | |
44 | // same format is used in both S3 compliant and Ceph specific events | |
45 | // not used in case of push-only mode | |
46 | record.id = ""; | |
47 | record.bucket_id = s->bucket.bucket_id; | |
48 | // pass meta data | |
49 | record.x_meta_map = s->info.x_meta_map; | |
50 | } | |
51 | ||
52 | bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, EventType event) { | |
53 | if (!::match(filter.events, event)) { | |
54 | return false; | |
55 | } | |
56 | if (!::match(filter.s3_filter.key_filter, s->object.name)) { | |
57 | return false; | |
58 | } | |
59 | if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) { | |
60 | return false; | |
61 | } | |
62 | return true; | |
63 | } | |
64 | ||
65 | int publish(const req_state* s, | |
66 | const ceph::real_time& mtime, | |
67 | const std::string& etag, | |
68 | EventType event_type, | |
69 | RGWRados* store) { | |
70 | RGWUserPubSub ps_user(store, s->user->user_id); | |
71 | RGWUserPubSub::Bucket ps_bucket(&ps_user, s->bucket); | |
72 | rgw_pubsub_bucket_topics bucket_topics; | |
73 | auto rc = ps_bucket.get_topics(&bucket_topics); | |
74 | if (rc < 0) { | |
75 | // failed to fetch bucket topics | |
76 | return rc; | |
77 | } | |
78 | rgw_pubsub_s3_record record; | |
79 | populate_record_from_request(s, mtime, etag, event_type, record); | |
80 | bool event_handled = false; | |
81 | bool event_should_be_handled = false; | |
82 | for (const auto& bucket_topic : bucket_topics.topics) { | |
83 | const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; | |
84 | const rgw_pubsub_topic& topic_cfg = topic_filter.topic; | |
85 | if (!match(topic_filter, s, event_type)) { | |
86 | // topic does not apply to req_state | |
87 | continue; | |
88 | } | |
89 | event_should_be_handled = true; | |
90 | record.configurationId = topic_filter.s3_id; | |
91 | ldout(s->cct, 20) << "notification: '" << topic_filter.s3_id << | |
92 | "' on topic: '" << topic_cfg.dest.arn_topic << | |
93 | "' and bucket: '" << s->bucket.name << | |
94 | "' (unique topic: '" << topic_cfg.name << | |
95 | "') apply to event of type: '" << to_string(event_type) << "'" << dendl; | |
96 | try { | |
97 | // TODO add endpoint LRU cache | |
98 | const auto push_endpoint = RGWPubSubEndpoint::create(topic_cfg.dest.push_endpoint, | |
99 | topic_cfg.dest.arn_topic, | |
100 | RGWHTTPArgs(topic_cfg.dest.push_endpoint_args), | |
101 | s->cct); | |
102 | const std::string push_endpoint_str = push_endpoint->to_str(); | |
103 | ldout(s->cct, 20) << "push endpoint created: " << push_endpoint_str << dendl; | |
104 | auto rc = push_endpoint->send_to_completion_async(s->cct, record, s->yield); | |
105 | if (rc < 0) { | |
106 | // bail out on first error | |
107 | // TODO: add conf for bail out policy | |
108 | ldout(s->cct, 1) << "push to endpoint " << push_endpoint_str << " failed, with error: " << rc << dendl; | |
109 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); | |
110 | return rc; | |
111 | } | |
112 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok); | |
113 | ldout(s->cct, 20) << "successfull push to endpoint " << push_endpoint_str << dendl; | |
114 | event_handled = true; | |
115 | } catch (const RGWPubSubEndpoint::configuration_error& e) { | |
116 | ldout(s->cct, 1) << "ERROR: failed to create push endpoint: " | |
117 | << topic_cfg.dest.push_endpoint << " due to: " << e.what() << dendl; | |
118 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); | |
119 | return -EINVAL; | |
120 | } | |
121 | } | |
122 | ||
123 | if (event_should_be_handled) { | |
124 | // not counting events with no notifications or events that are filtered | |
125 | // counting a single event, regardless of the number of notifications it sends | |
126 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_triggered); | |
127 | if (!event_handled) { | |
128 | // all notifications for this event failed | |
129 | if (perfcounter) perfcounter->inc(l_rgw_pubsub_event_lost); | |
130 | } | |
131 | } | |
132 | ||
133 | return 0; | |
134 | } | |
135 | ||
136 | } | |
137 |