]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/plasma/plasma.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / plasma / plasma.cc
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #include "plasma/plasma.h"
19
20 #include <sys/socket.h>
21 #include <sys/types.h>
22 #include <unistd.h>
23
24 #include "plasma/common.h"
25 #include "plasma/common_generated.h"
26 #include "plasma/protocol.h"
27
28 namespace fb = plasma::flatbuf;
29
30 namespace plasma {
31
32 ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {}
33
34 ObjectTableEntry::~ObjectTableEntry() { pointer = nullptr; }
35
36 int WarnIfSigpipe(int status, int client_sock) {
37 if (status >= 0) {
38 return 0;
39 }
40 if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
41 ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
42 "sending a message to client on fd "
43 << client_sock
44 << ". The client on the other end may "
45 "have hung up.";
46 return errno;
47 }
48 ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << ".";
49 return -1; // This is never reached.
50 }
51
52 /**
53 * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes
54 * of this buffer are the length of the remaining message and the
55 * remaining message is a serialized version of the object info.
56 *
57 * \param object_info The object info to be serialized
58 * \return The object info buffer. It is the caller's responsibility to free
59 * this buffer with "delete" after it has been used.
60 */
61 std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(fb::ObjectInfoT* object_info) {
62 flatbuffers::FlatBufferBuilder fbb;
63 auto message = fb::CreateObjectInfo(fbb, object_info);
64 fbb.Finish(message);
65 auto notification =
66 std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
67 *(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
68 memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
69 return notification;
70 }
71
72 std::unique_ptr<uint8_t[]> CreatePlasmaNotificationBuffer(
73 std::vector<fb::ObjectInfoT>& object_info) {
74 flatbuffers::FlatBufferBuilder fbb;
75 std::vector<flatbuffers::Offset<plasma::flatbuf::ObjectInfo>> info;
76 for (size_t i = 0; i < object_info.size(); ++i) {
77 info.push_back(fb::CreateObjectInfo(fbb, &object_info[i]));
78 }
79
80 auto info_array = fbb.CreateVector(info);
81 auto message = fb::CreatePlasmaNotification(fbb, info_array);
82 fbb.Finish(message);
83 auto notification =
84 std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
85 *(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
86 memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
87 return notification;
88 }
89
90 ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
91 const ObjectID& object_id) {
92 auto it = store_info->objects.find(object_id);
93 if (it == store_info->objects.end()) {
94 return NULL;
95 }
96 return it->second.get();
97 }
98
99 } // namespace plasma