]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/cpp/src/plasma/store.cc
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / cpp / src / plasma / store.cc
diff --git a/ceph/src/arrow/cpp/src/plasma/store.cc b/ceph/src/arrow/cpp/src/plasma/store.cc
new file mode 100644 (file)
index 0000000..032a12f
--- /dev/null
@@ -0,0 +1,1353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// PLASMA STORE: This is a simple object store server process
+//
+// It accepts incoming client connections on a unix domain socket
+// (name passed in via the -s option of the executable) and uses a
+// single thread to serve the clients. Each client establishes a
+// connection and can create objects, wait for objects and seal
+// objects through that connection.
+//
+// It keeps a hash table that maps object_ids (which are 20 byte long,
+// just enough to store and SHA1 hash) to memory mapped files.
+
+#include "plasma/store.h"
+
+#include <assert.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <limits.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <ctime>
+#include <deque>
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include <gflags/gflags.h>
+
+#include "arrow/status.h"
+#include "arrow/util/config.h"
+
+#include "plasma/common.h"
+#include "plasma/common_generated.h"
+#include "plasma/fling.h"
+#include "plasma/io.h"
+#include "plasma/malloc.h"
+#include "plasma/plasma_allocator.h"
+#include "plasma/protocol.h"
+
+#ifdef PLASMA_CUDA
+#include "arrow/gpu/cuda_api.h"
+
+using arrow::cuda::CudaBuffer;
+using arrow::cuda::CudaContext;
+using arrow::cuda::CudaDeviceManager;
+#endif
+
+using arrow::util::ArrowLog;
+using arrow::util::ArrowLogLevel;
+
+namespace fb = plasma::flatbuf;
+
+namespace plasma {
+
+void SetMallocGranularity(int value);
+
+struct GetRequest {
+  GetRequest(Client* client, const std::vector<ObjectID>& object_ids);
+  /// The client that called get.
+  Client* client;
+  /// The ID of the timer that will time out and cause this wait to return to
+  ///  the client if it hasn't already returned.
+  int64_t timer;
+  /// The object IDs involved in this request. This is used in the reply.
+  std::vector<ObjectID> object_ids;
+  /// The object information for the objects in this request. This is used in
+  /// the reply.
+  std::unordered_map<ObjectID, PlasmaObject> objects;
+  /// The minimum number of objects to wait for in this request.
+  int64_t num_objects_to_wait_for;
+  /// The number of object requests in this wait request that are already
+  /// satisfied.
+  int64_t num_satisfied;
+};
+
+GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)
+    : client(client),
+      timer(-1),
+      object_ids(object_ids.begin(), object_ids.end()),
+      objects(object_ids.size()),
+      num_satisfied(0) {
+  std::unordered_set<ObjectID> unique_ids(object_ids.begin(), object_ids.end());
+  num_objects_to_wait_for = unique_ids.size();
+}
+
+Client::Client(int fd) : fd(fd), notification_fd(-1) {}
+
+PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled,
+                         const std::string& socket_name,
+                         std::shared_ptr<ExternalStore> external_store)
+    : loop_(loop),
+      eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()),
+      external_store_(external_store) {
+  store_info_.directory = directory;
+  store_info_.hugepages_enabled = hugepages_enabled;
+}
+
+// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
+PlasmaStore::~PlasmaStore() {}
+
+const PlasmaStoreInfo* PlasmaStore::GetPlasmaStoreInfo() { return &store_info_; }
+
+// If this client is not already using the object, add the client to the
+// object's list of clients, otherwise do nothing.
+void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
+                                       Client* client) {
+  // Check if this client is already using the object.
+  if (client->object_ids.find(object_id) != client->object_ids.end()) {
+    return;
+  }
+  // If there are no other clients using this object, notify the eviction policy
+  // that the object is being used.
+  if (entry->ref_count == 0) {
+    // Tell the eviction policy that this object is being used.
+    eviction_policy_.BeginObjectAccess(object_id);
+  }
+  // Increase reference count.
+  entry->ref_count++;
+
+  // Add object id to the list of object ids that this client is using.
+  client->object_ids.insert(object_id);
+}
+
+// Allocate memory
+uint8_t* PlasmaStore::AllocateMemory(size_t size, bool evict_if_full, int* fd,
+                                     int64_t* map_size, ptrdiff_t* offset, Client* client,
+                                     bool is_create) {
+  // First free up space from the client's LRU queue if quota enforcement is on.
+  if (evict_if_full) {
+    std::vector<ObjectID> client_objects_to_evict;
+    bool quota_ok = eviction_policy_.EnforcePerClientQuota(client, size, is_create,
+                                                           &client_objects_to_evict);
+    if (!quota_ok) {
+      return nullptr;
+    }
+    EvictObjects(client_objects_to_evict);
+  }
+
+  // Try to evict objects until there is enough space.
+  uint8_t* pointer = nullptr;
+  while (true) {
+    // Allocate space for the new object. We use memalign instead of malloc
+    // in order to align the allocated region to a 64-byte boundary. This is not
+    // strictly necessary, but it is an optimization that could speed up the
+    // computation of a hash of the data (see compute_object_hash_parallel in
+    // plasma_client.cc). Note that even though this pointer is 64-byte aligned,
+    // it is not guaranteed that the corresponding pointer in the client will be
+    // 64-byte aligned, but in practice it often will be.
+    pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
+    if (pointer || !evict_if_full) {
+      // If we manage to allocate the memory, return the pointer. If we cannot
+      // allocate the space, but we are also not allowed to evict anything to
+      // make more space, return an error to the client.
+      break;
+    }
+    // Tell the eviction policy how much space we need to create this object.
+    std::vector<ObjectID> objects_to_evict;
+    bool success = eviction_policy_.RequireSpace(size, &objects_to_evict);
+    EvictObjects(objects_to_evict);
+    // Return an error to the client if not enough space could be freed to
+    // create the object.
+    if (!success) {
+      break;
+    }
+  }
+
+  if (pointer != nullptr) {
+    GetMallocMapinfo(pointer, fd, map_size, offset);
+    ARROW_CHECK(*fd != -1);
+  }
+  return pointer;
+}
+
+#ifdef PLASMA_CUDA
+arrow::Result<std::shared_ptr<CudaContext>> PlasmaStore::GetCudaContext(int device_num) {
+  DCHECK_NE(device_num, 0);
+  ARROW_ASSIGN_OR_RAISE(auto manager, CudaDeviceManager::Instance());
+  return manager->GetContext(device_num - 1);
+}
+
+Status PlasmaStore::AllocateCudaMemory(
+    int device_num, int64_t size, uint8_t** out_pointer,
+    std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle) {
+  ARROW_ASSIGN_OR_RAISE(auto context, GetCudaContext(device_num));
+  ARROW_ASSIGN_OR_RAISE(auto cuda_buffer, context->Allocate(static_cast<int64_t>(size)));
+  *out_pointer = reinterpret_cast<uint8_t*>(cuda_buffer->address());
+  // The IPC handle will keep the buffer memory alive
+  return cuda_buffer->ExportForIpc().Value(out_ipc_handle);
+}
+
+Status PlasmaStore::FreeCudaMemory(int device_num, int64_t size, uint8_t* pointer) {
+  ARROW_ASSIGN_OR_RAISE(auto context, GetCudaContext(device_num));
+  RETURN_NOT_OK(context->Free(pointer, size));
+  return Status::OK();
+}
+#endif
+
+// Create a new object buffer in the hash table.
+PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, bool evict_if_full,
+                                      int64_t data_size, int64_t metadata_size,
+                                      int device_num, Client* client,
+                                      PlasmaObject* result) {
+  ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
+
+  auto entry = GetObjectTableEntry(&store_info_, object_id);
+  if (entry != nullptr) {
+    // There is already an object with the same ID in the Plasma Store, so
+    // ignore this request.
+    return PlasmaError::ObjectExists;
+  }
+
+  int fd = -1;
+  int64_t map_size = 0;
+  ptrdiff_t offset = 0;
+  uint8_t* pointer = nullptr;
+  auto total_size = data_size + metadata_size;
+
+  if (device_num == 0) {
+    pointer =
+        AllocateMemory(total_size, evict_if_full, &fd, &map_size, &offset, client, true);
+    if (!pointer) {
+      ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex()
+                       << ", data_size=" << data_size
+                       << ", metadata_size=" << metadata_size
+                       << ", will send a reply of PlasmaError::OutOfMemory";
+      return PlasmaError::OutOfMemory;
+    }
+  } else {
+#ifdef PLASMA_CUDA
+    /// IPC GPU handle to share with clients.
+    std::shared_ptr<::arrow::cuda::CudaIpcMemHandle> ipc_handle;
+    auto st = AllocateCudaMemory(device_num, total_size, &pointer, &ipc_handle);
+    if (!st.ok()) {
+      ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
+      return PlasmaError::OutOfMemory;
+    }
+    result->ipc_handle = ipc_handle;
+#else
+    ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
+    return PlasmaError::OutOfMemory;
+#endif
+  }
+
+  auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+  entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get();
+  entry->data_size = data_size;
+  entry->metadata_size = metadata_size;
+  entry->pointer = pointer;
+  // TODO(pcm): Set the other fields.
+  entry->fd = fd;
+  entry->map_size = map_size;
+  entry->offset = offset;
+  entry->state = ObjectState::PLASMA_CREATED;
+  entry->device_num = device_num;
+  entry->create_time = std::time(nullptr);
+  entry->construct_duration = -1;
+
+#ifdef PLASMA_CUDA
+  entry->ipc_handle = result->ipc_handle;
+#endif
+
+  result->store_fd = fd;
+  result->data_offset = offset;
+  result->metadata_offset = offset + data_size;
+  result->data_size = data_size;
+  result->metadata_size = metadata_size;
+  result->device_num = device_num;
+  // Notify the eviction policy that this object was created. This must be done
+  // immediately before the call to AddToClientObjectIds so that the
+  // eviction policy does not have an opportunity to evict the object.
+  eviction_policy_.ObjectCreated(object_id, client, true);
+  // Record that this client is using this object.
+  AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
+  return PlasmaError::OK;
+}
+
+void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
+  DCHECK(object != nullptr);
+  DCHECK(entry != nullptr);
+  DCHECK(entry->state == ObjectState::PLASMA_SEALED);
+#ifdef PLASMA_CUDA
+  if (entry->device_num != 0) {
+    object->ipc_handle = entry->ipc_handle;
+  }
+#endif
+  object->store_fd = entry->fd;
+  object->data_offset = entry->offset;
+  object->metadata_offset = entry->offset + entry->data_size;
+  object->data_size = entry->data_size;
+  object->metadata_size = entry->metadata_size;
+  object->device_num = entry->device_num;
+}
+
+void PlasmaStore::RemoveGetRequest(GetRequest* get_request) {
+  // Remove the get request from each of the relevant object_get_requests hash
+  // tables if it is present there. It should only be present there if the get
+  // request timed out or if it was issued by a client that has disconnected.
+  for (ObjectID& object_id : get_request->object_ids) {
+    auto object_request_iter = object_get_requests_.find(object_id);
+    if (object_request_iter != object_get_requests_.end()) {
+      auto& get_requests = object_request_iter->second;
+      // Erase get_req from the vector.
+      auto it = std::find(get_requests.begin(), get_requests.end(), get_request);
+      if (it != get_requests.end()) {
+        get_requests.erase(it);
+        // If the vector is empty, remove the object ID from the map.
+        if (get_requests.empty()) {
+          object_get_requests_.erase(object_request_iter);
+        }
+      }
+    }
+  }
+  // Remove the get request.
+  if (get_request->timer != -1) {
+    ARROW_CHECK(loop_->RemoveTimer(get_request->timer) == kEventLoopOk);
+  }
+  delete get_request;
+}
+
+void PlasmaStore::RemoveGetRequestsForClient(Client* client) {
+  std::unordered_set<GetRequest*> get_requests_to_remove;
+  for (auto const& pair : object_get_requests_) {
+    for (GetRequest* get_request : pair.second) {
+      if (get_request->client == client) {
+        get_requests_to_remove.insert(get_request);
+      }
+    }
+  }
+
+  // It shouldn't be possible for a given client to be in the middle of multiple get
+  // requests.
+  ARROW_CHECK(get_requests_to_remove.size() <= 1);
+  for (GetRequest* get_request : get_requests_to_remove) {
+    RemoveGetRequest(get_request);
+  }
+}
+
+void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
+  // Figure out how many file descriptors we need to send.
+  std::unordered_set<int> fds_to_send;
+  std::vector<int> store_fds;
+  std::vector<int64_t> mmap_sizes;
+  for (const auto& object_id : get_req->object_ids) {
+    PlasmaObject& object = get_req->objects[object_id];
+    int fd = object.store_fd;
+    if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd != -1) {
+      fds_to_send.insert(fd);
+      store_fds.push_back(fd);
+      mmap_sizes.push_back(GetMmapSize(fd));
+    }
+  }
+
+  // Send the get reply to the client.
+  Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects,
+                          get_req->object_ids.size(), store_fds, mmap_sizes);
+  WarnIfSigpipe(s.ok() ? 0 : -1, get_req->client->fd);
+  // If we successfully sent the get reply message to the client, then also send
+  // the file descriptors.
+  if (s.ok()) {
+    // Send all of the file descriptors for the present objects.
+    for (int store_fd : store_fds) {
+      // Only send the file descriptor if it hasn't been sent (see analogous
+      // logic in GetStoreFd in client.cc).
+      if (get_req->client->used_fds.find(store_fd) == get_req->client->used_fds.end()) {
+        WarnIfSigpipe(send_fd(get_req->client->fd, store_fd), get_req->client->fd);
+        get_req->client->used_fds.insert(store_fd);
+      }
+    }
+  }
+
+  // Remove the get request from each of the relevant object_get_requests hash
+  // tables if it is present there. It should only be present there if the get
+  // request timed out.
+  RemoveGetRequest(get_req);
+}
+
+void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
+  auto it = object_get_requests_.find(object_id);
+  // If there are no get requests involving this object, then return.
+  if (it == object_get_requests_.end()) {
+    return;
+  }
+
+  auto& get_requests = it->second;
+
+  // After finishing the loop below, get_requests and it will have been
+  // invalidated by the removal of object_id from object_get_requests_.
+  size_t index = 0;
+  size_t num_requests = get_requests.size();
+  for (size_t i = 0; i < num_requests; ++i) {
+    auto get_req = get_requests[index];
+    auto entry = GetObjectTableEntry(&store_info_, object_id);
+    ARROW_CHECK(entry != nullptr);
+
+    PlasmaObject_init(&get_req->objects[object_id], entry);
+    get_req->num_satisfied += 1;
+    // Record the fact that this client will be using this object and will
+    // be responsible for releasing this object.
+    AddToClientObjectIds(object_id, entry, get_req->client);
+
+    // If this get request is done, reply to the client.
+    if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
+      ReturnFromGet(get_req);
+    } else {
+      // The call to ReturnFromGet will remove the current element in the
+      // array, so we only increment the counter in the else branch.
+      index += 1;
+    }
+  }
+
+  // No get requests should be waiting for this object anymore. The object ID
+  // may have been removed from the object_get_requests_ by ReturnFromGet, but
+  // if the get request has not returned yet, then remove the object ID from the
+  // map here.
+  it = object_get_requests_.find(object_id);
+  if (it != object_get_requests_.end()) {
+    object_get_requests_.erase(object_id);
+  }
+}
+
+void PlasmaStore::ProcessGetRequest(Client* client,
+                                    const std::vector<ObjectID>& object_ids,
+                                    int64_t timeout_ms) {
+  // Create a get request for this object.
+  auto get_req = new GetRequest(client, object_ids);
+  std::vector<ObjectID> evicted_ids;
+  std::vector<ObjectTableEntry*> evicted_entries;
+  for (auto object_id : object_ids) {
+    // Check if this object is already present locally. If so, record that the
+    // object is being used and mark it as accounted for.
+    auto entry = GetObjectTableEntry(&store_info_, object_id);
+    if (entry && entry->state == ObjectState::PLASMA_SEALED) {
+      // Update the get request to take into account the present object.
+      PlasmaObject_init(&get_req->objects[object_id], entry);
+      get_req->num_satisfied += 1;
+      // If necessary, record that this client is using this object. In the case
+      // where entry == NULL, this will be called from SealObject.
+      AddToClientObjectIds(object_id, entry, client);
+    } else if (entry && entry->state == ObjectState::PLASMA_EVICTED) {
+      // Make sure the object pointer is not already allocated
+      ARROW_CHECK(!entry->pointer);
+
+      entry->pointer =
+          AllocateMemory(entry->data_size + entry->metadata_size, /*evict=*/true,
+                         &entry->fd, &entry->map_size, &entry->offset, client, false);
+      if (entry->pointer) {
+        entry->state = ObjectState::PLASMA_CREATED;
+        entry->create_time = std::time(nullptr);
+        eviction_policy_.ObjectCreated(object_id, client, false);
+        AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client);
+        evicted_ids.push_back(object_id);
+        evicted_entries.push_back(entry);
+      } else {
+        // We are out of memory and cannot allocate memory for this object.
+        // Change the state of the object back to PLASMA_EVICTED so some
+        // other request can try again.
+        entry->state = ObjectState::PLASMA_EVICTED;
+      }
+    } else {
+      // Add a placeholder plasma object to the get request to indicate that the
+      // object is not present. This will be parsed by the client. We set the
+      // data size to -1 to indicate that the object is not present.
+      get_req->objects[object_id].data_size = -1;
+      // Add the get request to the relevant data structures.
+      object_get_requests_[object_id].push_back(get_req);
+    }
+  }
+
+  if (!evicted_ids.empty()) {
+    unsigned char digest[kDigestSize] = {};
+    std::vector<std::shared_ptr<Buffer>> buffers;
+    for (size_t i = 0; i < evicted_ids.size(); ++i) {
+      ARROW_CHECK(evicted_entries[i]->pointer != nullptr);
+      buffers.emplace_back(new arrow::MutableBuffer(evicted_entries[i]->pointer,
+                                                    evicted_entries[i]->data_size));
+    }
+    if (external_store_->Get(evicted_ids, buffers).ok()) {
+      for (size_t i = 0; i < evicted_ids.size(); ++i) {
+        evicted_entries[i]->state = ObjectState::PLASMA_SEALED;
+        std::memcpy(&evicted_entries[i]->digest[0], &digest[0], kDigestSize);
+        evicted_entries[i]->construct_duration =
+            std::time(nullptr) - evicted_entries[i]->create_time;
+        PlasmaObject_init(&get_req->objects[evicted_ids[i]], evicted_entries[i]);
+        get_req->num_satisfied += 1;
+      }
+    } else {
+      // We tried to get the objects from the external store, but could not get them.
+      // Set the state of these objects back to PLASMA_EVICTED so some other request
+      // can try again.
+      for (size_t i = 0; i < evicted_ids.size(); ++i) {
+        evicted_entries[i]->state = ObjectState::PLASMA_EVICTED;
+      }
+    }
+  }
+
+  // If all of the objects are present already or if the timeout is 0, return to
+  // the client.
+  if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) {
+    ReturnFromGet(get_req);
+  } else if (timeout_ms != -1) {
+    // Set a timer that will cause the get request to return to the client. Note
+    // that a timeout of -1 is used to indicate that no timer should be set.
+    get_req->timer = loop_->AddTimer(timeout_ms, [this, get_req](int64_t timer_id) {
+      ReturnFromGet(get_req);
+      return kEventLoopTimerDone;
+    });
+  }
+}
+
+int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id,
+                                           ObjectTableEntry* entry, Client* client) {
+  auto it = client->object_ids.find(object_id);
+  if (it != client->object_ids.end()) {
+    client->object_ids.erase(it);
+    // Decrease reference count.
+    entry->ref_count--;
+
+    // If no more clients are using this object, notify the eviction policy
+    // that the object is no longer being used.
+    if (entry->ref_count == 0) {
+      if (deletion_cache_.count(object_id) == 0) {
+        // Tell the eviction policy that this object is no longer being used.
+        eviction_policy_.EndObjectAccess(object_id);
+      } else {
+        // Above code does not really delete an object. Instead, it just put an
+        // object to LRU cache which will be cleaned when the memory is not enough.
+        deletion_cache_.erase(object_id);
+        EvictObjects({object_id});
+      }
+    }
+    // Return 1 to indicate that the client was removed.
+    return 1;
+  } else {
+    // Return 0 to indicate that the client was not removed.
+    return 0;
+  }
+}
+
+void PlasmaStore::EraseFromObjectTable(const ObjectID& object_id) {
+  auto& object = store_info_.objects[object_id];
+  auto buff_size = object->data_size + object->metadata_size;
+  if (object->device_num == 0) {
+    PlasmaAllocator::Free(object->pointer, buff_size);
+  } else {
+#ifdef PLASMA_CUDA
+    ARROW_CHECK_OK(FreeCudaMemory(object->device_num, buff_size, object->pointer));
+#endif
+  }
+  store_info_.objects.erase(object_id);
+}
+
+void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) {
+  auto entry = GetObjectTableEntry(&store_info_, object_id);
+  ARROW_CHECK(entry != nullptr);
+  // Remove the client from the object's array of clients.
+  ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
+}
+
+// Check if an object is present.
+ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) {
+  auto entry = GetObjectTableEntry(&store_info_, object_id);
+  return entry && (entry->state == ObjectState::PLASMA_SEALED ||
+                   entry->state == ObjectState::PLASMA_EVICTED)
+             ? ObjectStatus::OBJECT_FOUND
+             : ObjectStatus::OBJECT_NOT_FOUND;
+}
+
+void PlasmaStore::SealObjects(const std::vector<ObjectID>& object_ids,
+                              const std::vector<std::string>& digests) {
+  std::vector<ObjectInfoT> infos;
+
+  ARROW_LOG(DEBUG) << "sealing " << object_ids.size() << " objects";
+  for (size_t i = 0; i < object_ids.size(); ++i) {
+    ObjectInfoT object_info;
+    auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
+    ARROW_CHECK(entry != nullptr);
+    ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED);
+    // Set the state of object to SEALED.
+    entry->state = ObjectState::PLASMA_SEALED;
+    // Set the object digest.
+    std::memcpy(&entry->digest[0], digests[i].c_str(), kDigestSize);
+    // Set object construction duration.
+    entry->construct_duration = std::time(nullptr) - entry->create_time;
+
+    object_info.object_id = object_ids[i].binary();
+    object_info.data_size = entry->data_size;
+    object_info.metadata_size = entry->metadata_size;
+    object_info.digest = digests[i];
+    infos.push_back(object_info);
+  }
+
+  PushNotifications(infos);
+
+  for (size_t i = 0; i < object_ids.size(); ++i) {
+    UpdateObjectGetRequests(object_ids[i]);
+  }
+}
+
+int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) {
+  auto entry = GetObjectTableEntry(&store_info_, object_id);
+  ARROW_CHECK(entry != nullptr) << "To abort an object it must be in the object table.";
+  ARROW_CHECK(entry->state != ObjectState::PLASMA_SEALED)
+      << "To abort an object it must not have been sealed.";
+  auto it = client->object_ids.find(object_id);
+  if (it == client->object_ids.end()) {
+    // If the client requesting the abort is not the creator, do not
+    // perform the abort.
+    return 0;
+  } else {
+    // The client requesting the abort is the creator. Free the object.
+    EraseFromObjectTable(object_id);
+    client->object_ids.erase(it);
+    return 1;
+  }
+}
+
+PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) {
+  auto entry = GetObjectTableEntry(&store_info_, object_id);
+  // TODO(rkn): This should probably not fail, but should instead throw an
+  // error. Maybe we should also support deleting objects that have been
+  // created but not sealed.
+  if (entry == nullptr) {
+    // To delete an object it must be in the object table.
+    return PlasmaError::ObjectNotFound;
+  }
+
+  if (entry->state != ObjectState::PLASMA_SEALED) {
+    // To delete an object it must have been sealed.
+    // Put it into deletion cache, it will be deleted later.
+    deletion_cache_.emplace(object_id);
+    return PlasmaError::ObjectNotSealed;
+  }
+
+  if (entry->ref_count != 0) {
+    // To delete an object, there must be no clients currently using it.
+    // Put it into deletion cache, it will be deleted later.
+    deletion_cache_.emplace(object_id);
+    return PlasmaError::ObjectInUse;
+  }
+
+  eviction_policy_.RemoveObject(object_id);
+  EraseFromObjectTable(object_id);
+  // Inform all subscribers that the object has been deleted.
+  fb::ObjectInfoT notification;
+  notification.object_id = object_id.binary();
+  notification.is_deletion = true;
+  PushNotification(&notification);
+
+  return PlasmaError::OK;
+}
+
+void PlasmaStore::EvictObjects(const std::vector<ObjectID>& object_ids) {
+  if (object_ids.size() == 0) {
+    return;
+  }
+
+  std::vector<std::shared_ptr<arrow::Buffer>> evicted_object_data;
+  std::vector<ObjectTableEntry*> evicted_entries;
+  for (const auto& object_id : object_ids) {
+    ARROW_LOG(DEBUG) << "evicting object " << object_id.hex();
+    auto entry = GetObjectTableEntry(&store_info_, object_id);
+    // TODO(rkn): This should probably not fail, but should instead throw an
+    // error. Maybe we should also support deleting objects that have been
+    // created but not sealed.
+    ARROW_CHECK(entry != nullptr) << "To evict an object it must be in the object table.";
+    ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED)
+        << "To evict an object it must have been sealed.";
+    ARROW_CHECK(entry->ref_count == 0)
+        << "To evict an object, there must be no clients currently using it.";
+
+    // If there is a backing external store, then mark object for eviction to
+    // external store, free the object data pointer and keep a placeholder
+    // entry in ObjectTable
+    if (external_store_) {
+      evicted_object_data.push_back(std::make_shared<arrow::Buffer>(
+          entry->pointer, entry->data_size + entry->metadata_size));
+      evicted_entries.push_back(entry);
+    } else {
+      // If there is no backing external store, just erase the object entry
+      // and send a deletion notification.
+      EraseFromObjectTable(object_id);
+      // Inform all subscribers that the object has been deleted.
+      fb::ObjectInfoT notification;
+      notification.object_id = object_id.binary();
+      notification.is_deletion = true;
+      PushNotification(&notification);
+    }
+  }
+
+  if (external_store_ && !object_ids.empty()) {
+    ARROW_CHECK_OK(external_store_->Put(object_ids, evicted_object_data));
+    for (auto entry : evicted_entries) {
+      PlasmaAllocator::Free(entry->pointer, entry->data_size + entry->metadata_size);
+      entry->pointer = nullptr;
+      entry->state = ObjectState::PLASMA_EVICTED;
+    }
+  }
+}
+
+void PlasmaStore::ConnectClient(int listener_sock) {
+  int client_fd = AcceptClient(listener_sock);
+
+  Client* client = new Client(client_fd);
+  connected_clients_[client_fd] = std::unique_ptr<Client>(client);
+
+  // Add a callback to handle events on this socket.
+  // TODO(pcm): Check return value.
+  loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) {
+    Status s = ProcessMessage(client);
+    if (!s.ok()) {
+      ARROW_LOG(FATAL) << "Failed to process file event: " << s;
+    }
+  });
+  ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
+}
+
+void PlasmaStore::DisconnectClient(int client_fd) {
+  ARROW_CHECK(client_fd > 0);
+  auto it = connected_clients_.find(client_fd);
+  ARROW_CHECK(it != connected_clients_.end());
+  loop_->RemoveFileEvent(client_fd);
+  // Close the socket.
+  close(client_fd);
+  ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd;
+  // Release all the objects that the client was using.
+  auto client = it->second.get();
+  eviction_policy_.ClientDisconnected(client);
+  std::unordered_map<ObjectID, ObjectTableEntry*> sealed_objects;
+  for (const auto& object_id : client->object_ids) {
+    auto it = store_info_.objects.find(object_id);
+    if (it == store_info_.objects.end()) {
+      continue;
+    }
+
+    if (it->second->state == ObjectState::PLASMA_SEALED) {
+      // Add sealed objects to a temporary list of object IDs. Do not perform
+      // the remove here, since it potentially modifies the object_ids table.
+      sealed_objects[it->first] = it->second.get();
+    } else {
+      // Abort unsealed object.
+      // Don't call AbortObject() because client->object_ids would be modified.
+      EraseFromObjectTable(object_id);
+    }
+  }
+
+  /// Remove all of the client's GetRequests.
+  RemoveGetRequestsForClient(client);
+
+  for (const auto& entry : sealed_objects) {
+    RemoveFromClientObjectIds(entry.first, entry.second, client);
+  }
+
+  if (client->notification_fd > 0) {
+    // This client has subscribed for notifications.
+    auto notify_fd = client->notification_fd;
+    loop_->RemoveFileEvent(notify_fd);
+    // Close socket.
+    close(notify_fd);
+    // Remove notification queue for this fd from global map.
+    pending_notifications_.erase(notify_fd);
+    // Reset fd.
+    client->notification_fd = -1;
+  }
+
+  connected_clients_.erase(it);
+}
+
+/// Send notifications about sealed objects to the subscribers. This is called
+/// in SealObject. If the socket's send buffer is full, the notification will
+/// be buffered, and this will be called again when the send buffer has room.
+/// Since we call erase on pending_notifications_, all iterators get
+/// invalidated, which is why we return a valid iterator to the next client to
+/// be used in PushNotification.
+///
+/// \param it Iterator that points to the client to send the notification to.
+/// \return Iterator pointing to the next client.
+PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications(
+    PlasmaStore::NotificationMap::iterator it) {
+  int client_fd = it->first;
+  auto& notifications = it->second.object_notifications;
+
+  int num_processed = 0;
+  bool closed = false;
+  // Loop over the array of pending notifications and send as many of them as
+  // possible.
+  for (size_t i = 0; i < notifications.size(); ++i) {
+    auto& notification = notifications.at(i);
+    // Decode the length, which is the first bytes of the message.
+    int64_t size = *(reinterpret_cast<int64_t*>(notification.get()));
+
+    // Attempt to send a notification about this object ID.
+    ssize_t nbytes = send(client_fd, notification.get(), sizeof(int64_t) + size, 0);
+    if (nbytes >= 0) {
+      ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
+    } else if (nbytes == -1 &&
+               (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+      ARROW_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this "
+                          "notification and will send it later.";
+      // Add a callback to the event loop to send queued notifications whenever
+      // there is room in the socket's send buffer. Callbacks can be added
+      // more than once here and will be overwritten. The callback is removed
+      // at the end of the method.
+      // TODO(pcm): Introduce status codes and check in case the file descriptor
+      // is added twice.
+      loop_->AddFileEvent(client_fd, kEventLoopWrite, [this, client_fd](int events) {
+        SendNotifications(pending_notifications_.find(client_fd));
+      });
+      break;
+    } else {
+      ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd;
+      if (errno == EPIPE) {
+        closed = true;
+        break;
+      }
+    }
+    num_processed += 1;
+  }
+  // Remove the sent notifications from the array.
+  notifications.erase(notifications.begin(), notifications.begin() + num_processed);
+
+  // If we have sent all notifications, remove the fd from the event loop.
+  if (notifications.empty()) {
+    loop_->RemoveFileEvent(client_fd);
+  }
+
+  // Stop sending notifications if the pipe was broken.
+  if (closed) {
+    close(client_fd);
+    return pending_notifications_.erase(it);
+  } else {
+    return ++it;
+  }
+}
+
+void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info) {
+  auto it = pending_notifications_.begin();
+  while (it != pending_notifications_.end()) {
+    std::vector<fb::ObjectInfoT> info;
+    info.push_back(*object_info);
+    auto notification = CreatePlasmaNotificationBuffer(info);
+    it->second.object_notifications.emplace_back(std::move(notification));
+    it = SendNotifications(it);
+  }
+}
+
+void PlasmaStore::PushNotifications(std::vector<fb::ObjectInfoT>& object_info) {
+  auto it = pending_notifications_.begin();
+  while (it != pending_notifications_.end()) {
+    auto notifications = CreatePlasmaNotificationBuffer(object_info);
+    it->second.object_notifications.emplace_back(std::move(notifications));
+    it = SendNotifications(it);
+  }
+}
+
+void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info, int client_fd) {
+  auto it = pending_notifications_.find(client_fd);
+  if (it != pending_notifications_.end()) {
+    std::vector<fb::ObjectInfoT> info;
+    info.push_back(*object_info);
+    auto notification = CreatePlasmaNotificationBuffer(info);
+    it->second.object_notifications.emplace_back(std::move(notification));
+    SendNotifications(it);
+  }
+}
+
+// Subscribe to notifications about sealed objects.
+void PlasmaStore::SubscribeToUpdates(Client* client) {
+  ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
+  if (client->notification_fd > 0) {
+    // This client has already subscribed. Return.
+    return;
+  }
+
+  // TODO(rkn): The store could block here if the client doesn't send a file
+  // descriptor.
+  int fd = recv_fd(client->fd);
+  if (fd < 0) {
+    // This may mean that the client died before sending the file descriptor.
+    ARROW_LOG(WARNING) << "Failed to receive file descriptor from client on fd "
+                       << client->fd << ".";
+    return;
+  }
+
+  // Add this fd to global map, which is needed for this client to receive notifications.
+  pending_notifications_[fd];
+  client->notification_fd = fd;
+
+  // Push notifications to the new subscriber about existing sealed objects.
+  for (const auto& entry : store_info_.objects) {
+    if (entry.second->state == ObjectState::PLASMA_SEALED) {
+      ObjectInfoT info;
+      info.object_id = entry.first.binary();
+      info.data_size = entry.second->data_size;
+      info.metadata_size = entry.second->metadata_size;
+      info.digest =
+          std::string(reinterpret_cast<char*>(&entry.second->digest[0]), kDigestSize);
+      PushNotification(&info, fd);
+    }
+  }
+}
+
+Status PlasmaStore::ProcessMessage(Client* client) {
+  fb::MessageType type;
+  Status s = ReadMessage(client->fd, &type, &input_buffer_);
+  ARROW_CHECK(s.ok() || s.IsIOError());
+
+  uint8_t* input = input_buffer_.data();
+  size_t input_size = input_buffer_.size();
+  ObjectID object_id;
+  PlasmaObject object = {};
+
+  // Process the different types of requests.
+  switch (type) {
+    case fb::MessageType::PlasmaCreateRequest: {
+      bool evict_if_full;
+      int64_t data_size;
+      int64_t metadata_size;
+      int device_num;
+      RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &evict_if_full,
+                                      &data_size, &metadata_size, &device_num));
+      PlasmaError error_code = CreateObject(object_id, evict_if_full, data_size,
+                                            metadata_size, device_num, client, &object);
+      int64_t mmap_size = 0;
+      if (error_code == PlasmaError::OK && device_num == 0) {
+        mmap_size = GetMmapSize(object.store_fd);
+      }
+      HANDLE_SIGPIPE(
+          SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
+          client->fd);
+      // Only send the file descriptor if it hasn't been sent (see analogous
+      // logic in GetStoreFd in client.cc). Similar in ReturnFromGet.
+      if (error_code == PlasmaError::OK && device_num == 0 &&
+          client->used_fds.find(object.store_fd) == client->used_fds.end()) {
+        WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd);
+        client->used_fds.insert(object.store_fd);
+      }
+    } break;
+    case fb::MessageType::PlasmaCreateAndSealRequest: {
+      bool evict_if_full;
+      std::string data;
+      std::string metadata;
+      std::string digest;
+      digest.reserve(kDigestSize);
+      RETURN_NOT_OK(ReadCreateAndSealRequest(input, input_size, &object_id,
+                                             &evict_if_full, &data, &metadata, &digest));
+      // CreateAndSeal currently only supports device_num = 0, which corresponds
+      // to the host.
+      int device_num = 0;
+      PlasmaError error_code = CreateObject(object_id, evict_if_full, data.size(),
+                                            metadata.size(), device_num, client, &object);
+
+      // If the object was successfully created, fill out the object data and seal it.
+      if (error_code == PlasmaError::OK) {
+        auto entry = GetObjectTableEntry(&store_info_, object_id);
+        ARROW_CHECK(entry != nullptr);
+        // Write the inlined data and metadata into the allocated object.
+        std::memcpy(entry->pointer, data.data(), data.size());
+        std::memcpy(entry->pointer + data.size(), metadata.data(), metadata.size());
+        SealObjects({object_id}, {digest});
+        // Remove the client from the object's array of clients because the
+        // object is not being used by any client. The client was added to the
+        // object's array of clients in CreateObject. This is analogous to the
+        // Release call that happens in the client's Seal method.
+        ARROW_CHECK(RemoveFromClientObjectIds(object_id, entry, client) == 1);
+      }
+
+      // Reply to the client.
+      HANDLE_SIGPIPE(SendCreateAndSealReply(client->fd, error_code), client->fd);
+    } break;
+    case fb::MessageType::PlasmaCreateAndSealBatchRequest: {
+      bool evict_if_full;
+      std::vector<ObjectID> object_ids;
+      std::vector<std::string> data;
+      std::vector<std::string> metadata;
+      std::vector<std::string> digests;
+
+      RETURN_NOT_OK(ReadCreateAndSealBatchRequest(
+          input, input_size, &object_ids, &evict_if_full, &data, &metadata, &digests));
+
+      // CreateAndSeal currently only supports device_num = 0, which corresponds
+      // to the host.
+      int device_num = 0;
+      size_t i = 0;
+      PlasmaError error_code = PlasmaError::OK;
+      for (i = 0; i < object_ids.size(); i++) {
+        error_code = CreateObject(object_ids[i], evict_if_full, data[i].size(),
+                                  metadata[i].size(), device_num, client, &object);
+        if (error_code != PlasmaError::OK) {
+          break;
+        }
+      }
+
+      // if OK, seal all the objects,
+      // if error, abort the previous i objects immediately
+      if (error_code == PlasmaError::OK) {
+        for (i = 0; i < object_ids.size(); i++) {
+          auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
+          ARROW_CHECK(entry != nullptr);
+          // Write the inlined data and metadata into the allocated object.
+          std::memcpy(entry->pointer, data[i].data(), data[i].size());
+          std::memcpy(entry->pointer + data[i].size(), metadata[i].data(),
+                      metadata[i].size());
+        }
+
+        SealObjects(object_ids, digests);
+        // Remove the client from the object's array of clients because the
+        // object is not being used by any client. The client was added to the
+        // object's array of clients in CreateObject. This is analogous to the
+        // Release call that happens in the client's Seal method.
+        for (i = 0; i < object_ids.size(); i++) {
+          auto entry = GetObjectTableEntry(&store_info_, object_ids[i]);
+          ARROW_CHECK(RemoveFromClientObjectIds(object_ids[i], entry, client) == 1);
+        }
+      } else {
+        for (size_t j = 0; j < i; j++) {
+          AbortObject(object_ids[j], client);
+        }
+      }
+
+      HANDLE_SIGPIPE(SendCreateAndSealBatchReply(client->fd, error_code), client->fd);
+    } break;
+    case fb::MessageType::PlasmaAbortRequest: {
+      RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
+      ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only "
+                                                          "client currently using it "
+                                                          "must be the creator.";
+      HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
+    } break;
+    case fb::MessageType::PlasmaGetRequest: {
+      std::vector<ObjectID> object_ids_to_get;
+      int64_t timeout_ms;
+      RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
+      ProcessGetRequest(client, object_ids_to_get, timeout_ms);
+    } break;
+    case fb::MessageType::PlasmaReleaseRequest: {
+      RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
+      ReleaseObject(object_id, client);
+    } break;
+    case fb::MessageType::PlasmaDeleteRequest: {
+      std::vector<ObjectID> object_ids;
+      std::vector<PlasmaError> error_codes;
+      RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
+      error_codes.reserve(object_ids.size());
+      for (auto& object_id : object_ids) {
+        error_codes.push_back(DeleteObject(object_id));
+      }
+      HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_ids, error_codes), client->fd);
+    } break;
+    case fb::MessageType::PlasmaContainsRequest: {
+      RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
+      if (ContainsObject(object_id) == ObjectStatus::OBJECT_FOUND) {
+        HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
+      } else {
+        HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
+      }
+    } break;
+    case fb::MessageType::PlasmaListRequest: {
+      RETURN_NOT_OK(ReadListRequest(input, input_size));
+      HANDLE_SIGPIPE(SendListReply(client->fd, store_info_.objects), client->fd);
+    } break;
+    case fb::MessageType::PlasmaSealRequest: {
+      std::string digest;
+      RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest));
+      SealObjects({object_id}, {digest});
+      HANDLE_SIGPIPE(SendSealReply(client->fd, object_id, PlasmaError::OK), client->fd);
+    } break;
+    case fb::MessageType::PlasmaEvictRequest: {
+      // This code path should only be used for testing.
+      int64_t num_bytes;
+      RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes));
+      std::vector<ObjectID> objects_to_evict;
+      int64_t num_bytes_evicted =
+          eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict);
+      EvictObjects(objects_to_evict);
+      HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), client->fd);
+    } break;
+    case fb::MessageType::PlasmaRefreshLRURequest: {
+      std::vector<ObjectID> object_ids;
+      RETURN_NOT_OK(ReadRefreshLRURequest(input, input_size, &object_ids));
+      eviction_policy_.RefreshObjects(object_ids);
+      HANDLE_SIGPIPE(SendRefreshLRUReply(client->fd), client->fd);
+    } break;
+    case fb::MessageType::PlasmaSubscribeRequest:
+      SubscribeToUpdates(client);
+      break;
+    case fb::MessageType::PlasmaConnectRequest: {
+      HANDLE_SIGPIPE(SendConnectReply(client->fd, PlasmaAllocator::GetFootprintLimit()),
+                     client->fd);
+    } break;
+    case fb::MessageType::PlasmaDisconnectClient:
+      ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
+      DisconnectClient(client->fd);
+      break;
+    case fb::MessageType::PlasmaSetOptionsRequest: {
+      std::string client_name;
+      int64_t output_memory_quota;
+      RETURN_NOT_OK(
+          ReadSetOptionsRequest(input, input_size, &client_name, &output_memory_quota));
+      client->name = client_name;
+      bool success = eviction_policy_.SetClientQuota(client, output_memory_quota);
+      HANDLE_SIGPIPE(SendSetOptionsReply(client->fd, success ? PlasmaError::OK
+                                                             : PlasmaError::OutOfMemory),
+                     client->fd);
+    } break;
+    case fb::MessageType::PlasmaGetDebugStringRequest: {
+      HANDLE_SIGPIPE(SendGetDebugStringReply(client->fd, eviction_policy_.DebugString()),
+                     client->fd);
+    } break;
+    default:
+      // This code should be unreachable.
+      ARROW_CHECK(0);
+  }
+  return Status::OK();
+}
+
+class PlasmaStoreRunner {
+ public:
+  PlasmaStoreRunner() {}
+
+  void Start(char* socket_name, std::string directory, bool hugepages_enabled,
+             std::shared_ptr<ExternalStore> external_store) {
+    // Create the event loop.
+    loop_.reset(new EventLoop);
+    store_.reset(new PlasmaStore(loop_.get(), directory, hugepages_enabled, socket_name,
+                                 external_store));
+    plasma_config = store_->GetPlasmaStoreInfo();
+
+    // We are using a single memory-mapped file by mallocing and freeing a single
+    // large amount of space up front. According to the documentation,
+    // dlmalloc might need up to 128*sizeof(size_t) bytes for internal
+    // bookkeeping.
+    void* pointer = plasma::PlasmaAllocator::Memalign(
+        kBlockSize, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
+    ARROW_CHECK(pointer != nullptr);
+    // This will unmap the file, but the next one created will be as large
+    // as this one (this is an implementation detail of dlmalloc).
+    plasma::PlasmaAllocator::Free(
+        pointer, PlasmaAllocator::GetFootprintLimit() - 256 * sizeof(size_t));
+
+    int socket = BindIpcSock(socket_name, true);
+    // TODO(pcm): Check return value.
+    ARROW_CHECK(socket >= 0);
+
+    loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) {
+      this->store_->ConnectClient(socket);
+    });
+    loop_->Start();
+  }
+
+  void Stop() { loop_->Stop(); }
+
+  void Shutdown() {
+    loop_->Shutdown();
+    loop_ = nullptr;
+    store_ = nullptr;
+  }
+
+ private:
+  std::unique_ptr<EventLoop> loop_;
+  std::unique_ptr<PlasmaStore> store_;
+};
+
+static std::unique_ptr<PlasmaStoreRunner> g_runner = nullptr;
+
+void HandleSignal(int signal) {
+  if (signal == SIGTERM) {
+    ARROW_LOG(INFO) << "SIGTERM Signal received, closing Plasma Server...";
+    if (g_runner != nullptr) {
+      g_runner->Stop();
+    }
+  }
+}
+
+void StartServer(char* socket_name, std::string plasma_directory, bool hugepages_enabled,
+                 std::shared_ptr<ExternalStore> external_store) {
+  // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
+  // to a client that has already died, the store could die.
+  signal(SIGPIPE, SIG_IGN);
+
+  g_runner.reset(new PlasmaStoreRunner());
+  signal(SIGTERM, HandleSignal);
+  g_runner->Start(socket_name, plasma_directory, hugepages_enabled, external_store);
+}
+
+// Function to use (instead of ARROW_LOG(FATAL)) for usage, etc. errors before
+// the main server loop starts, so users don't get a backtrace if they
+// simply forgot a command-line switch.
+void ExitWithUsageError(const char* error_msg) {
+  std::cerr << gflags::ProgramInvocationShortName() << ": " << error_msg << std::endl;
+  exit(1);
+}
+
+}  // namespace plasma
+
+#ifdef __linux__
+#define SHM_DEFAULT_PATH "/dev/shm"
+#else
+#define SHM_DEFAULT_PATH "/tmp"
+#endif
+
+// Command-line flags.
+DEFINE_string(d, SHM_DEFAULT_PATH, "directory where to create the memory-backed file");
+DEFINE_string(e, "",
+              "endpoint for external storage service, where objects "
+              "evicted from Plasma store can be written to, optional");
+DEFINE_bool(h, false, "whether to enable hugepage support");
+DEFINE_string(s, "",
+              "socket name where the Plasma store will listen for requests, required");
+DEFINE_string(m, "", "amount of memory in bytes to use for Plasma store, required");
+
+int main(int argc, char* argv[]) {
+  ArrowLog::StartArrowLog(argv[0], ArrowLogLevel::ARROW_INFO);
+  ArrowLog::InstallFailureSignalHandler();
+
+  gflags::SetUsageMessage("Shared-memory server for Arrow data.\nUsage: ");
+  gflags::SetVersionString(ARROW_VERSION_STRING);
+
+  char* socket_name = nullptr;
+  // Directory where plasma memory mapped files are stored.
+  std::string plasma_directory;
+  std::string external_store_endpoint;
+  bool hugepages_enabled = false;
+  int64_t system_memory = -1;
+
+  gflags::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true);
+  plasma_directory = FLAGS_d;
+  external_store_endpoint = FLAGS_e;
+  hugepages_enabled = FLAGS_h;
+  if (!FLAGS_s.empty()) {
+    // We only check below if socket_name is null, so don't set it if the flag was empty.
+    socket_name = const_cast<char*>(FLAGS_s.c_str());
+  }
+
+  if (!FLAGS_m.empty()) {
+    char extra;
+    int scanned = sscanf(FLAGS_m.c_str(), "%" SCNd64 "%c", &system_memory, &extra);
+    if (scanned != 1) {
+      plasma::ExitWithUsageError(
+          "-m switch takes memory in bytes, with no letter suffix allowed");
+    }
+
+    // Set system memory capacity
+    plasma::PlasmaAllocator::SetFootprintLimit(static_cast<size_t>(system_memory));
+    ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
+                    << static_cast<double>(system_memory) / 1000000000 << "GB of memory.";
+  }
+
+  // Sanity check command line options.
+  if (socket_name == nullptr && system_memory == -1) {
+    // Nicer error message for the case where the user ran the program without
+    // any of the required command-line switches.
+    plasma::ExitWithUsageError(
+        "please specify socket for incoming connections with -s, "
+        "and the amount of memory (in bytes) to use with -m");
+  } else if (socket_name == nullptr) {
+    plasma::ExitWithUsageError("please specify socket for incoming connections with -s");
+  } else if (system_memory == -1) {
+    plasma::ExitWithUsageError(
+        "please specify the amount of memory (in bytes) to use with -m");
+  }
+  if (hugepages_enabled && plasma_directory.empty()) {
+    plasma::ExitWithUsageError(
+        "if you want to use hugepages, please specify path to huge pages "
+        "filesystem with -d");
+  }
+  ARROW_CHECK(!plasma_directory.empty());
+  ARROW_LOG(INFO) << "Starting object store with directory " << plasma_directory
+                  << " and huge page support "
+                  << (hugepages_enabled ? "enabled" : "disabled");
+
+#ifdef __linux__
+  if (!hugepages_enabled) {
+    // On Linux, check that the amount of memory available in /dev/shm is large
+    // enough to accommodate the request. If it isn't, then fail.
+    int shm_fd = open(plasma_directory.c_str(), O_RDONLY);
+    struct statvfs shm_vfs_stats;
+    fstatvfs(shm_fd, &shm_vfs_stats);
+    // The value shm_vfs_stats.f_bsize is the block size, and the value
+    // shm_vfs_stats.f_bavail is the number of available blocks.
+    int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
+    close(shm_fd);
+    // Keep some safety margin for allocator fragmentation.
+    shm_mem_avail = 9 * shm_mem_avail / 10;
+    if (system_memory > shm_mem_avail) {
+      ARROW_LOG(WARNING)
+          << "System memory request exceeds memory available in " << plasma_directory
+          << ". The request is for " << system_memory
+          << " bytes, and the amount available is " << shm_mem_avail
+          << " bytes. You may be able to free up space by deleting files in "
+             "/dev/shm. If you are inside a Docker container, you may need to "
+             "pass an argument with the flag '--shm-size' to 'docker run'.";
+      system_memory = shm_mem_avail;
+    }
+  } else {
+    plasma::SetMallocGranularity(1024 * 1024 * 1024);  // 1 GiB
+  }
+#endif
+
+  // Get external store
+  std::shared_ptr<plasma::ExternalStore> external_store{nullptr};
+  if (!external_store_endpoint.empty()) {
+    std::string name;
+    ARROW_CHECK_OK(
+        plasma::ExternalStores::ExtractStoreName(external_store_endpoint, &name));
+    external_store = plasma::ExternalStores::GetStore(name);
+    if (external_store == nullptr) {
+      std::ostringstream error_msg;
+      error_msg << "no such external store \"" << name << "\"";
+      plasma::ExitWithUsageError(error_msg.str().c_str());
+    }
+    ARROW_LOG(DEBUG) << "connecting to external store...";
+    ARROW_CHECK_OK(external_store->Connect(external_store_endpoint));
+  }
+
+  ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
+  plasma::StartServer(socket_name, plasma_directory, hugepages_enabled, external_store);
+  plasma::g_runner->Shutdown();
+  plasma::g_runner = nullptr;
+
+  ArrowLog::UninstallSignalAction();
+  ArrowLog::ShutDownArrowLog();
+  return 0;
+}