Libosmium  2.17.0
Fast and flexible C++ library for working with OpenStreetMap data
Loading...
Searching...
No Matches
writer.hpp
Go to the documentation of this file.
1#ifndef OSMIUM_IO_WRITER_HPP
2#define OSMIUM_IO_WRITER_HPP
3
4/*
5
6This file is part of Osmium (https://osmcode.org/libosmium).
7
8Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9
10Boost Software License - Version 1.0 - August 17th, 2003
11
12Permission is hereby granted, free of charge, to any person or organization
13obtaining a copy of the software and accompanying documentation covered by
14this license (the "Software") to use, reproduce, display, distribute,
15execute, and transmit the Software, and to prepare derivative works of the
16Software, and to permit third-parties to whom the Software is furnished to
17do so, all subject to the following:
18
19The copyright notices in the Software and this entire statement, including
20the above license grant, this restriction and the following disclaimer,
21must be included in all copies of the Software, in whole or in part, and
22all derivative works of the Software, unless such copies or derivative
23works are solely in the form of machine-executable object code generated by
24a source language processor.
25
26THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32DEALINGS IN THE SOFTWARE.
33
34*/
35
37#include <osmium/io/detail/output_format.hpp>
38#include <osmium/io/detail/queue_util.hpp>
39#include <osmium/io/detail/read_write.hpp>
40#include <osmium/io/detail/write_thread.hpp>
41#include <osmium/io/error.hpp>
42#include <osmium/io/file.hpp>
43#include <osmium/io/header.hpp>
49#include <osmium/version.hpp>
50
51#include <cassert>
52#include <cstddef>
53#include <exception>
54#include <functional>
55#include <future>
56#include <initializer_list>
57#include <memory>
58#include <string>
59#include <utility>
60
61namespace osmium {
62
63 namespace memory {
64 class Item;
65 } //namespace memory
66
67 namespace io {
68
69 namespace detail {
70
71 inline std::size_t get_output_queue_size() noexcept {
72 return osmium::config::get_max_queue_size("OUTPUT", 20);
73 }
74
75 } // namespace detail
76
100 class Writer {
101
102 enum {
103 default_buffer_size = 10UL * 1024UL * 1024UL
104 };
105
107
108 detail::future_string_queue_type m_output_queue{detail::get_output_queue_size(), "raw_output"};
109
110 std::unique_ptr<osmium::io::detail::OutputFormat> m_output{nullptr};
111
112 osmium::memory::Buffer m_buffer{};
113
115
116 std::future<std::size_t> m_write_future{};
117
119
120 // Checking the m_write_future is much more expensive then checking
121 // one atomic bool, so we set this bool in the write_thread when
122 // the writer should check the future...
123 std::atomic_bool m_notification{false};
124
125 enum class status {
126 okay = 0, // normal writing
127 error = 1, // some error occurred while writing
128 closed = 2 // close() called successfully
130
131 // This function will run in a separate thread.
132 static void write_thread(detail::future_string_queue_type& output_queue,
133 std::unique_ptr<osmium::io::Compressor>&& compressor,
134 std::promise<std::size_t>&& write_promise,
135 std::atomic_bool* notification) {
136 detail::WriteThread write_thread{output_queue,
137 std::move(compressor),
138 std::move(write_promise),
139 notification};
140 write_thread();
141 }
142
143 void do_write(osmium::memory::Buffer&& buffer) {
144 if (buffer && buffer.committed() > 0) {
145 m_output->write_buffer(std::move(buffer));
146 }
147 }
148
149 void do_flush() {
150 if (m_notification) {
152 }
153 if (m_buffer && m_buffer.committed() > 0) {
154 osmium::memory::Buffer buffer{m_buffer_size,
155 osmium::memory::Buffer::auto_grow::no};
156 using std::swap;
157 swap(m_buffer, buffer);
158
159 m_output->write_buffer(std::move(buffer));
160 }
161 }
162
163 template <typename TFunction, typename... TArgs>
164 void ensure_cleanup(TFunction func, TArgs&&... args) {
165 if (m_status != status::okay) {
166 throw io_error("Can not write to writer when in status 'closed' or 'error'");
167 }
168
169 try {
170 func(std::forward<TArgs>(args)...);
171 } catch (...) {
173 detail::add_to_queue(m_output_queue, std::current_exception());
174 detail::add_end_of_data_to_queue(m_output_queue);
175 throw;
176 }
177 }
178
184 };
185
186 static void set_option(options_type& options, osmium::thread::Pool& pool) {
187 options.pool = &pool;
188 }
189
190 static void set_option(options_type& options, const osmium::io::Header& header) {
191 options.header = header;
192 }
193
194 static void set_option(options_type& options, overwrite value) {
195 options.allow_overwrite = value;
196 }
197
198 static void set_option(options_type& options, fsync value) {
199 options.sync = value;
200 }
201
202 void do_close() {
203 if (m_status == status::okay) {
204 ensure_cleanup([&](){
205 do_write(std::move(m_buffer));
206 m_output->write_end();
208 detail::add_end_of_data_to_queue(m_output_queue);
209 });
210 }
211 }
212
213 public:
214
245 template <typename... TArgs>
246 explicit Writer(const osmium::io::File& file, TArgs&&... args) :
247 m_file(file.check()) {
248 assert(!m_file.buffer()); // XXX can't handle pseudo-files
249
250 options_type options;
251 (void)std::initializer_list<int>{
252 (set_option(options, args), 0)...
253 };
254
255 if (!options.pool) {
257 }
258
259 m_output = osmium::io::detail::OutputFormatFactory::instance().create_output(*options.pool, m_file, m_output_queue);
260
261 if (options.header.get("generator").empty()) {
262 options.header.set("generator", "libosmium/" LIBOSMIUM_VERSION_STRING);
263 }
264
265 std::unique_ptr<osmium::io::Compressor> compressor =
267 osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
268 options.sync);
269
270 std::promise<std::size_t> write_promise;
271 m_write_future = write_promise.get_future();
272 m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise), &m_notification};
273
274 ensure_cleanup([&](){
275 m_output->write_header(options.header);
276 });
277 }
278
279 template <typename... TArgs>
280 explicit Writer(const std::string& filename, TArgs&&... args) :
281 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
282 }
283
284 template <typename... TArgs>
285 explicit Writer(const char* filename, TArgs&&... args) :
286 Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
287 }
288
289 Writer(const Writer&) = delete;
290 Writer& operator=(const Writer&) = delete;
291
292 Writer(Writer&&) = delete;
293 Writer& operator=(Writer&&) = delete;
294
295 ~Writer() noexcept {
296 try {
297 do_close();
298 } catch (...) {
299 // Ignore any exceptions because destructor must not throw.
300 }
301 }
302
306 size_t buffer_size() const noexcept {
307 return m_buffer_size;
308 }
309
314 void set_buffer_size(size_t size) noexcept {
315 m_buffer_size = size;
316 }
317
325 void flush() {
326 ensure_cleanup([&](){
327 do_flush();
328 });
329 }
330
339 void operator()(osmium::memory::Buffer&& buffer) {
340 ensure_cleanup([&](){
341 do_flush();
342 do_write(std::move(buffer));
343 });
344 }
345
354 ensure_cleanup([&](){
355 if (!m_buffer) {
356 m_buffer = osmium::memory::Buffer{m_buffer_size,
357 osmium::memory::Buffer::auto_grow::no};
358 }
359 try {
360 m_buffer.push_back(item);
361 } catch (const osmium::buffer_is_full&) {
362 do_flush();
363 m_buffer.push_back(item);
364 }
365 });
366 }
367
379 std::size_t close() {
380 do_close();
381
382 if (m_write_future.valid()) {
383 return m_write_future.get();
384 }
385
386 return 0;
387 }
388
389 }; // class Writer
390
391 } // namespace io
392
393} // namespace osmium
394
395#endif // OSMIUM_IO_WRITER_HPP
std::unique_ptr< osmium::io::Compressor > create_compressor(const osmium::io::file_compression compression, TArgs &&... args) const
Definition: compression.hpp:204
static CompressionFactory & instance()
Definition: compression.hpp:184
Definition: file.hpp:72
File & filename(const std::string &filename)
Definition: file.hpp:312
file_compression compression() const noexcept
Definition: file.hpp:294
const char * buffer() const noexcept
Definition: file.hpp:143
Definition: header.hpp:68
Definition: writer.hpp:100
std::size_t close()
Definition: writer.hpp:379
@ default_buffer_size
Definition: writer.hpp:103
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:198
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:190
static void set_option(options_type &options, osmium::thread::Pool &pool)
Definition: writer.hpp:186
size_t m_buffer_size
Definition: writer.hpp:114
Writer(Writer &&)=delete
void do_flush()
Definition: writer.hpp:149
Writer(const osmium::io::File &file, TArgs &&... args)
Definition: writer.hpp:246
Writer(const std::string &filename, TArgs &&... args)
Definition: writer.hpp:280
size_t buffer_size() const noexcept
Definition: writer.hpp:306
status
Definition: writer.hpp:125
osmium::thread::thread_handler m_thread
Definition: writer.hpp:118
void flush()
Definition: writer.hpp:325
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:339
osmium::memory::Buffer m_buffer
Definition: writer.hpp:112
std::atomic_bool m_notification
Definition: writer.hpp:123
~Writer() noexcept
Definition: writer.hpp:295
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:110
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:194
std::future< std::size_t > m_write_future
Definition: writer.hpp:116
void do_close()
Definition: writer.hpp:202
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:143
Writer & operator=(Writer &&)=delete
Writer & operator=(const Writer &)=delete
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:108
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:314
Writer(const Writer &)=delete
osmium::io::File m_file
Definition: writer.hpp:106
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:353
void ensure_cleanup(TFunction func, TArgs &&... args)
Definition: writer.hpp:164
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< std::size_t > &&write_promise, std::atomic_bool *notification)
Definition: writer.hpp:132
Writer(const char *filename, TArgs &&... args)
Definition: writer.hpp:285
enum osmium::io::Writer::status m_status
Definition: item.hpp:105
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:85
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
fsync
Definition: writer_options.hpp:51
overwrite
Definition: writer_options.hpp:43
void check_for_exception(std::future< T > &future)
Definition: util.hpp:55
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:551
Definition: writer.hpp:179
overwrite allow_overwrite
Definition: writer.hpp:181
osmium::thread::Pool * pool
Definition: writer.hpp:183
fsync sync
Definition: writer.hpp:182
osmium::io::Header header
Definition: writer.hpp:180
Definition: error.hpp:44
#define LIBOSMIUM_VERSION_STRING
Definition: version.hpp:40