1 /**
2  * Copyright (c) 2014-present, Facebook, Inc.
3  *
4  * This source code is licensed under the MIT license found in the
5  * LICENSE file in the root directory of this source tree.
6  */
7 
8 #pragma once
9 
10 #include <folly/Expected.h>
11 #include <folly/Range.h>
12 
13 #include <fbzmq/zmq/Common.h>
14 
15 namespace fbzmq {
16 
// detail::SocketImpl is declared (not defined) here so that this header can
// refer to it by name without pulling in its full definition.
namespace detail {

class SocketImpl;

} // namespace detail
21 
22 /**
23  * Wrapper over zmq_msg_t with lot of convenience methods for creating from
24  * various data types and reading binary blob into various data types.
25  *
26  * Supported convenience types are
27  * - primitive types (int, double, bool)
28  * - string
29  * - thrift objects
30  */
31 class Message {
32  public:
33   Message() noexcept;
34   ~Message() noexcept;
35 
36   /**
37    * Factory methods - contruct message by various means
38    */
39 
40   /**
41    * Allocate message, content undefined (to be written by user)
42    */
43   static folly::Expected<Message, Error> allocate(size_t size) noexcept;
44 
45   /**
46    * Wrap existing IOBuf. Notice that this does not copy buffer, rather adopts
47    * its content. The IOBuf pointer will be released when Message destructs.
48    */
49   static folly::Expected<Message, Error> wrapBuffer(
50       std::unique_ptr<folly::IOBuf> buf) noexcept;
51 
52   /**
53    * construct message from Thrift object using supplied serializer
54    */
55   template <typename ThriftType, typename Serializer>
56   static folly::Expected<Message, Error>
fromThriftObj(ThriftType const & obj,Serializer & serializer)57   fromThriftObj(ThriftType const& obj, Serializer& serializer) noexcept {
58     return wrapBuffer(util::writeThriftObj(obj, serializer));
59   }
60 
61   /**
62    * Construct message from fundamental type
63    */
64   template <
65       typename T,
66       std::enable_if_t<std::is_fundamental<std::decay_t<T>>::value>* = nullptr>
67   static folly::Expected<Message, Error>
from(T obj)68   from(T obj) noexcept {
69     return allocate(sizeof(T)).then([&obj](Message&& msg) {
70       ::memcpy(msg.writeableData().data(), &obj, sizeof(T));
71       return std::move(msg);
72     });
73   }
74 
75   /**
76    * Construct message by copying string contents
77    */
78   static folly::Expected<Message, Error>
from(std::string const & str)79   from(std::string const& str) noexcept {
80     return Message::wrapBuffer(folly::IOBuf::copyBuffer(str));
81   }
82 
83   /**
84    * Read concrete type from message. All methods treat message atomically.
85    * Thus, if you try to read<bool> and msg.size() != sizeof(bool) you will
86    * get back EPROTO error
87    */
88 
89   /**
90    * Read fundamental type. Message size must match sizeof(T)
91    */
92   template <
93       typename T,
94       std::enable_if_t<std::is_fundamental<std::decay_t<T>>::value>* = nullptr>
95   folly::Expected<T, Error>
read()96   read() const noexcept {
97     if (sizeof(T) != size()) {
98       return folly::makeUnexpected(Error(EPROTO));
99     }
100     T obj;
101     ::memcpy(&obj, reinterpret_cast<const void*>(data().data()), size());
102     return obj;
103   }
104 
105   /**
106    * Read into string
107    */
108   template <
109       typename T,
110       std::enable_if_t<std::is_same<std::decay_t<T>, std::string>::value>* =
111           nullptr>
112   folly::Expected<T, Error>
read()113   read() noexcept {
114     return std::string(reinterpret_cast<const char*>(data().data()), size());
115   }
116 
117   /**
118    * Read thrift object from message payload
119    */
120   template <typename ThriftType, typename Serializer>
121   folly::Expected<ThriftType, Error>
readThriftObj(Serializer & serializer)122   readThriftObj(Serializer& serializer) const noexcept {
123     auto buf = folly::IOBuf::wrapBufferAsValue(data());
124     try {
125       return util::readThriftObj<ThriftType>(buf, serializer);
126     } catch (std::exception const& e) {
127       LOG(ERROR) << "Failed to serialize thrift object. "
128                  << "Exception: " << folly::exceptionStr(e) << "Received: "
129                  << folly::humanify(std::string(
130                         reinterpret_cast<const char*>(data().data()), size()));
131     }
132     return folly::makeUnexpected(Error(EPROTO));
133   }
134 
135   /**
136    * Message is movable and copyable
137    */
138 
139   Message& operator=(Message&& other) noexcept;
140   Message(Message&& other) noexcept;
141 
142   Message(Message const& other) noexcept;
143   Message& operator=(Message const&) noexcept;
144 
145   /**
146    * True if last message in sequence. Only valid for message received
147    * from the wire, not locally constructed
148    */
149   bool isLast() const noexcept;
150 
151   /**
152    * Grab pointer to raw data (mutable)
153    */
154   folly::MutableByteRange writeableData() noexcept;
155 
156   /**
157    * Ditto but read-only, yo
158    */
159   folly::ByteRange data() const noexcept;
160 
161   /**
162    * Size of data contained within message
163    */
164   size_t size() const noexcept;
165 
166   /**
167    * Convenient function to make check message.size() == 0
168    */
169   bool empty() const noexcept;
170 
171   /**
172    * Retrieve the int value of a property for this message
173    */
174   folly::Expected<int, Error> getProperty(int property) const noexcept;
175 
176   /**
177    * Retrieve the string value of a metadata property for this message
178    */
179   folly::Expected<std::string, Error> getMetadataProperty(
180       std::string const& property) const noexcept;
181 
182  private:
183   friend class detail::SocketImpl;
184 
185   // we wrap zmq message
186   zmq_msg_t msg_;
187 };
188 
189 } // namespace fbzmq
190