1 //  Copyright (c) 2011 Bryce Lelbach
2 //  Copyright (c) 2011-2016 Hartmut Kaiser
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 #if !defined(HPX_97FC0FA2_E773_4F83_8477_806EC68C2253)
8 #define HPX_97FC0FA2_E773_4F83_8477_806EC68C2253
9 
10 #include <hpx/config.hpp>
11 
12 #include <hpx/apply.hpp>
13 #include <hpx/async.hpp>
14 #include <hpx/components/iostreams/manipulators.hpp>
15 #include <hpx/components/iostreams/server/output_stream.hpp>
16 #include <hpx/runtime/components/client_base.hpp>
17 #include <hpx/util/register_locks.hpp>
18 
19 #include <boost/iostreams/stream.hpp>
20 
21 #include <atomic>
22 #include <cstdint>
23 #include <ios>
24 #include <iostream>
25 #include <iterator>
26 #include <mutex>
27 #include <sstream>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 namespace hpx { namespace iostreams
33 {
34     ///////////////////////////////////////////////////////////////////////////
35     namespace detail
36     {
37         template <typename Char = char>
38         struct buffer_sink;
39     }
40 
41     template <typename Char = char, typename Sink = detail::buffer_sink<Char> >
42     struct ostream;
43 
44     ///////////////////////////////////////////////////////////////////////////
45     namespace detail
46     {
47         ///////////////////////////////////////////////////////////////////////
48         // Tag types to be used to identify standard ostream objects
49         struct cout_tag {};
50         struct cerr_tag {};
51 
52         struct consolestream_tag {};
53 
54         ///////////////////////////////////////////////////////////////////////
55         /// This is a Boost.IoStreams Sink that can be used to create an
56         /// [io]stream on top of a detail::buffer.
57         template <typename Char>
58         struct buffer_sink
59         {
60             typedef Char char_type;
61 
62             struct category
63               : boost::iostreams::sink_tag,
64                 boost::iostreams::flushable_tag
65             {};
66 
buffer_sinkhpx::iostreams::detail::buffer_sink67             explicit buffer_sink(ostream<Char, buffer_sink>& os)
68               : os_(os)
69             {}
70 
71             // Write up to n characters to the underlying data sink into the
72             // buffer s, returning the number of characters written.
73             inline std::streamsize write(char_type const* s, std::streamsize n);
74 
75             // Make sure all content is sent to console
76             inline bool flush();
77 
78         private:
79             ostream<Char, buffer_sink>& os_;
80         };
81 
82         ///////////////////////////////////////////////////////////////////////
83         template <typename Char = char, typename Sink = buffer_sink<Char> >
84         struct ostream_creator
85         {
86             typedef std::back_insert_iterator<std::vector<Char> > iterator_type;
87             typedef Sink device_type;
88             typedef boost::iostreams::stream<device_type> stream_type;
89         };
90 
91         ///////////////////////////////////////////////////////////////////////
get_outstream(cout_tag)92         inline std::ostream& get_outstream(cout_tag)
93         {
94             return std::cout;
95         }
96 
get_outstream(cerr_tag)97         inline std::ostream& get_outstream(cerr_tag)
98         {
99             return std::cerr;
100         }
101 
102         std::stringstream& get_consolestream();
103 
get_outstream(consolestream_tag)104         inline std::ostream& get_outstream(consolestream_tag)
105         {
106             return get_consolestream();
107         }
108 
get_outstream_name(cout_tag)109         inline char const* get_outstream_name(cout_tag)
110         {
111             return "/locality#console/output_stream#cout";
112         }
113 
get_outstream_name(cerr_tag)114         inline char const* get_outstream_name(cerr_tag)
115         {
116             return "/locality#console/output_stream#cerr";
117         }
118 
get_outstream_name(consolestream_tag)119         inline char const* get_outstream_name(consolestream_tag)
120         {
121             return "/locality#console/output_stream#consolestream";
122         }
123 
124         ///////////////////////////////////////////////////////////////////////
125         hpx::future<naming::id_type>
126         create_ostream(char const* name, std::ostream& strm);
127 
128         template <typename Tag>
create_ostream(Tag tag)129         hpx::future<naming::id_type> create_ostream(Tag tag)
130         {
131             return create_ostream(get_outstream_name(tag),
132                 detail::get_outstream(tag));
133         }
134 
135 //         ///////////////////////////////////////////////////////////////////////
136 //         void release_ostream(char const* name, naming::id_type const& id);
137 //
138 //         template <typename Tag>
139 //         void release_ostream(Tag tag, naming::id_type const& id)
140 //         {
141 //             release_ostream(get_outstream_name(tag), id);
142 //         }
143 
144         ///////////////////////////////////////////////////////////////////////
145         void register_ostreams();
146         void unregister_ostreams();
147     }
148 
149     ///////////////////////////////////////////////////////////////////////////
150     template <typename Char, typename Sink>
151     struct ostream
152         : components::client_base<ostream<Char, Sink>, server::output_stream>
153         , detail::buffer
154         , detail::ostream_creator<Char, Sink>::stream_type
155     {
156         HPX_NON_COPYABLE(ostream);
157 
158     private:
159         typedef components::client_base<ostream, server::output_stream> base_type;
160 
161         typedef detail::ostream_creator<Char, Sink> ostream_creator;
162         typedef typename ostream_creator::stream_type stream_base_type;
163         typedef typename ostream_creator::iterator_type iterator_type;
164 
165         typedef typename stream_base_type::traits_type stream_traits_type;
166         typedef BOOST_IOSTREAMS_BASIC_OSTREAM(Char, stream_traits_type) std_stream_type;
167         typedef detail::buffer::mutex_type mutex_type;
168 
169     private:
170         using detail::buffer::mtx_;
171         std::atomic<std::uint64_t> generational_count_;
172 
173         // Performs a lazy streaming operation.
174         template <typename T>
streaming_operator_lazyhpx::iostreams::ostream175         ostream& streaming_operator_lazy(T const& subject)
176         { // {{{
177             // apply the subject to the local stream
178             *static_cast<stream_base_type*>(this) << subject;
179             return *this;
180         } // }}}
181 
182         // Performs an asynchronous streaming operation.
183         template <typename T, typename Lock>
streaming_operator_asynchpx::iostreams::ostream184         ostream& streaming_operator_async(T const& subject, Lock& l)
185         { // {{{
186             // apply the subject to the local stream
187             *static_cast<stream_base_type*>(this) << subject;
188 
189             // If the buffer isn't empty, send it asynchronously to the
190             // destination.
191             if (!this->detail::buffer::empty_locked())
192             {
193                 // Create the next buffer, returns the previous buffer
194                 buffer next = this->detail::buffer::init_locked();
195 
196                 // Unlock the mutex before we cleanup.
197                 l.unlock();
198 
199                 // Perform the write operation, then destroy the old buffer and
200                 // stream.
201                 typedef server::output_stream::write_async_action action_type;
202                 hpx::apply<action_type>(this->get_id(), hpx::get_locality_id(),
203                     generational_count_++, next);
204             }
205 
206             return *this;
207         } // }}}
208 
209         // Performs a synchronous streaming operation.
210         template <typename T, typename Lock>
streaming_operator_synchpx::iostreams::ostream211         ostream& streaming_operator_sync(T const& subject, Lock& l)
212         { // {{{
213             // apply the subject to the local stream
214             *static_cast<stream_base_type*>(this) << subject;
215 
216             // If the buffer isn't empty, send it to the destination.
217             if (!this->detail::buffer::empty_locked())
218             {
219                 // Create the next buffer, returns the previous buffer
220                 buffer next = this->detail::buffer::init_locked();
221 
222                 // Unlock the mutex before we cleanup.
223                 l.unlock();
224 
225                 // Perform the write operation, then destroy the old buffer and
226                 // stream.
227                 typedef server::output_stream::write_sync_action action_type;
228                 hpx::async<action_type>(this->get_id(), hpx::get_locality_id(),
229                     generational_count_++, next).get();
230             }
231             else
232             {
233                 l.unlock();     // must unlock in any case
234             }
235             return *this;
236         } // }}}
237 
238         ///////////////////////////////////////////////////////////////////////
239         friend struct detail::buffer_sink<char>;
240 
flushhpx::iostreams::ostream241         bool flush()
242         {
243             std::unique_lock<mutex_type> l(*mtx_);
244             if (!this->detail::buffer::empty_locked())
245             {
246                 // Create the next buffer, returns the previous buffer
247                 buffer next = this->detail::buffer::init_locked();
248 
249                 // Unlock the mutex before we cleanup.
250                 l.unlock();
251 
252                 // since mtx_ is recursive and apply will do an AGAS lookup,
253                 // we need to ignore the lock here in case we are called
254                 // recursively
255                 hpx::util::ignore_while_checking<std::unique_lock<mutex_type> >
256                     il(&l);
257 
258                 // Perform the write operation, then destroy the old buffer and
259                 // stream.
260                 typedef server::output_stream::write_async_action action_type;
261                 hpx::apply<action_type>(this->get_id(), hpx::get_locality_id(),
262                     generational_count_++, next);
263             }
264             return true;
265         }
266 
267         ///////////////////////////////////////////////////////////////////////
268         friend void detail::register_ostreams();
269         friend void detail::unregister_ostreams();
270 
271         // late initialization during runtime system startup
272         template <typename Tag>
initializehpx::iostreams::ostream273         void initialize(Tag tag)
274         {
275             *static_cast<base_type*>(this) = detail::create_ostream(tag);
276         }
277 
278         // reset this object during runtime system shutdown
279         template <typename Tag>
uninitializehpx::iostreams::ostream280         void uninitialize(Tag tag)
281         {
282             std::unique_lock<mutex_type> l(*mtx_, std::try_to_lock);
283             if (l)
284             {
285                 streaming_operator_sync(hpx::async_flush, l);   // unlocks
286             }
287 
288             // FIXME: find a later spot to invoke this
289 //             detail::release_ostream(tag, this->get_id());
290             this->base_type::free();
291         }
292 
293     public:
ostreamhpx::iostreams::ostream294         ostream()
295           : base_type()
296           , buffer()
297           , stream_base_type(*this)
298           , generational_count_(0)
299         {}
300 
301         // hpx::flush manipulator
operator <<hpx::iostreams::ostream302         ostream& operator<<(hpx::iostreams::flush_type const& m)
303         {
304             std::unique_lock<mutex_type> l(*mtx_);
305             return streaming_operator_sync(m, l);
306         }
307 
308         // hpx::endl manipulator
operator <<hpx::iostreams::ostream309         ostream& operator<<(hpx::iostreams::endl_type const& m)
310         {
311             std::unique_lock<mutex_type> l(*mtx_);
312             return streaming_operator_sync(m, l);
313         }
314 
315         // hpx::async_flush manipulator
operator <<hpx::iostreams::ostream316         ostream& operator<<(hpx::iostreams::async_flush_type const& m)
317         {
318             std::unique_lock<mutex_type> l(*mtx_);
319             return streaming_operator_async(m, l);
320         }
321 
322         // hpx::async_endl manipulator
operator <<hpx::iostreams::ostream323         ostream& operator<<(hpx::iostreams::async_endl_type const& m)
324         {
325             std::unique_lock<mutex_type> l(*mtx_);
326             return streaming_operator_async(m, l);
327         }
328 
329         ///////////////////////////////////////////////////////////////////////
330         template <typename T>
operator <<hpx::iostreams::ostream331         ostream& operator<<(T const& subject)
332         {
333             std::lock_guard<mutex_type> l(*mtx_);
334             return streaming_operator_lazy(subject);
335         }
336 
337         ///////////////////////////////////////////////////////////////////////
operator <<hpx::iostreams::ostream338         ostream& operator<<(std_stream_type& (*manip_fun)(std_stream_type&))
339         {
340             std::unique_lock<mutex_type> l(*mtx_);
341             util::ignore_while_checking<std::unique_lock<mutex_type> > ignore(&l);
342             return streaming_operator_lazy(manip_fun);
343         }
344     };
345 
346     ///////////////////////////////////////////////////////////////////////////
347     namespace detail
348     {
349         template <typename Char>
write(Char const * s,std::streamsize n)350         inline std::streamsize buffer_sink<Char>::write(
351             Char const* s, std::streamsize n)
352         {
353             return static_cast<buffer&>(os_).write(s, n);
354         }
355 
356         template <typename Char>
flush()357         inline bool buffer_sink<Char>::flush()
358         {
359             return os_.flush();
360         }
361     }
362 }}
363 
364 #endif // HPX_97FC0FA2_E773_4F83_8477_806EC68C2253
365 
366