Libosmium  2.17.0
Fast and flexible C++ library for working with OpenStreetMap data
Loading...
Searching...
No Matches
queue.hpp
Go to the documentation of this file.
1#ifndef OSMIUM_THREAD_QUEUE_HPP
2#define OSMIUM_THREAD_QUEUE_HPP
3
4/*
5
6This file is part of Osmium (https://osmcode.org/libosmium).
7
8Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9
10Boost Software License - Version 1.0 - August 17th, 2003
11
12Permission is hereby granted, free of charge, to any person or organization
13obtaining a copy of the software and accompanying documentation covered by
14this license (the "Software") to use, reproduce, display, distribute,
15execute, and transmit the Software, and to prepare derivative works of the
16Software, and to permit third-parties to whom the Software is furnished to
17do so, all subject to the following:
18
19The copyright notices in the Software and this entire statement, including
20the above license grant, this restriction and the following disclaimer,
21must be included in all copies of the Software, in whole or in part, and
22all derivative works of the Software, unless such copies or derivative
23works are solely in the form of machine-executable object code generated by
24a source language processor.
25
26THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32DEALINGS IN THE SOFTWARE.
33
34*/
35
36#include <chrono>
37#include <condition_variable>
38#include <cstddef>
39#include <mutex>
40#include <queue>
41#include <string>
42#include <utility> // IWYU pragma: keep
43
44#ifdef OSMIUM_DEBUG_QUEUE_SIZE
45# include <atomic>
46# include <iostream>
47#endif
48
49namespace osmium {
50
51 namespace thread {
52
/**
 * Thread-safe FIFO queue, optionally bounded in size.
 *
 * All member functions lock an internal mutex, so a single Queue
 * object can be shared between producer and consumer threads.
 *
 * If the macro OSMIUM_DEBUG_QUEUE_SIZE is defined, the queue keeps
 * usage statistics and prints them to stderr on destruction.
 *
 * @tparam T Type of the elements stored in the queue. Elements are
 *           moved into and out of the queue.
 */
template <typename T>
class Queue {

    /// Maximum number of elements in the queue (0 means unlimited).
    const std::size_t m_max_size;

    /// Name of this queue (for debugging only).
    const std::string m_name;

    /// Protects m_queue (mutable so that const members can lock it).
    mutable std::mutex m_mutex;

    /// The underlying container, only accessed with m_mutex held.
    std::queue<T> m_queue;

    /// Used to signal consumers when data is available in the queue.
    std::condition_variable m_data_available;

    /// Used to signal producers when queue is not full.
    std::condition_variable m_space_available;

#ifdef OSMIUM_DEBUG_QUEUE_SIZE
    /// The largest size the queue had (updated with m_mutex held).
    std::size_t m_largest_size;

    /// The number of times push() was called.
    std::atomic<int> m_push_counter;

    /// The number of wait rounds push() went through because the
    /// queue was full.
    std::atomic<int> m_full_counter;

    /// The number of times wait_and_pop() or try_pop() was called.
    std::atomic<int> m_pop_counter;

    /// The number of times a pop call found the queue empty.
    std::atomic<int> m_empty_counter;
#endif

public:

    /**
     * Construct a queue.
     *
     * @param max_size Maximum number of elements in the queue. Use 0
     *                 for a queue without size limit.
     * @param name Name of the queue, only used in the debug output.
     */
    explicit Queue(std::size_t max_size = 0, std::string name = "") :
        m_max_size(max_size),
        m_name(std::move(name)),
        m_queue()
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
        ,
        m_largest_size(0),
        m_push_counter(0),
        m_full_counter(0),
        m_pop_counter(0),
        m_empty_counter(0)
#endif
    {
    }

    // The queue owns a mutex and condition variables, so it can
    // neither be copied nor moved.
    Queue(const Queue&) = delete;
    Queue& operator=(const Queue&) = delete;

    Queue(Queue&&) = delete;
    Queue& operator=(Queue&&) = delete;

#ifdef OSMIUM_DEBUG_QUEUE_SIZE
    /// Print the collected usage statistics to stderr.
    ~Queue() {
        std::cerr << "queue '" << m_name
                  << "' with max_size=" << m_max_size
                  << " had largest size " << m_largest_size
                  << " and was full " << m_full_counter
                  << " times in " << m_push_counter
                  << " push() calls and was empty " << m_empty_counter
                  << " times in " << m_pop_counter
                  << " pop() calls\n";
    }
#else
    ~Queue() = default;
#endif

    /**
     * Push an element onto the queue. If the queue has a max size,
     * this call blocks while the queue is full.
     *
     * The capacity check, the wait and the insertion all happen under
     * one lock. Otherwise a second producer could insert between the
     * check and the insertion and push the queue past its max size.
     *
     * @param value Element to append; it is moved into the queue.
     */
    void push(T value) {
        constexpr const std::chrono::milliseconds max_wait{10};
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
        ++m_push_counter;
#endif
        std::unique_lock<std::mutex> lock{m_mutex};
        if (m_max_size) {
            while (m_queue.size() >= m_max_size) {
                // Bounded wait: re-check the capacity at least every
                // max_wait even if no notification arrives.
                m_space_available.wait_for(lock, max_wait, [this] {
                    return m_queue.size() < m_max_size;
                });
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
                ++m_full_counter;
#endif
            }
        }
        m_queue.push(std::move(value));
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
        if (m_largest_size < m_queue.size()) {
            m_largest_size = m_queue.size();
        }
#endif
        // Release the mutex first so that the woken consumer does not
        // immediately block on it.
        lock.unlock();
        m_data_available.notify_one();
    }

    /**
     * Block until an element is available, then remove the oldest
     * element from the queue.
     *
     * @param value Receives the removed element (move-assigned).
     */
    void wait_and_pop(T& value) {
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
        ++m_pop_counter;
#endif
        std::unique_lock<std::mutex> lock{m_mutex};
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
        if (m_queue.empty()) {
            ++m_empty_counter;
        }
#endif
        // The predicate form only returns once the queue is non-empty
        // (with the lock held), so front() below is always valid.
        m_data_available.wait(lock, [this] {
            return !m_queue.empty();
        });
        value = std::move(m_queue.front());
        m_queue.pop();
        lock.unlock();
        if (m_max_size) {
            // Only bounded queues can have producers waiting for space.
            m_space_available.notify_one();
        }
    }

    /**
     * Remove the oldest element from the queue if there is one. This
     * call never blocks waiting for data.
     *
     * @param value Receives the removed element (move-assigned). Left
     *              untouched if the queue was empty.
     * @returns true if an element was removed, false if the queue was
     *          empty.
     */
    bool try_pop(T& value) {
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
        ++m_pop_counter;
#endif
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            if (m_queue.empty()) {
#ifdef OSMIUM_DEBUG_QUEUE_SIZE
                ++m_empty_counter;
#endif
                return false;
            }
            value = std::move(m_queue.front());
            m_queue.pop();
        }
        if (m_max_size) {
            // Only bounded queues can have producers waiting for space.
            m_space_available.notify_one();
        }
        return true;
    }

    /**
     * Is the queue empty at the moment of the call? Other threads may
     * change this as soon as the call returns.
     */
    bool empty() const {
        std::lock_guard<std::mutex> lock{m_mutex};
        return m_queue.empty();
    }

    /**
     * Number of elements in the queue at the moment of the call. Other
     * threads may change this as soon as the call returns.
     */
    std::size_t size() const {
        std::lock_guard<std::mutex> lock{m_mutex};
        return m_queue.size();
    }

}; // class Queue
228
229 } // namespace thread
230
231} // namespace osmium
232
233#endif // OSMIUM_THREAD_QUEUE_HPP
Definition: queue.hpp:57
bool try_pop(T &value)
Definition: queue.hpp:196
std::mutex m_mutex
Definition: queue.hpp:66
Queue & operator=(const Queue &)=delete
Queue & operator=(Queue &&)=delete
bool empty() const
Definition: queue.hpp:217
void push(T value)
Definition: queue.hpp:147
void wait_and_pop(T &value)
Definition: queue.hpp:173
Queue(const Queue &)=delete
std::condition_variable m_space_available
Used to signal producers when queue is not full.
Definition: queue.hpp:74
Queue(std::size_t max_size=0, std::string name="")
Definition: queue.hpp:107
std::size_t size() const
Definition: queue.hpp:222
std::queue< T > m_queue
Definition: queue.hpp:68
Queue(Queue &&)=delete
const std::size_t m_max_size
Definition: queue.hpp:61
std::condition_variable m_data_available
Used to signal consumers when data is available in the queue.
Definition: queue.hpp:71
const std::string m_name
Name of this queue (for debugging only).
Definition: queue.hpp:64
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:551