1 #ifndef OSMIUM_IO_WRITER_HPP
2 #define OSMIUM_IO_WRITER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
36 #include <osmium/io/compression.hpp>
37 #include <osmium/io/detail/output_format.hpp>
38 #include <osmium/io/detail/queue_util.hpp>
39 #include <osmium/io/detail/read_write.hpp>
40 #include <osmium/io/detail/write_thread.hpp>
41 #include <osmium/io/error.hpp>
42 #include <osmium/io/file.hpp>
43 #include <osmium/io/header.hpp>
44 #include <osmium/io/writer_options.hpp>
45 #include <osmium/memory/buffer.hpp>
46 #include <osmium/thread/pool.hpp>
47 #include <osmium/thread/util.hpp>
48 #include <osmium/util/config.hpp>
49 #include <osmium/version.hpp>
50 
#include <atomic>
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
60 
61 namespace osmium {
62 
63     namespace memory {
64         class Item;
65     } //namespace memory
66 
67     namespace io {
68 
69         namespace detail {
70 
get_output_queue_size()71             inline std::size_t get_output_queue_size() noexcept {
72                 return osmium::config::get_max_queue_size("OUTPUT", 20);
73             }
74 
75         } // namespace detail
76 
77         /**
78          * This is the user-facing interface for writing OSM files. Instantiate
79          * an object of this class with a file name or osmium::io::File object
80          * and optionally the data for the header and then call operator() on
81          * it to write Buffers or Items.
82          *
83          * The writer uses multithreading internally to do the actual encoding
         * of the data into the intended format, possibly compress the data and
85          * then write it out. But this is intentionally hidden from the user
86          * of this class who can use it without knowing those details.
87          *
         * When you are done, call the close() method to finish up. Only if
         * close() returns without throwing an exception can you be sure that
         * the data was written correctly (modulo operating system buffering).
91          * The destructor of this class will also do the right thing if you
92          * forget to call close(), but because the destructor can't throw you
93          * will not get informed about any problems.
94          *
95          * The writer is usually used to write complete blocks of data stored
96          * in osmium::memory::Buffers. But you can also write single
97          * osmium::memory::Items. In this case the Writer uses an internal
98          * Buffer.
99          */
100         class Writer {
101 
102             enum {
103                 default_buffer_size = 10UL * 1024UL * 1024UL
104             };
105 
106             osmium::io::File m_file;
107 
108             detail::future_string_queue_type m_output_queue{detail::get_output_queue_size(), "raw_output"};
109 
110             std::unique_ptr<osmium::io::detail::OutputFormat> m_output{nullptr};
111 
112             osmium::memory::Buffer m_buffer{};
113 
114             size_t m_buffer_size = default_buffer_size;
115 
116             std::future<std::size_t> m_write_future{};
117 
118             osmium::thread::thread_handler m_thread{};
119 
120             // Checking the m_write_future is much more expensive then checking
121             // one atomic bool, so we set this bool in the write_thread when
122             // the writer should check the future...
123             std::atomic_bool m_notification{false};
124 
125             enum class status {
126                 okay   = 0, // normal writing
127                 error  = 1, // some error occurred while writing
128                 closed = 2  // close() called successfully
129             } m_status = status::okay;
130 
131             // This function will run in a separate thread.
write_thread(detail::future_string_queue_type & output_queue,std::unique_ptr<osmium::io::Compressor> && compressor,std::promise<std::size_t> && write_promise,std::atomic_bool * notification)132             static void write_thread(detail::future_string_queue_type& output_queue,
133                                      std::unique_ptr<osmium::io::Compressor>&& compressor,
134                                      std::promise<std::size_t>&& write_promise,
135                                      std::atomic_bool* notification) {
136                 detail::WriteThread write_thread{output_queue,
137                                                  std::move(compressor),
138                                                  std::move(write_promise),
139                                                  notification};
140                 write_thread();
141             }
142 
do_write(osmium::memory::Buffer && buffer)143             void do_write(osmium::memory::Buffer&& buffer) {
144                 if (buffer && buffer.committed() > 0) {
145                     m_output->write_buffer(std::move(buffer));
146                 }
147             }
148 
do_flush()149             void do_flush() {
150                 if (m_notification) {
151                     osmium::thread::check_for_exception(m_write_future);
152                 }
153                 if (m_buffer && m_buffer.committed() > 0) {
154                     osmium::memory::Buffer buffer{m_buffer_size,
155                                                   osmium::memory::Buffer::auto_grow::no};
156                     using std::swap;
157                     swap(m_buffer, buffer);
158 
159                     m_output->write_buffer(std::move(buffer));
160                 }
161             }
162 
163             template <typename TFunction, typename... TArgs>
ensure_cleanup(TFunction func,TArgs &&...args)164             void ensure_cleanup(TFunction func, TArgs&&... args) {
165                 if (m_status != status::okay) {
166                     throw io_error("Can not write to writer when in status 'closed' or 'error'");
167                 }
168 
169                 try {
170                     func(std::forward<TArgs>(args)...);
171                 } catch (...) {
172                     m_status = status::error;
173                     detail::add_to_queue(m_output_queue, std::current_exception());
174                     detail::add_end_of_data_to_queue(m_output_queue);
175                     throw;
176                 }
177             }
178 
179             struct options_type {
180                 osmium::io::Header header;
181                 overwrite allow_overwrite = overwrite::no;
182                 fsync sync = fsync::no;
183                 osmium::thread::Pool* pool = nullptr;
184             };
185 
set_option(options_type & options,osmium::thread::Pool & pool)186             static void set_option(options_type& options, osmium::thread::Pool& pool) {
187                 options.pool = &pool;
188             }
189 
set_option(options_type & options,const osmium::io::Header & header)190             static void set_option(options_type& options, const osmium::io::Header& header) {
191                 options.header = header;
192             }
193 
set_option(options_type & options,overwrite value)194             static void set_option(options_type& options, overwrite value) {
195                 options.allow_overwrite = value;
196             }
197 
set_option(options_type & options,fsync value)198             static void set_option(options_type& options, fsync value) {
199                 options.sync = value;
200             }
201 
do_close()202             void do_close() {
203                 if (m_status == status::okay) {
204                     ensure_cleanup([&](){
205                         do_write(std::move(m_buffer));
206                         m_output->write_end();
207                         m_status = status::closed;
208                         detail::add_end_of_data_to_queue(m_output_queue);
209                     });
210                 }
211             }
212 
213         public:
214 
215             /**
216              * The constructor of the Writer object opens a file and writes the
217              * header to it.
218              *
219              * @param file File (contains name and format info) to open.
220              * @param args All further arguments are optional and can appear
221              *             in any order:
222              *
223              * * osmium::io::Header: Optional header data. If this is
224              *       not given, a default constructed osmium::io::Header
225              *       object will be used.
226              *
227              * * osmium::io::overwrite: Allow overwriting of existing file?
228              *       Can be osmium::io::overwrite::allow or
229              *       osmium::io::overwrite::no (default).
230              *
231              * * osmium::io::fsync: Should fsync be called on the file
232              *       before closing it? Can be osmium::io::fsync::yes or
233              *       osmium::io::fsync::no (default).
234              *
235              * * osmium::thread::Pool&: Reference to a thread pool that should
236              *      be used for writing instead of the default pool. Usually
237              *      it is okay to use the statically initialized shared
238              *      default pool, but sometimes you want or need your own.
239              *      For instance when your program will fork, using the
240              *      statically initialized pool will not work.
241              *
242              * @throws osmium::io_error If there was an error.
243              * @throws std::system_error If the file could not be opened.
244              */
245             template <typename... TArgs>
Writer(const osmium::io::File & file,TArgs &&...args)246             explicit Writer(const osmium::io::File& file, TArgs&&... args) :
247                 m_file(file.check()) {
248                 assert(!m_file.buffer()); // XXX can't handle pseudo-files
249 
250                 options_type options;
251                 (void)std::initializer_list<int>{
252                     (set_option(options, args), 0)...
253                 };
254 
255                 if (!options.pool) {
256                     options.pool = &thread::Pool::default_instance();
257                 }
258 
259                 m_output = osmium::io::detail::OutputFormatFactory::instance().create_output(*options.pool, m_file, m_output_queue);
260 
261                 if (options.header.get("generator").empty()) {
262                     options.header.set("generator", "libosmium/" LIBOSMIUM_VERSION_STRING);
263                 }
264 
265                 std::unique_ptr<osmium::io::Compressor> compressor =
266                     CompressionFactory::instance().create_compressor(file.compression(),
267                                                                      osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
268                                                                      options.sync);
269 
270                 std::promise<std::size_t> write_promise;
271                 m_write_future = write_promise.get_future();
272                 m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise), &m_notification};
273 
274                 ensure_cleanup([&](){
275                     m_output->write_header(options.header);
276                 });
277             }
278 
279             template <typename... TArgs>
Writer(const std::string & filename,TArgs &&...args)280             explicit Writer(const std::string& filename, TArgs&&... args) :
281                 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
282             }
283 
284             template <typename... TArgs>
Writer(const char * filename,TArgs &&...args)285             explicit Writer(const char* filename, TArgs&&... args) :
286                 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
287             }
288 
289             Writer(const Writer&) = delete;
290             Writer& operator=(const Writer&) = delete;
291 
292             Writer(Writer&&) = delete;
293             Writer& operator=(Writer&&) = delete;
294 
~Writer()295             ~Writer() noexcept {
296                 try {
297                     do_close();
298                 } catch (...) {
299                     // Ignore any exceptions because destructor must not throw.
300                 }
301             }
302 
303             /**
304              * Get the currently configured size of the internal buffer.
305              */
buffer_size() const306             size_t buffer_size() const noexcept {
307                 return m_buffer_size;
308             }
309 
310             /**
311              * Set the size of the internal buffer. This will only take effect
312              * if you have not yet written anything or after the next flush().
313              */
set_buffer_size(size_t size)314             void set_buffer_size(size_t size) noexcept {
315                 m_buffer_size = size;
316             }
317 
318             /**
319              * Flush the internal buffer if it contains any data. This is
320              * usually not needed as the buffer gets flushed on close()
321              * automatically.
322              *
323              * @throws Some form of osmium::io_error when there is a problem.
324              */
flush()325             void flush() {
326                 ensure_cleanup([&](){
327                     do_flush();
328                 });
329             }
330 
331             /**
332              * Write contents of a buffer to the output file. The buffer is
333              * moved into this function and will be in an undefined moved-from
334              * state afterwards.
335              *
336              * @param buffer Buffer that is being written out.
337              * @throws Some form of osmium::io_error when there is a problem.
338              */
operator ()(osmium::memory::Buffer && buffer)339             void operator()(osmium::memory::Buffer&& buffer) {
340                 ensure_cleanup([&](){
341                     do_flush();
342                     do_write(std::move(buffer));
343                 });
344             }
345 
346             /**
347              * Add item to the internal buffer for eventual writing to the
348              * output file.
349              *
350              * @param item Item to write (usually an OSM object).
351              * @throws Some form of osmium::io_error when there is a problem.
352              */
operator ()(const osmium::memory::Item & item)353             void operator()(const osmium::memory::Item& item) {
354                 ensure_cleanup([&](){
355                     if (!m_buffer) {
356                         m_buffer = osmium::memory::Buffer{m_buffer_size,
357                                                           osmium::memory::Buffer::auto_grow::no};
358                     }
359                     try {
360                         m_buffer.push_back(item);
361                     } catch (const osmium::buffer_is_full&) {
362                         do_flush();
363                         m_buffer.push_back(item);
364                     }
365                 });
366             }
367 
368             /**
369              * Flushes internal buffer and closes output file. If you do not
370              * call this, the destructor of Writer will also do the same
371              * thing. But because this call might throw an exception, which
372              * the destructor will ignore, it is better to call close()
373              * explicitly.
374              *
375              * @returns Number of bytes written to the file (or 0 if it can
376              *          not be determined).
377              * @throws Some form of osmium::io_error when there is a problem.
378              */
close()379             std::size_t close() {
380                 do_close();
381 
382                 if (m_write_future.valid()) {
383                     return m_write_future.get();
384                 }
385 
386                 return 0;
387             }
388 
389         }; // class Writer
390 
391     } // namespace io
392 
393 } // namespace osmium
394 
395 #endif // OSMIUM_IO_WRITER_HPP
396