1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #pragma once
19 
20 #include <errno.h>
21 #include <inttypes.h>
22 #include <stdbool.h>
23 #include <stddef.h>
24 #include <stdio.h>
25 #include <stdlib.h>
26 #include <string.h>
27 #include <unistd.h>  // pid_t
28 
29 #include <memory>
30 #include <string>
31 #include <unordered_map>
32 #include <unordered_set>
33 #include <vector>
34 
35 #include "plasma/compat.h"
36 
37 #include "arrow/status.h"
38 #include "arrow/util/logging.h"
39 #include "arrow/util/macros.h"
40 #include "plasma/common.h"
41 
#ifdef PLASMA_CUDA
// Make CudaIpcMemHandle usable unqualified; PlasmaObject::ipc_handle in this
// header is declared with it when PLASMA_CUDA is defined.
// NOTE(review): this using-declaration sits outside namespace plasma, so the
// name becomes visible at global scope in every file that includes this header.
using arrow::cuda::CudaIpcMemHandle;
#endif
45 
46 namespace plasma {
47 
// Forward declaration only: ObjectInfoT appears in the signatures of
// CreateObjectInfoBuffer and CreatePlasmaNotificationBuffer in this header, so
// the complete type is not needed here.
// NOTE(review): presumably the full definition comes from the generated
// flatbuffers header -- confirm.
namespace flatbuf {
struct ObjectInfoT;
}  // namespace flatbuf
51 
/// Evaluate the Status expression `s` exactly once and tolerate failures that
/// are consistent with the peer on file descriptor `fd_` having gone away.
///
/// If the status is not OK and errno is EPIPE, EBADF or ECONNRESET, a warning
/// mentioning `fd_` is logged and execution continues after the macro. Any
/// other failure makes the enclosing function return the failed Status, so the
/// macro may only be used inside functions whose return type can be
/// constructed from Status.
///
/// The expansion is a single statement WITHOUT a trailing semicolon: write
/// `HANDLE_SIGPIPE(expr, fd);` at the call site. This keeps the macro usable as
/// the body of an unbraced if/else, which a trailing semicolon in the macro
/// itself would break.
#define HANDLE_SIGPIPE(s, fd_)                                              \
  do {                                                                      \
    Status _s = (s);                                                        \
    if (!_s.ok()) {                                                         \
      if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {        \
        ARROW_LOG(WARNING)                                                  \
            << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \
               "sending a message to client on fd "                         \
            << (fd_)                                                        \
            << ". "                                                         \
               "The client on the other end may have hung up.";             \
      } else {                                                              \
        return _s;                                                          \
      }                                                                     \
    }                                                                       \
  } while (0)
68 
/// Granularity that plasma applies when allocating objects.
constexpr int64_t kBlockSize{64};
71 
72 /// Contains all information that is associated with a Plasma store client.
73 struct Client {
74   explicit Client(int fd);
75 
76   /// The file descriptor used to communicate with the client.
77   int fd;
78 
79   /// Object ids that are used by this client.
80   std::unordered_set<ObjectID> object_ids;
81 
82   /// File descriptors that are used by this client.
83   std::unordered_set<int> used_fds;
84 
85   /// The file descriptor used to push notifications to client. This is only valid
86   /// if client subscribes to plasma store. -1 indicates invalid.
87   int notification_fd;
88 
89   std::string name = "anonymous_client";
90 };
91 
// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
/// Describes where one object lives: which memory mapped file, at which
/// offsets, with which sizes, and on which device.
struct PlasmaObject {
#ifdef PLASMA_CUDA
  /// Cuda IPC handle (only present in PLASMA_CUDA builds).
  std::shared_ptr<CudaIpcMemHandle> ipc_handle;
#endif
  /// File descriptor, as seen by the store, of the memory mapped file. The
  /// client uses it as a unique key to find its own matching file descriptor.
  int store_fd;
  /// Byte offset of the data within the memory mapped file.
  ptrdiff_t data_offset;
  /// Byte offset of the metadata within the memory mapped file.
  ptrdiff_t metadata_offset;
  /// Number of bytes of data.
  int64_t data_size;
  /// Number of bytes of metadata.
  int64_t metadata_size;
  /// Number of the device the object is on.
  int device_num;

  /// Member-wise equality: true only when every field (including the Cuda IPC
  /// handle pointer in PLASMA_CUDA builds) compares equal.
  bool operator==(const PlasmaObject& other) const {
#ifdef PLASMA_CUDA
    if (ipc_handle != other.ipc_handle) return false;
#endif
    if (store_fd != other.store_fd) return false;
    if (data_offset != other.data_offset) return false;
    if (metadata_offset != other.metadata_offset) return false;
    if (data_size != other.data_size) return false;
    if (metadata_size != other.metadata_size) return false;
    return device_num == other.device_num;
  }
};
123 
/// Whether an object was found; the underlying type is int.
enum class ObjectStatus : int {
  /// Not found (value 0).
  OBJECT_NOT_FOUND = 0,
  /// Found (value 1).
  OBJECT_FOUND = 1,
};
130 
/// The plasma store information that is exposed to the eviction policy.
///
/// Plain data holder: it declares no constructors and no in-class defaults, so
/// `hugepages_enabled` has no defined value until the creator of the struct
/// assigns or value-initializes it.
struct PlasmaStoreInfo {
  /// Objects that are in the Plasma store.
  ObjectTable objects;
  /// Boolean flag indicating whether to start the object store with hugepages
  /// support enabled. Huge pages are substantially larger than normal memory
  /// pages (e.g. 2MB or 1GB instead of 4KB) and using them can reduce
  /// bookkeeping overhead from the OS.
  bool hugepages_enabled;
  /// A (platform-dependent) directory in which to create the memory-backed
  /// file.
  std::string directory;
};
143 
/// Look up an entry in the object table, returning nullptr if the object_id
/// is not present.
///
/// \param store_info The PlasmaStoreInfo that contains the object table.
/// \param object_id The object_id of the entry we are looking for.
/// \return The entry associated with the object_id, or nullptr if the
///         object_id is not present. A raw pointer is returned;
///         NOTE(review): presumably non-owning and owned by the object
///         table -- confirm against the definition.
ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
                                      const ObjectID& object_id);
153 
/// Print a warning if the status is less than zero. This should be used to check
/// the success of messages sent to plasma clients. We print a warning instead of
/// failing because the plasma clients are allowed to die. This is used to handle
/// situations where the store writes to a client file descriptor, and the client
/// may already have disconnected. If we have processed the disconnection and
/// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we
/// have not, then we should get a SIGPIPE. If we write to a TCP socket that
/// isn't connected yet, then we should get an ECONNRESET.
///
/// \param status The status to check. If it is less than zero, we will
///        print a warning.
/// \param client_sock The client socket. This is only used to print some extra
///        information.
/// \return The errno that was set.
///         NOTE(review): the definition is not in this header; confirm what is
///         returned when status is not negative.
int WarnIfSigpipe(int status, int client_sock);
169 
/// Create a byte buffer from a single ObjectInfoT.
///
/// NOTE(review): the definition is not in this header; presumably the buffer
/// holds a serialized form of `object_info` -- confirm the exact layout
/// against the implementation.
///
/// \param object_info The object info to build the buffer from (taken as a
///        pointer to non-const).
/// \return A uniquely owned byte array. The return type carries no length, so
///         callers must obtain the buffer size some other way.
std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(flatbuf::ObjectInfoT* object_info);
171 
/// Create a byte buffer from a list of ObjectInfoT entries.
///
/// NOTE(review): the definition is not in this header; presumably the buffer
/// is a notification message covering every element of `object_info` --
/// confirm the exact layout against the implementation.
///
/// \param object_info The object infos to build the buffer from (taken by
///        reference to non-const).
/// \return A uniquely owned byte array. The return type carries no length, so
///         callers must obtain the buffer size some other way.
std::unique_ptr<uint8_t[]> CreatePlasmaNotificationBuffer(
    std::vector<flatbuf::ObjectInfoT>& object_info);
174 
175 }  // namespace plasma
176