]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/cpp/src/plasma/plasma.h
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / plasma / plasma.h
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17
18 #pragma once
19
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <stdbool.h>
23 #include <stddef.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h> // pid_t
28
29 #include <memory>
30 #include <string>
31 #include <unordered_map>
32 #include <unordered_set>
33 #include <vector>
34
35 #include "plasma/compat.h"
36
37 #include "arrow/status.h"
38 #include "arrow/util/logging.h"
39 #include "arrow/util/macros.h"
40 #include "plasma/common.h"
41
42 #ifdef PLASMA_CUDA
43 using arrow::cuda::CudaIpcMemHandle;
44 #endif
45
46 namespace plasma {
47
48 namespace flatbuf {
49 struct ObjectInfoT;
50 } // namespace flatbuf
51
52 #define HANDLE_SIGPIPE(s, fd_) \
53 do { \
54 Status _s = (s); \
55 if (!_s.ok()) { \
56 if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { \
57 ARROW_LOG(WARNING) \
58 << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \
59 "sending a message to client on fd " \
60 << fd_ \
61 << ". " \
62 "The client on the other end may have hung up."; \
63 } else { \
64 return _s; \
65 } \
66 } \
67 } while (0);
68
69 /// Allocation granularity used in plasma for object allocation.
70 constexpr int64_t kBlockSize = 64;
71
72 /// Contains all information that is associated with a Plasma store client.
73 struct Client {
74 explicit Client(int fd);
75
76 /// The file descriptor used to communicate with the client.
77 int fd;
78
79 /// Object ids that are used by this client.
80 std::unordered_set<ObjectID> object_ids;
81
82 /// File descriptors that are used by this client.
83 std::unordered_set<int> used_fds;
84
85 /// The file descriptor used to push notifications to client. This is only valid
86 /// if client subscribes to plasma store. -1 indicates invalid.
87 int notification_fd;
88
89 std::string name = "anonymous_client";
90 };
91
92 // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
93 struct PlasmaObject {
94 #ifdef PLASMA_CUDA
95 // IPC handle for Cuda.
96 std::shared_ptr<CudaIpcMemHandle> ipc_handle;
97 #endif
98 /// The file descriptor of the memory mapped file in the store. It is used as
99 /// a unique identifier of the file in the client to look up the corresponding
100 /// file descriptor on the client's side.
101 int store_fd;
102 /// The offset in bytes in the memory mapped file of the data.
103 ptrdiff_t data_offset;
104 /// The offset in bytes in the memory mapped file of the metadata.
105 ptrdiff_t metadata_offset;
106 /// The size in bytes of the data.
107 int64_t data_size;
108 /// The size in bytes of the metadata.
109 int64_t metadata_size;
110 /// Device number object is on.
111 int device_num;
112
113 bool operator==(const PlasmaObject& other) const {
114 return (
115 #ifdef PLASMA_CUDA
116 (ipc_handle == other.ipc_handle) &&
117 #endif
118 (store_fd == other.store_fd) && (data_offset == other.data_offset) &&
119 (metadata_offset == other.metadata_offset) && (data_size == other.data_size) &&
120 (metadata_size == other.metadata_size) && (device_num == other.device_num));
121 }
122 };
123
124 enum class ObjectStatus : int {
125 /// The object was not found.
126 OBJECT_NOT_FOUND = 0,
127 /// The object was found.
128 OBJECT_FOUND = 1
129 };
130
131 /// The plasma store information that is exposed to the eviction policy.
132 struct PlasmaStoreInfo {
133 /// Objects that are in the Plasma store.
134 ObjectTable objects;
135 /// Boolean flag indicating whether to start the object store with hugepages
136 /// support enabled. Huge pages are substantially larger than normal memory
137 /// pages (e.g. 2MB or 1GB instead of 4KB) and using them can reduce
138 /// bookkeeping overhead from the OS.
139 bool hugepages_enabled;
140 /// A (platform-dependent) directory where to create the memory-backed file.
141 std::string directory;
142 };
143
144 /// Get an entry from the object table and return NULL if the object_id
145 /// is not present.
146 ///
147 /// \param store_info The PlasmaStoreInfo that contains the object table.
148 /// \param object_id The object_id of the entry we are looking for.
149 /// \return The entry associated with the object_id or NULL if the object_id
150 /// is not present.
151 ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
152 const ObjectID& object_id);
153
154 /// Print a warning if the status is less than zero. This should be used to check
155 /// the success of messages sent to plasma clients. We print a warning instead of
156 /// failing because the plasma clients are allowed to die. This is used to handle
157 /// situations where the store writes to a client file descriptor, and the client
158 /// may already have disconnected. If we have processed the disconnection and
159 /// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we
160 /// have not, then we should get a SIGPIPE. If we write to a TCP socket that
161 /// isn't connected yet, then we should get an ECONNRESET.
162 ///
163 /// \param status The status to check. If it is less less than zero, we will
164 /// print a warning.
165 /// \param client_sock The client socket. This is just used to print some extra
166 /// information.
167 /// \return The errno set.
168 int WarnIfSigpipe(int status, int client_sock);
169
170 std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(flatbuf::ObjectInfoT* object_info);
171
172 std::unique_ptr<uint8_t[]> CreatePlasmaNotificationBuffer(
173 std::vector<flatbuf::ObjectInfoT>& object_info);
174
175 } // namespace plasma