1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements.  See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership.  The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License.  You may obtain a copy of the License at
8 //
9 //   http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied.  See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #include "plasma/plasma.h"
19 
20 #include <sys/socket.h>
21 #include <sys/types.h>
22 #include <unistd.h>
23 
24 #include "plasma/common.h"
25 #include "plasma/common_generated.h"
26 #include "plasma/protocol.h"
27 
28 namespace fb = plasma::flatbuf;
29 
30 namespace plasma {
31 
ObjectTableEntry()32 ObjectTableEntry::ObjectTableEntry() : pointer(nullptr), ref_count(0) {}
33 
~ObjectTableEntry()34 ObjectTableEntry::~ObjectTableEntry() { pointer = nullptr; }
35 
WarnIfSigpipe(int status,int client_sock)36 int WarnIfSigpipe(int status, int client_sock) {
37   if (status >= 0) {
38     return 0;
39   }
40   if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
41     ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
42                           "sending a message to client on fd "
43                        << client_sock
44                        << ". The client on the other end may "
45                           "have hung up.";
46     return errno;
47   }
48   ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << ".";
49   return -1;  // This is never reached.
50 }
51 
52 /**
53  * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes
54  * of this buffer are the length of the remaining message and the
55  * remaining message is a serialized version of the object info.
56  *
57  * \param object_info The object info to be serialized
58  * \return The object info buffer. It is the caller's responsibility to free
59  *         this buffer with "delete" after it has been used.
60  */
CreateObjectInfoBuffer(fb::ObjectInfoT * object_info)61 std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(fb::ObjectInfoT* object_info) {
62   flatbuffers::FlatBufferBuilder fbb;
63   auto message = fb::CreateObjectInfo(fbb, object_info);
64   fbb.Finish(message);
65   auto notification =
66       std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
67   *(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
68   memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
69   return notification;
70 }
71 
CreatePlasmaNotificationBuffer(std::vector<fb::ObjectInfoT> & object_info)72 std::unique_ptr<uint8_t[]> CreatePlasmaNotificationBuffer(
73     std::vector<fb::ObjectInfoT>& object_info) {
74   flatbuffers::FlatBufferBuilder fbb;
75   std::vector<flatbuffers::Offset<plasma::flatbuf::ObjectInfo>> info;
76   for (size_t i = 0; i < object_info.size(); ++i) {
77     info.push_back(fb::CreateObjectInfo(fbb, &object_info[i]));
78   }
79 
80   auto info_array = fbb.CreateVector(info);
81   auto message = fb::CreatePlasmaNotification(fbb, info_array);
82   fbb.Finish(message);
83   auto notification =
84       std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
85   *(reinterpret_cast<int64_t*>(notification.get())) = fbb.GetSize();
86   memcpy(notification.get() + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
87   return notification;
88 }
89 
GetObjectTableEntry(PlasmaStoreInfo * store_info,const ObjectID & object_id)90 ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
91                                       const ObjectID& object_id) {
92   auto it = store_info->objects.find(object_id);
93   if (it == store_info->objects.end()) {
94     return NULL;
95   }
96   return it->second.get();
97 }
98 
99 }  // namespace plasma
100