]>
git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/plasma/plasma.cc
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
18 #include "plasma/plasma.h"
20 #include <sys/socket.h>
21 #include <sys/types.h>
24 #include "plasma/common.h"
25 #include "plasma/common_generated.h"
26 #include "plasma/protocol.h"
28 namespace fb
= plasma::flatbuf
;
32 ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {}
34 ObjectTableEntry::~ObjectTableEntry() { pointer
= nullptr; }
36 int WarnIfSigpipe(int status
, int client_sock
) {
40 if (errno
== EPIPE
|| errno
== EBADF
|| errno
== ECONNRESET
) {
41 ARROW_LOG(WARNING
) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
42 "sending a message to client on fd "
44 << ". The client on the other end may "
48 ARROW_LOG(FATAL
) << "Failed to write message to client on fd " << client_sock
<< ".";
49 return -1; // This is never reached.
53 * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes
54 * of this buffer are the length of the remaining message and the
55 * remaining message is a serialized version of the object info.
57 * \param object_info The object info to be serialized
58 * \return The object info buffer. It is the caller's responsibility to free
59 * this buffer with "delete" after it has been used.
61 std::unique_ptr
<uint8_t[]> CreateObjectInfoBuffer(fb::ObjectInfoT
* object_info
) {
62 flatbuffers::FlatBufferBuilder fbb
;
63 auto message
= fb::CreateObjectInfo(fbb
, object_info
);
66 std::unique_ptr
<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb
.GetSize()]);
67 *(reinterpret_cast<int64_t*>(notification
.get())) = fbb
.GetSize();
68 memcpy(notification
.get() + sizeof(int64_t), fbb
.GetBufferPointer(), fbb
.GetSize());
72 std::unique_ptr
<uint8_t[]> CreatePlasmaNotificationBuffer(
73 std::vector
<fb::ObjectInfoT
>& object_info
) {
74 flatbuffers::FlatBufferBuilder fbb
;
75 std::vector
<flatbuffers::Offset
<plasma::flatbuf::ObjectInfo
>> info
;
76 for (size_t i
= 0; i
< object_info
.size(); ++i
) {
77 info
.push_back(fb::CreateObjectInfo(fbb
, &object_info
[i
]));
80 auto info_array
= fbb
.CreateVector(info
);
81 auto message
= fb::CreatePlasmaNotification(fbb
, info_array
);
84 std::unique_ptr
<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb
.GetSize()]);
85 *(reinterpret_cast<int64_t*>(notification
.get())) = fbb
.GetSize();
86 memcpy(notification
.get() + sizeof(int64_t), fbb
.GetBufferPointer(), fbb
.GetSize());
90 ObjectTableEntry
* GetObjectTableEntry(PlasmaStoreInfo
* store_info
,
91 const ObjectID
& object_id
) {
92 auto it
= store_info
->objects
.find(object_id
);
93 if (it
== store_info
->objects
.end()) {
96 return it
->second
.get();