1 // Copyright (c) 2011 Bryce Lelbach 2 // Copyright (c) 2011-2016 Hartmut Kaiser 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 7 #if !defined(HPX_97FC0FA2_E773_4F83_8477_806EC68C2253) 8 #define HPX_97FC0FA2_E773_4F83_8477_806EC68C2253 9 10 #include <hpx/config.hpp> 11 12 #include <hpx/apply.hpp> 13 #include <hpx/async.hpp> 14 #include <hpx/components/iostreams/manipulators.hpp> 15 #include <hpx/components/iostreams/server/output_stream.hpp> 16 #include <hpx/runtime/components/client_base.hpp> 17 #include <hpx/util/register_locks.hpp> 18 19 #include <boost/iostreams/stream.hpp> 20 21 #include <atomic> 22 #include <cstdint> 23 #include <ios> 24 #include <iostream> 25 #include <iterator> 26 #include <mutex> 27 #include <sstream> 28 #include <string> 29 #include <utility> 30 #include <vector> 31 32 namespace hpx { namespace iostreams 33 { 34 /////////////////////////////////////////////////////////////////////////// 35 namespace detail 36 { 37 template <typename Char = char> 38 struct buffer_sink; 39 } 40 41 template <typename Char = char, typename Sink = detail::buffer_sink<Char> > 42 struct ostream; 43 44 /////////////////////////////////////////////////////////////////////////// 45 namespace detail 46 { 47 /////////////////////////////////////////////////////////////////////// 48 // Tag types to be used to identify standard ostream objects 49 struct cout_tag {}; 50 struct cerr_tag {}; 51 52 struct consolestream_tag {}; 53 54 /////////////////////////////////////////////////////////////////////// 55 /// This is a Boost.IoStreams Sink that can be used to create an 56 /// [io]stream on top of a detail::buffer. 57 template <typename Char> 58 struct buffer_sink 59 { 60 typedef Char char_type; 61 62 struct category 63 : boost::iostreams::sink_tag, 64 boost::iostreams::flushable_tag 65 {}; 66 buffer_sinkhpx::iostreams::detail::buffer_sink67 explicit buffer_sink(ostream<Char, buffer_sink>& os) 68 : os_(os) 69 {} 70 71 // Write up to n characters to the underlying data sink into the 72 // buffer s, returning the number of characters written. 73 inline std::streamsize write(char_type const* s, std::streamsize n); 74 75 // Make sure all content is sent to console 76 inline bool flush(); 77 78 private: 79 ostream<Char, buffer_sink>& os_; 80 }; 81 82 /////////////////////////////////////////////////////////////////////// 83 template <typename Char = char, typename Sink = buffer_sink<Char> > 84 struct ostream_creator 85 { 86 typedef std::back_insert_iterator<std::vector<Char> > iterator_type; 87 typedef Sink device_type; 88 typedef boost::iostreams::stream<device_type> stream_type; 89 }; 90 91 /////////////////////////////////////////////////////////////////////// get_outstream(cout_tag)92 inline std::ostream& get_outstream(cout_tag) 93 { 94 return std::cout; 95 } 96 get_outstream(cerr_tag)97 inline std::ostream& get_outstream(cerr_tag) 98 { 99 return std::cerr; 100 } 101 102 std::stringstream& get_consolestream(); 103 get_outstream(consolestream_tag)104 inline std::ostream& get_outstream(consolestream_tag) 105 { 106 return get_consolestream(); 107 } 108 get_outstream_name(cout_tag)109 inline char const* get_outstream_name(cout_tag) 110 { 111 return "/locality#console/output_stream#cout"; 112 } 113 get_outstream_name(cerr_tag)114 inline char const* get_outstream_name(cerr_tag) 115 { 116 return "/locality#console/output_stream#cerr"; 117 } 118 get_outstream_name(consolestream_tag)119 inline char const* get_outstream_name(consolestream_tag) 120 { 121 return "/locality#console/output_stream#consolestream"; 122 } 123 124 /////////////////////////////////////////////////////////////////////// 125 hpx::future<naming::id_type> 126 create_ostream(char const* name, std::ostream& strm); 127 128 template <typename Tag> create_ostream(Tag tag)129 hpx::future<naming::id_type> create_ostream(Tag tag) 130 { 131 return create_ostream(get_outstream_name(tag), 132 detail::get_outstream(tag)); 133 } 134 135 // /////////////////////////////////////////////////////////////////////// 136 // void release_ostream(char const* name, naming::id_type const& id); 137 // 138 // template <typename Tag> 139 // void release_ostream(Tag tag, naming::id_type const& id) 140 // { 141 // release_ostream(get_outstream_name(tag), id); 142 // } 143 144 /////////////////////////////////////////////////////////////////////// 145 void register_ostreams(); 146 void unregister_ostreams(); 147 } 148 149 /////////////////////////////////////////////////////////////////////////// 150 template <typename Char, typename Sink> 151 struct ostream 152 : components::client_base<ostream<Char, Sink>, server::output_stream> 153 , detail::buffer 154 , detail::ostream_creator<Char, Sink>::stream_type 155 { 156 HPX_NON_COPYABLE(ostream); 157 158 private: 159 typedef components::client_base<ostream, server::output_stream> base_type; 160 161 typedef detail::ostream_creator<Char, Sink> ostream_creator; 162 typedef typename ostream_creator::stream_type stream_base_type; 163 typedef typename ostream_creator::iterator_type iterator_type; 164 165 typedef typename stream_base_type::traits_type stream_traits_type; 166 typedef BOOST_IOSTREAMS_BASIC_OSTREAM(Char, stream_traits_type) std_stream_type; 167 typedef detail::buffer::mutex_type mutex_type; 168 169 private: 170 using detail::buffer::mtx_; 171 std::atomic<std::uint64_t> generational_count_; 172 173 // Performs a lazy streaming operation. 174 template <typename T> streaming_operator_lazyhpx::iostreams::ostream175 ostream& streaming_operator_lazy(T const& subject) 176 { // {{{ 177 // apply the subject to the local stream 178 *static_cast<stream_base_type*>(this) << subject; 179 return *this; 180 } // }}} 181 182 // Performs an asynchronous streaming operation. 183 template <typename T, typename Lock> streaming_operator_asynchpx::iostreams::ostream184 ostream& streaming_operator_async(T const& subject, Lock& l) 185 { // {{{ 186 // apply the subject to the local stream 187 *static_cast<stream_base_type*>(this) << subject; 188 189 // If the buffer isn't empty, send it asynchronously to the 190 // destination. 191 if (!this->detail::buffer::empty_locked()) 192 { 193 // Create the next buffer, returns the previous buffer 194 buffer next = this->detail::buffer::init_locked(); 195 196 // Unlock the mutex before we cleanup. 197 l.unlock(); 198 199 // Perform the write operation, then destroy the old buffer and 200 // stream. 201 typedef server::output_stream::write_async_action action_type; 202 hpx::apply<action_type>(this->get_id(), hpx::get_locality_id(), 203 generational_count_++, next); 204 } 205 206 return *this; 207 } // }}} 208 209 // Performs a synchronous streaming operation. 210 template <typename T, typename Lock> streaming_operator_synchpx::iostreams::ostream211 ostream& streaming_operator_sync(T const& subject, Lock& l) 212 { // {{{ 213 // apply the subject to the local stream 214 *static_cast<stream_base_type*>(this) << subject; 215 216 // If the buffer isn't empty, send it to the destination. 217 if (!this->detail::buffer::empty_locked()) 218 { 219 // Create the next buffer, returns the previous buffer 220 buffer next = this->detail::buffer::init_locked(); 221 222 // Unlock the mutex before we cleanup. 223 l.unlock(); 224 225 // Perform the write operation, then destroy the old buffer and 226 // stream. 227 typedef server::output_stream::write_sync_action action_type; 228 hpx::async<action_type>(this->get_id(), hpx::get_locality_id(), 229 generational_count_++, next).get(); 230 } 231 else 232 { 233 l.unlock(); // must unlock in any case 234 } 235 return *this; 236 } // }}} 237 238 /////////////////////////////////////////////////////////////////////// 239 friend struct detail::buffer_sink<char>; 240 flushhpx::iostreams::ostream241 bool flush() 242 { 243 std::unique_lock<mutex_type> l(*mtx_); 244 if (!this->detail::buffer::empty_locked()) 245 { 246 // Create the next buffer, returns the previous buffer 247 buffer next = this->detail::buffer::init_locked(); 248 249 // Unlock the mutex before we cleanup. 250 l.unlock(); 251 252 // since mtx_ is recursive and apply will do an AGAS lookup, 253 // we need to ignore the lock here in case we are called 254 // recursively 255 hpx::util::ignore_while_checking<std::unique_lock<mutex_type> > 256 il(&l); 257 258 // Perform the write operation, then destroy the old buffer and 259 // stream. 260 typedef server::output_stream::write_async_action action_type; 261 hpx::apply<action_type>(this->get_id(), hpx::get_locality_id(), 262 generational_count_++, next); 263 } 264 return true; 265 } 266 267 /////////////////////////////////////////////////////////////////////// 268 friend void detail::register_ostreams(); 269 friend void detail::unregister_ostreams(); 270 271 // late initialization during runtime system startup 272 template <typename Tag> initializehpx::iostreams::ostream273 void initialize(Tag tag) 274 { 275 *static_cast<base_type*>(this) = detail::create_ostream(tag); 276 } 277 278 // reset this object during runtime system shutdown 279 template <typename Tag> uninitializehpx::iostreams::ostream280 void uninitialize(Tag tag) 281 { 282 std::unique_lock<mutex_type> l(*mtx_, std::try_to_lock); 283 if (l) 284 { 285 streaming_operator_sync(hpx::async_flush, l); // unlocks 286 } 287 288 // FIXME: find a later spot to invoke this 289 // detail::release_ostream(tag, this->get_id()); 290 this->base_type::free(); 291 } 292 293 public: ostreamhpx::iostreams::ostream294 ostream() 295 : base_type() 296 , buffer() 297 , stream_base_type(*this) 298 , generational_count_(0) 299 {} 300 301 // hpx::flush manipulator operator <<hpx::iostreams::ostream302 ostream& operator<<(hpx::iostreams::flush_type const& m) 303 { 304 std::unique_lock<mutex_type> l(*mtx_); 305 return streaming_operator_sync(m, l); 306 } 307 308 // hpx::endl manipulator operator <<hpx::iostreams::ostream309 ostream& operator<<(hpx::iostreams::endl_type const& m) 310 { 311 std::unique_lock<mutex_type> l(*mtx_); 312 return streaming_operator_sync(m, l); 313 } 314 315 // hpx::async_flush manipulator operator <<hpx::iostreams::ostream316 ostream& operator<<(hpx::iostreams::async_flush_type const& m) 317 { 318 std::unique_lock<mutex_type> l(*mtx_); 319 return streaming_operator_async(m, l); 320 } 321 322 // hpx::async_endl manipulator operator <<hpx::iostreams::ostream323 ostream& operator<<(hpx::iostreams::async_endl_type const& m) 324 { 325 std::unique_lock<mutex_type> l(*mtx_); 326 return streaming_operator_async(m, l); 327 } 328 329 /////////////////////////////////////////////////////////////////////// 330 template <typename T> operator <<hpx::iostreams::ostream331 ostream& operator<<(T const& subject) 332 { 333 std::lock_guard<mutex_type> l(*mtx_); 334 return streaming_operator_lazy(subject); 335 } 336 337 /////////////////////////////////////////////////////////////////////// operator <<hpx::iostreams::ostream338 ostream& operator<<(std_stream_type& (*manip_fun)(std_stream_type&)) 339 { 340 std::unique_lock<mutex_type> l(*mtx_); 341 util::ignore_while_checking<std::unique_lock<mutex_type> > ignore(&l); 342 return streaming_operator_lazy(manip_fun); 343 } 344 }; 345 346 /////////////////////////////////////////////////////////////////////////// 347 namespace detail 348 { 349 template <typename Char> write(Char const * s,std::streamsize n)350 inline std::streamsize buffer_sink<Char>::write( 351 Char const* s, std::streamsize n) 352 { 353 return static_cast<buffer&>(os_).write(s, n); 354 } 355 356 template <typename Char> flush()357 inline bool buffer_sink<Char>::flush() 358 { 359 return os_.flush(); 360 } 361 } 362 }} 363 364 #endif // HPX_97FC0FA2_E773_4F83_8477_806EC68C2253 365 366