]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | local json = require ("lunajson") |
2 | local nats = require ("nats") | |
3 | ||
4 | function nats_connect(nats_host, nats_port) | |
5 | local nats_params = { | |
6 | host = nats_host, | |
7 | port = nats_port, | |
8 | } | |
9 | client = nats.connect(nats_params) | |
10 | client:connect() | |
11 | end | |
12 | ||
13 | function toJson(request, eventName, opaqueData, configure) | |
14 | supported_event = true | |
15 | local notification = { | |
16 | ["Records"] = { | |
17 | ["eventVersion"] = "2.1", | |
18 | ["eventSource"] = "ceph:s3", | |
19 | ["awsRegion"] = request.ZoneGroup.Name, | |
20 | ["eventTime"] = request.Time, | |
21 | ["eventName"] = eventName, | |
22 | ["userIdentity"] = { | |
23 | ["principalId"] = request.User.Id | |
24 | }, | |
25 | ["requestParameters"] = { | |
26 | ["sourceIPAddress"] = "" | |
27 | }, | |
28 | ["responseElements"] = { | |
29 | ["x-amz-request-id"] = request.Id, | |
30 | ["x-amz-id-2"] = request.RGWId | |
31 | }, | |
32 | ["s3"] = { | |
33 | ["s3SchemaVersion"] = "1.0", | |
34 | ["configurationId"] = configure, | |
35 | ["bucket"] = { | |
36 | ["name"] = request.Bucket.Name, | |
37 | ["ownerIdentity"] = { | |
38 | ["principalId"] = request.Bucket.User.Id | |
39 | }, | |
40 | ["arn"] = "arn:aws:s3:" .. request.ZoneGroup.Name .. "::" .. request.Bucket.Name, | |
41 | ["id"] = request.Bucket.Id | |
42 | }, | |
43 | ["object"] = { | |
44 | ["key"] = request.Object.Name, | |
45 | ["size"] = request.Object.Size, | |
46 | ["eTag"] = "", -- eTag is not supported yet | |
47 | ["versionId"] = request.Object.Instance, | |
48 | ["sequencer"] = string.format("%x", os.time()), | |
49 | ["metadata"] = { | |
50 | json.encode(request.HTTP.Metadata) | |
51 | }, | |
52 | ["tags"] = { | |
53 | json.encode(request.Tags) | |
54 | } | |
55 | } | |
56 | }, | |
57 | ["eventId"] = "", | |
58 | ["opaqueData"] = opaqueData | |
59 | } | |
60 | } | |
61 | return notification | |
62 | end | |
63 | ||
64 | supported_event = false | |
65 | configure = "mynotif1" | |
66 | opaqueData = "me@example.com" | |
67 | topic = "Bucket_Notification" | |
68 | bucket_name = "mybucket" | |
69 | nats_host = '0.0.0.0' | |
70 | nats_port = 4222 | |
71 | ||
72 | if bucket_name == Request.Bucket.Name then | |
73 | --Object Created | |
74 | if Request.RGWOp == "put_obj" then | |
75 | notification = toJson(Request ,'ObjectCreated:Put', opaqueData, configure) | |
76 | elseif Request.RGWOp == "post_obj" then | |
77 | notification = toJson(Request ,'ObjectCreated:Post', opaqueData, configure) | |
78 | ||
79 | elseif Request.RGWOp == "copy_obj" then | |
80 | notification = toJson(Request ,'ObjectCreated:Copy', opaqueData, configure) | |
81 | ||
82 | --Object Removed | |
83 | elseif Request.RGWOp == "delete_obj" then | |
84 | notification = toJson(Request ,'ObjectRemoved:Delete', opaqueData, configure) | |
85 | end | |
86 | ||
87 | if supported_event == true then | |
88 | nats_connect() | |
89 | local payload = json.encode(notification) | |
90 | client:publish(topic, payload) | |
91 | RGWDebugLog("bucket notification sent to nats://" .. nats_host .. ":" .. nats_port .. "/" .. topic) | |
92 | end | |
93 | end |