1 // Licensed to the Apache Software Foundation (ASF) under one 2 // or more contributor license agreements. See the NOTICE file 3 // distributed with this work for additional information 4 // regarding copyright ownership. The ASF licenses this file 5 // to you under the Apache License, Version 2.0 (the 6 // "License"); you may not use this file except in compliance 7 // with the License. You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, 12 // software distributed under the License is distributed on an 13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 14 // KIND, either express or implied. See the License for the 15 // specific language governing permissions and limitations 16 // under the License. 17 18 #pragma once 19 20 #include <errno.h> 21 #include <inttypes.h> 22 #include <stdbool.h> 23 #include <stddef.h> 24 #include <stdio.h> 25 #include <stdlib.h> 26 #include <string.h> 27 #include <unistd.h> // pid_t 28 29 #include <memory> 30 #include <string> 31 #include <unordered_map> 32 #include <unordered_set> 33 #include <vector> 34 35 #include "plasma/compat.h" 36 37 #include "arrow/status.h" 38 #include "arrow/util/logging.h" 39 #include "arrow/util/macros.h" 40 #include "plasma/common.h" 41 42 #ifdef PLASMA_CUDA 43 using arrow::cuda::CudaIpcMemHandle; 44 #endif 45 46 namespace plasma { 47 48 namespace flatbuf { 49 struct ObjectInfoT; 50 } // namespace flatbuf 51 52 #define HANDLE_SIGPIPE(s, fd_) \ 53 do { \ 54 Status _s = (s); \ 55 if (!_s.ok()) { \ 56 if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { \ 57 ARROW_LOG(WARNING) \ 58 << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \ 59 "sending a message to client on fd " \ 60 << fd_ \ 61 << ". " \ 62 "The client on the other end may have hung up."; \ 63 } else { \ 64 return _s; \ 65 } \ 66 } \ 67 } while (0); 68 69 /// Allocation granularity used in plasma for object allocation. 70 constexpr int64_t kBlockSize = 64; 71 72 /// Contains all information that is associated with a Plasma store client. 73 struct Client { 74 explicit Client(int fd); 75 76 /// The file descriptor used to communicate with the client. 77 int fd; 78 79 /// Object ids that are used by this client. 80 std::unordered_set<ObjectID> object_ids; 81 82 /// File descriptors that are used by this client. 83 std::unordered_set<int> used_fds; 84 85 /// The file descriptor used to push notifications to client. This is only valid 86 /// if client subscribes to plasma store. -1 indicates invalid. 87 int notification_fd; 88 89 std::string name = "anonymous_client"; 90 }; 91 92 // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec. 93 struct PlasmaObject { 94 #ifdef PLASMA_CUDA 95 // IPC handle for Cuda. 96 std::shared_ptr<CudaIpcMemHandle> ipc_handle; 97 #endif 98 /// The file descriptor of the memory mapped file in the store. It is used as 99 /// a unique identifier of the file in the client to look up the corresponding 100 /// file descriptor on the client's side. 101 int store_fd; 102 /// The offset in bytes in the memory mapped file of the data. 103 ptrdiff_t data_offset; 104 /// The offset in bytes in the memory mapped file of the metadata. 105 ptrdiff_t metadata_offset; 106 /// The size in bytes of the data. 107 int64_t data_size; 108 /// The size in bytes of the metadata. 109 int64_t metadata_size; 110 /// Device number object is on. 111 int device_num; 112 113 bool operator==(const PlasmaObject& other) const { 114 return ( 115 #ifdef PLASMA_CUDA 116 (ipc_handle == other.ipc_handle) && 117 #endif 118 (store_fd == other.store_fd) && (data_offset == other.data_offset) && 119 (metadata_offset == other.metadata_offset) && (data_size == other.data_size) && 120 (metadata_size == other.metadata_size) && (device_num == other.device_num)); 121 } 122 }; 123 124 enum class ObjectStatus : int { 125 /// The object was not found. 126 OBJECT_NOT_FOUND = 0, 127 /// The object was found. 128 OBJECT_FOUND = 1 129 }; 130 131 /// The plasma store information that is exposed to the eviction policy. 132 struct PlasmaStoreInfo { 133 /// Objects that are in the Plasma store. 134 ObjectTable objects; 135 /// Boolean flag indicating whether to start the object store with hugepages 136 /// support enabled. Huge pages are substantially larger than normal memory 137 /// pages (e.g. 2MB or 1GB instead of 4KB) and using them can reduce 138 /// bookkeeping overhead from the OS. 139 bool hugepages_enabled; 140 /// A (platform-dependent) directory where to create the memory-backed file. 141 std::string directory; 142 }; 143 144 /// Get an entry from the object table and return NULL if the object_id 145 /// is not present. 146 /// 147 /// \param store_info The PlasmaStoreInfo that contains the object table. 148 /// \param object_id The object_id of the entry we are looking for. 149 /// \return The entry associated with the object_id or NULL if the object_id 150 /// is not present. 151 ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info, 152 const ObjectID& object_id); 153 154 /// Print a warning if the status is less than zero. This should be used to check 155 /// the success of messages sent to plasma clients. We print a warning instead of 156 /// failing because the plasma clients are allowed to die. This is used to handle 157 /// situations where the store writes to a client file descriptor, and the client 158 /// may already have disconnected. If we have processed the disconnection and 159 /// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we 160 /// have not, then we should get a SIGPIPE. If we write to a TCP socket that 161 /// isn't connected yet, then we should get an ECONNRESET. 162 /// 163 /// \param status The status to check. If it is less less than zero, we will 164 /// print a warning. 165 /// \param client_sock The client socket. This is just used to print some extra 166 /// information. 167 /// \return The errno set. 168 int WarnIfSigpipe(int status, int client_sock); 169 170 std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(flatbuf::ObjectInfoT* object_info); 171 172 std::unique_ptr<uint8_t[]> CreatePlasmaNotificationBuffer( 173 std::vector<flatbuf::ObjectInfoT>& object_info); 174 175 } // namespace plasma 176