1 #ifndef OSMIUM_IO_WRITER_HPP 2 #define OSMIUM_IO_WRITER_HPP 3 4 /* 5 6 This file is part of Osmium (https://osmcode.org/libosmium). 7 8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README). 9 10 Boost Software License - Version 1.0 - August 17th, 2003 11 12 Permission is hereby granted, free of charge, to any person or organization 13 obtaining a copy of the software and accompanying documentation covered by 14 this license (the "Software") to use, reproduce, display, distribute, 15 execute, and transmit the Software, and to prepare derivative works of the 16 Software, and to permit third-parties to whom the Software is furnished to 17 do so, all subject to the following: 18 19 The copyright notices in the Software and this entire statement, including 20 the above license grant, this restriction and the following disclaimer, 21 must be included in all copies of the Software, in whole or in part, and 22 all derivative works of the Software, unless such copies or derivative 23 works are solely in the form of machine-executable object code generated by 24 a source language processor. 25 26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT 29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE 30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, 31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 32 DEALINGS IN THE SOFTWARE. 33 34 */ 35 36 #include <osmium/io/compression.hpp> 37 #include <osmium/io/detail/output_format.hpp> 38 #include <osmium/io/detail/queue_util.hpp> 39 #include <osmium/io/detail/read_write.hpp> 40 #include <osmium/io/detail/write_thread.hpp> 41 #include <osmium/io/error.hpp> 42 #include <osmium/io/file.hpp> 43 #include <osmium/io/header.hpp> 44 #include <osmium/io/writer_options.hpp> 45 #include <osmium/memory/buffer.hpp> 46 #include <osmium/thread/pool.hpp> 47 #include <osmium/thread/util.hpp> 48 #include <osmium/util/config.hpp> 49 #include <osmium/version.hpp> 50 51 #include <cassert> 52 #include <cstddef> 53 #include <exception> 54 #include <functional> 55 #include <future> 56 #include <initializer_list> 57 #include <memory> 58 #include <string> 59 #include <utility> 60 61 namespace osmium { 62 63 namespace memory { 64 class Item; 65 } //namespace memory 66 67 namespace io { 68 69 namespace detail { 70 get_output_queue_size()71 inline std::size_t get_output_queue_size() noexcept { 72 return osmium::config::get_max_queue_size("OUTPUT", 20); 73 } 74 75 } // namespace detail 76 77 /** 78 * This is the user-facing interface for writing OSM files. Instantiate 79 * an object of this class with a file name or osmium::io::File object 80 * and optionally the data for the header and then call operator() on 81 * it to write Buffers or Items. 82 * 83 * The writer uses multithreading internally to do the actual encoding 84 * of the data into the intended format, possible compress the data and 85 * then write it out. But this is intentionally hidden from the user 86 * of this class who can use it without knowing those details. 87 * 88 * If you are done call the close() method to finish up. Only if you 89 * don't get an exception from the close() method, you can be sure 90 * the data is written correctly (modulo operating system buffering). 91 * The destructor of this class will also do the right thing if you 92 * forget to call close(), but because the destructor can't throw you 93 * will not get informed about any problems. 94 * 95 * The writer is usually used to write complete blocks of data stored 96 * in osmium::memory::Buffers. But you can also write single 97 * osmium::memory::Items. In this case the Writer uses an internal 98 * Buffer. 99 */ 100 class Writer { 101 102 enum { 103 default_buffer_size = 10UL * 1024UL * 1024UL 104 }; 105 106 osmium::io::File m_file; 107 108 detail::future_string_queue_type m_output_queue{detail::get_output_queue_size(), "raw_output"}; 109 110 std::unique_ptr<osmium::io::detail::OutputFormat> m_output{nullptr}; 111 112 osmium::memory::Buffer m_buffer{}; 113 114 size_t m_buffer_size = default_buffer_size; 115 116 std::future<std::size_t> m_write_future{}; 117 118 osmium::thread::thread_handler m_thread{}; 119 120 // Checking the m_write_future is much more expensive then checking 121 // one atomic bool, so we set this bool in the write_thread when 122 // the writer should check the future... 123 std::atomic_bool m_notification{false}; 124 125 enum class status { 126 okay = 0, // normal writing 127 error = 1, // some error occurred while writing 128 closed = 2 // close() called successfully 129 } m_status = status::okay; 130 131 // This function will run in a separate thread. write_thread(detail::future_string_queue_type & output_queue,std::unique_ptr<osmium::io::Compressor> && compressor,std::promise<std::size_t> && write_promise,std::atomic_bool * notification)132 static void write_thread(detail::future_string_queue_type& output_queue, 133 std::unique_ptr<osmium::io::Compressor>&& compressor, 134 std::promise<std::size_t>&& write_promise, 135 std::atomic_bool* notification) { 136 detail::WriteThread write_thread{output_queue, 137 std::move(compressor), 138 std::move(write_promise), 139 notification}; 140 write_thread(); 141 } 142 do_write(osmium::memory::Buffer && buffer)143 void do_write(osmium::memory::Buffer&& buffer) { 144 if (buffer && buffer.committed() > 0) { 145 m_output->write_buffer(std::move(buffer)); 146 } 147 } 148 do_flush()149 void do_flush() { 150 if (m_notification) { 151 osmium::thread::check_for_exception(m_write_future); 152 } 153 if (m_buffer && m_buffer.committed() > 0) { 154 osmium::memory::Buffer buffer{m_buffer_size, 155 osmium::memory::Buffer::auto_grow::no}; 156 using std::swap; 157 swap(m_buffer, buffer); 158 159 m_output->write_buffer(std::move(buffer)); 160 } 161 } 162 163 template <typename TFunction, typename... TArgs> ensure_cleanup(TFunction func,TArgs &&...args)164 void ensure_cleanup(TFunction func, TArgs&&... args) { 165 if (m_status != status::okay) { 166 throw io_error("Can not write to writer when in status 'closed' or 'error'"); 167 } 168 169 try { 170 func(std::forward<TArgs>(args)...); 171 } catch (...) { 172 m_status = status::error; 173 detail::add_to_queue(m_output_queue, std::current_exception()); 174 detail::add_end_of_data_to_queue(m_output_queue); 175 throw; 176 } 177 } 178 179 struct options_type { 180 osmium::io::Header header; 181 overwrite allow_overwrite = overwrite::no; 182 fsync sync = fsync::no; 183 osmium::thread::Pool* pool = nullptr; 184 }; 185 set_option(options_type & options,osmium::thread::Pool & pool)186 static void set_option(options_type& options, osmium::thread::Pool& pool) { 187 options.pool = &pool; 188 } 189 set_option(options_type & options,const osmium::io::Header & header)190 static void set_option(options_type& options, const osmium::io::Header& header) { 191 options.header = header; 192 } 193 set_option(options_type & options,overwrite value)194 static void set_option(options_type& options, overwrite value) { 195 options.allow_overwrite = value; 196 } 197 set_option(options_type & options,fsync value)198 static void set_option(options_type& options, fsync value) { 199 options.sync = value; 200 } 201 do_close()202 void do_close() { 203 if (m_status == status::okay) { 204 ensure_cleanup([&](){ 205 do_write(std::move(m_buffer)); 206 m_output->write_end(); 207 m_status = status::closed; 208 detail::add_end_of_data_to_queue(m_output_queue); 209 }); 210 } 211 } 212 213 public: 214 215 /** 216 * The constructor of the Writer object opens a file and writes the 217 * header to it. 218 * 219 * @param file File (contains name and format info) to open. 220 * @param args All further arguments are optional and can appear 221 * in any order: 222 * 223 * * osmium::io::Header: Optional header data. If this is 224 * not given, a default constructed osmium::io::Header 225 * object will be used. 226 * 227 * * osmium::io::overwrite: Allow overwriting of existing file? 228 * Can be osmium::io::overwrite::allow or 229 * osmium::io::overwrite::no (default). 230 * 231 * * osmium::io::fsync: Should fsync be called on the file 232 * before closing it? Can be osmium::io::fsync::yes or 233 * osmium::io::fsync::no (default). 234 * 235 * * osmium::thread::Pool&: Reference to a thread pool that should 236 * be used for writing instead of the default pool. Usually 237 * it is okay to use the statically initialized shared 238 * default pool, but sometimes you want or need your own. 239 * For instance when your program will fork, using the 240 * statically initialized pool will not work. 241 * 242 * @throws osmium::io_error If there was an error. 243 * @throws std::system_error If the file could not be opened. 244 */ 245 template <typename... TArgs> Writer(const osmium::io::File & file,TArgs &&...args)246 explicit Writer(const osmium::io::File& file, TArgs&&... args) : 247 m_file(file.check()) { 248 assert(!m_file.buffer()); // XXX can't handle pseudo-files 249 250 options_type options; 251 (void)std::initializer_list<int>{ 252 (set_option(options, args), 0)... 253 }; 254 255 if (!options.pool) { 256 options.pool = &thread::Pool::default_instance(); 257 } 258 259 m_output = osmium::io::detail::OutputFormatFactory::instance().create_output(*options.pool, m_file, m_output_queue); 260 261 if (options.header.get("generator").empty()) { 262 options.header.set("generator", "libosmium/" LIBOSMIUM_VERSION_STRING); 263 } 264 265 std::unique_ptr<osmium::io::Compressor> compressor = 266 CompressionFactory::instance().create_compressor(file.compression(), 267 osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite), 268 options.sync); 269 270 std::promise<std::size_t> write_promise; 271 m_write_future = write_promise.get_future(); 272 m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise), &m_notification}; 273 274 ensure_cleanup([&](){ 275 m_output->write_header(options.header); 276 }); 277 } 278 279 template <typename... TArgs> Writer(const std::string & filename,TArgs &&...args)280 explicit Writer(const std::string& filename, TArgs&&... args) : 281 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) { 282 } 283 284 template <typename... TArgs> Writer(const char * filename,TArgs &&...args)285 explicit Writer(const char* filename, TArgs&&... args) : 286 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) { 287 } 288 289 Writer(const Writer&) = delete; 290 Writer& operator=(const Writer&) = delete; 291 292 Writer(Writer&&) = delete; 293 Writer& operator=(Writer&&) = delete; 294 ~Writer()295 ~Writer() noexcept { 296 try { 297 do_close(); 298 } catch (...) { 299 // Ignore any exceptions because destructor must not throw. 300 } 301 } 302 303 /** 304 * Get the currently configured size of the internal buffer. 305 */ buffer_size() const306 size_t buffer_size() const noexcept { 307 return m_buffer_size; 308 } 309 310 /** 311 * Set the size of the internal buffer. This will only take effect 312 * if you have not yet written anything or after the next flush(). 313 */ set_buffer_size(size_t size)314 void set_buffer_size(size_t size) noexcept { 315 m_buffer_size = size; 316 } 317 318 /** 319 * Flush the internal buffer if it contains any data. This is 320 * usually not needed as the buffer gets flushed on close() 321 * automatically. 322 * 323 * @throws Some form of osmium::io_error when there is a problem. 324 */ flush()325 void flush() { 326 ensure_cleanup([&](){ 327 do_flush(); 328 }); 329 } 330 331 /** 332 * Write contents of a buffer to the output file. The buffer is 333 * moved into this function and will be in an undefined moved-from 334 * state afterwards. 335 * 336 * @param buffer Buffer that is being written out. 337 * @throws Some form of osmium::io_error when there is a problem. 338 */ operator ()(osmium::memory::Buffer && buffer)339 void operator()(osmium::memory::Buffer&& buffer) { 340 ensure_cleanup([&](){ 341 do_flush(); 342 do_write(std::move(buffer)); 343 }); 344 } 345 346 /** 347 * Add item to the internal buffer for eventual writing to the 348 * output file. 349 * 350 * @param item Item to write (usually an OSM object). 351 * @throws Some form of osmium::io_error when there is a problem. 352 */ operator ()(const osmium::memory::Item & item)353 void operator()(const osmium::memory::Item& item) { 354 ensure_cleanup([&](){ 355 if (!m_buffer) { 356 m_buffer = osmium::memory::Buffer{m_buffer_size, 357 osmium::memory::Buffer::auto_grow::no}; 358 } 359 try { 360 m_buffer.push_back(item); 361 } catch (const osmium::buffer_is_full&) { 362 do_flush(); 363 m_buffer.push_back(item); 364 } 365 }); 366 } 367 368 /** 369 * Flushes internal buffer and closes output file. If you do not 370 * call this, the destructor of Writer will also do the same 371 * thing. But because this call might throw an exception, which 372 * the destructor will ignore, it is better to call close() 373 * explicitly. 374 * 375 * @returns Number of bytes written to the file (or 0 if it can 376 * not be determined). 377 * @throws Some form of osmium::io_error when there is a problem. 378 */ close()379 std::size_t close() { 380 do_close(); 381 382 if (m_write_future.valid()) { 383 return m_write_future.get(); 384 } 385 386 return 0; 387 } 388 389 }; // class Writer 390 391 } // namespace io 392 393 } // namespace osmium 394 395 #endif // OSMIUM_IO_WRITER_HPP 396