]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | /* |
2 | * Copyright Lingxi Li 2015. | |
3 | * Copyright Andrey Semashev 2016. | |
4 | * Distributed under the Boost Software License, Version 1.0. | |
5 | * (See accompanying file LICENSE_1_0.txt or copy at | |
6 | * http://www.boost.org/LICENSE_1_0.txt) | |
7 | */ | |
8 | /*! | |
9 | * \file posix/ipc_reliable_message_queue.cpp | |
10 | * \author Lingxi Li | |
11 | * \author Andrey Semashev | |
12 | * \date 17.11.2015 | |
13 | * | |
14 | * \brief This header is the Boost.Log library implementation, see the library documentation | |
15 | * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html. | |
16 | * | |
17 | * This file provides an interprocess message queue implementation on POSIX platforms. | |
18 | */ | |
19 | ||
20 | #include <boost/log/detail/config.hpp> | |
21 | #include <cstddef> | |
22 | #include <cerrno> | |
23 | #include <cstring> | |
1e59de90 | 24 | #include <ctime> |
7c673cae FG |
25 | #include <new> |
26 | #include <string> | |
27 | #include <stdexcept> | |
28 | #include <algorithm> | |
29 | #include <unistd.h> | |
30 | #if defined(BOOST_HAS_SCHED_YIELD) | |
31 | #include <sched.h> | |
32 | #elif defined(BOOST_HAS_PTHREAD_YIELD) | |
33 | #include <pthread.h> | |
34 | #elif defined(BOOST_HAS_NANOSLEEP) | |
35 | #include <time.h> | |
36 | #endif | |
37 | #include <boost/assert.hpp> | |
38 | #include <boost/static_assert.hpp> | |
39 | #include <boost/cstdint.hpp> | |
1e59de90 | 40 | #include <boost/memory_order.hpp> |
7c673cae | 41 | #include <boost/atomic/atomic.hpp> |
1e59de90 | 42 | #include <boost/atomic/ipc_atomic.hpp> |
7c673cae FG |
43 | #include <boost/atomic/capabilities.hpp> |
44 | #include <boost/throw_exception.hpp> | |
45 | #include <boost/log/exceptions.hpp> | |
46 | #include <boost/log/utility/ipc/reliable_message_queue.hpp> | |
47 | #include <boost/log/support/exception.hpp> | |
48 | #include <boost/log/detail/pause.hpp> | |
49 | #include <boost/exception/info.hpp> | |
50 | #include <boost/exception/enable_error_info.hpp> | |
51 | #include <boost/interprocess/creation_tags.hpp> | |
52 | #include <boost/interprocess/exceptions.hpp> | |
53 | #include <boost/interprocess/permissions.hpp> | |
54 | #include <boost/interprocess/mapped_region.hpp> | |
55 | #include <boost/interprocess/shared_memory_object.hpp> | |
56 | #include <boost/align/align_up.hpp> | |
57 | #include "ipc_sync_wrappers.hpp" | |
58 | #include "murmur3.hpp" | |
59 | #include "bit_tools.hpp" | |
60 | #include <boost/log/detail/header.hpp> | |
61 | ||
7c673cae FG |
62 | namespace boost { |
63 | ||
64 | BOOST_LOG_OPEN_NAMESPACE | |
65 | ||
66 | namespace ipc { | |
67 | ||
68 | //! Message queue implementation data | |
69 | struct reliable_message_queue::implementation | |
70 | { | |
71 | private: | |
72 | //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment. | |
73 | struct block_header | |
74 | { | |
75 | // Element data alignment, in bytes | |
76 | enum { data_alignment = 32u }; | |
77 | ||
78 | //! Size of the element data, in bytes | |
79 | size_type m_size; | |
80 | ||
81 | //! Returns the block header overhead, in bytes | |
82 | static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT | |
83 | { | |
84 | return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment)); | |
85 | } | |
86 | ||
87 | //! Returns a pointer to the element data | |
88 | void* get_data() const BOOST_NOEXCEPT | |
89 | { | |
90 | return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead(); | |
91 | } | |
92 | }; | |
93 | ||
94 | //! Header of the message queue. Placed at the beginning of the shared memory segment. | |
95 | struct header | |
96 | { | |
97 | // Increment this constant whenever you change the binary layout of the queue (apart from this header structure) | |
98 | enum { abi_version = 0 }; | |
99 | ||
100 | // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!! | |
101 | ||
102 | //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment. | |
103 | uint32_t m_abi_tag; | |
104 | //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability. | |
105 | unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)]; | |
106 | //! Reference counter. Also acts as a flag indicating that the queue is constructed (i.e. the queue is constructed when the counter is not 0). | |
1e59de90 | 107 | boost::ipc_atomic< uint32_t > m_ref_count; |
7c673cae FG |
108 | //! Number of allocation blocks in the queue. |
109 | const uint32_t m_capacity; | |
110 | //! Size of an allocation block, in bytes. | |
111 | const size_type m_block_size; | |
112 | //! Mutex for protecting queue data structures. | |
113 | boost::log::ipc::aux::interprocess_mutex m_mutex; | |
114 | //! Condition variable used to block readers when the queue is empty. | |
115 | boost::log::ipc::aux::interprocess_condition_variable m_nonempty_queue; | |
116 | //! Condition variable used to block writers when the queue is full. | |
117 | boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue; | |
118 | //! The current number of allocated blocks in the queue. | |
119 | uint32_t m_size; | |
120 | //! The current writing position (allocation block index). | |
121 | uint32_t m_put_pos; | |
122 | //! The current reading position (allocation block index). | |
123 | uint32_t m_get_pos; | |
124 | ||
125 | header(uint32_t capacity, size_type block_size) : | |
126 | m_abi_tag(get_abi_tag()), | |
127 | m_capacity(capacity), | |
128 | m_block_size(block_size), | |
129 | m_size(0u), | |
130 | m_put_pos(0u), | |
131 | m_get_pos(0u) | |
132 | { | |
133 | // Must be initialized last. m_ref_count is zero-initialized initially. | |
1e59de90 | 134 | m_ref_count.opaque_add(1u, boost::memory_order_release); |
7c673cae FG |
135 | } |
136 | ||
137 | //! Returns the header structure ABI tag | |
138 | static uint32_t get_abi_tag() BOOST_NOEXCEPT | |
139 | { | |
140 | // This FOURCC identifies the queue type | |
141 | boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q')); | |
142 | ||
143 | // This FOURCC identifies the queue implementation | |
144 | hash.mix(boost::log::aux::make_fourcc('p', 't', 'h', 'r')); | |
145 | hash.mix(abi_version); | |
146 | ||
147 | // We will use these constants to align pointers | |
148 | hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE); | |
149 | hash.mix(block_header::data_alignment); | |
150 | ||
151 | // The members in the sequence below must be enumerated in the same order as they are declared in the header structure. | |
152 | // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header. | |
153 | ||
154 | #define BOOST_LOG_MIX_HEADER_MEMBER(name)\ | |
155 | hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\ | |
156 | hash.mix(static_cast< uint32_t >(offsetof(header, name))) | |
157 | ||
158 | BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag); | |
159 | BOOST_LOG_MIX_HEADER_MEMBER(m_padding); | |
160 | BOOST_LOG_MIX_HEADER_MEMBER(m_ref_count); | |
161 | BOOST_LOG_MIX_HEADER_MEMBER(m_capacity); | |
162 | BOOST_LOG_MIX_HEADER_MEMBER(m_block_size); | |
163 | BOOST_LOG_MIX_HEADER_MEMBER(m_mutex); | |
164 | BOOST_LOG_MIX_HEADER_MEMBER(m_nonempty_queue); | |
165 | BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue); | |
166 | BOOST_LOG_MIX_HEADER_MEMBER(m_size); | |
167 | BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos); | |
168 | BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos); | |
169 | ||
170 | #undef BOOST_LOG_MIX_HEADER_MEMBER | |
171 | ||
172 | return hash.finalize(); | |
173 | } | |
174 | ||
175 | //! Returns an element header at the specified index | |
176 | block_header* get_block(uint32_t index) const BOOST_NOEXCEPT | |
177 | { | |
178 | BOOST_ASSERT(index < m_capacity); | |
179 | unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE); | |
180 | p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index); | |
181 | return reinterpret_cast< block_header* >(p); | |
182 | } | |
183 | ||
184 | BOOST_DELETED_FUNCTION(header(header const&)) | |
185 | BOOST_DELETED_FUNCTION(header& operator=(header const&)) | |
186 | }; | |
187 | ||
188 | private: | |
189 | //! Shared memory object | |
190 | boost::interprocess::shared_memory_object m_shared_memory; | |
191 | //! Shared memory mapping into the process address space | |
192 | boost::interprocess::mapped_region m_region; | |
193 | //! Queue overflow handling policy | |
194 | const overflow_policy m_overflow_policy; | |
195 | //! The mask for selecting bits that constitute size values from 0 to (block_size - 1) | |
196 | size_type m_block_size_mask; | |
197 | //! The number of the bit set in block_size (i.e. log base 2 of block_size) | |
198 | uint32_t m_block_size_log2; | |
199 | //! The flag indicates that stop has been requested | |
1e59de90 | 200 | boost::atomic< bool > m_stop; |
7c673cae FG |
201 | |
202 | //! Queue shared memory object name | |
203 | const object_name m_name; | |
204 | ||
1e59de90 TL |
205 | //! The total number of loop iterations in \c adopt_region for waiting for the region initialization to complete |
206 | static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops = 200u; | |
207 | //! Threshold of the number of loop iterations in \c adopt_region for using pause instructions for yielding | |
208 | static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_pause = 16u; | |
209 | //! Threshold of the number of loop iterations in \c adopt_region for using \c short_yield for yielding | |
210 | static BOOST_CONSTEXPR_OR_CONST unsigned int region_init_wait_loops_short_yield = 64u; | |
211 | //! Timeout, in seconds, for performing shared memory creation/opening loop | |
212 | static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_timeout = 60u; | |
213 | //! The number of short yields to perform during the shared memory creation/opening loop | |
214 | static BOOST_CONSTEXPR_OR_CONST unsigned int region_open_or_create_short_yield_loops = 64u; | |
215 | ||
7c673cae FG |
216 | public: |
217 | //! The constructor creates a new shared memory segment | |
218 | implementation | |
219 | ( | |
220 | open_mode::create_only_tag, | |
221 | object_name const& name, | |
222 | uint32_t capacity, | |
223 | size_type block_size, | |
224 | overflow_policy oflow_policy, | |
225 | permissions const& perms | |
226 | ) : | |
1e59de90 | 227 | m_shared_memory(), |
7c673cae FG |
228 | m_region(), |
229 | m_overflow_policy(oflow_policy), | |
230 | m_block_size_mask(0u), | |
231 | m_block_size_log2(0u), | |
232 | m_stop(false), | |
233 | m_name(name) | |
234 | { | |
1e59de90 TL |
235 | BOOST_ASSERT(block_size >= block_header::get_header_overhead()); |
236 | ||
237 | boost::interprocess::permissions ipc_perms(perms.get_native()); | |
238 | while (true) | |
239 | { | |
240 | try | |
241 | { | |
242 | boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms); | |
243 | m_shared_memory.swap(shared_memory); | |
244 | break; | |
245 | } | |
246 | catch (boost::interprocess::interprocess_exception& e) | |
247 | { | |
248 | // shared_memory_object does not handle EINTR returned from shm_open internally. | |
249 | // https://github.com/boostorg/interprocess/issues/152 | |
250 | if (e.get_native_error() != EINTR) | |
251 | throw; | |
252 | } | |
253 | } | |
254 | ||
7c673cae FG |
255 | create_region(capacity, block_size); |
256 | } | |
257 | ||
258 | //! The constructor creates a new shared memory segment or opens the existing one | |
259 | implementation | |
260 | ( | |
261 | open_mode::open_or_create_tag, | |
262 | object_name const& name, | |
263 | uint32_t capacity, | |
264 | size_type block_size, | |
265 | overflow_policy oflow_policy, | |
266 | permissions const& perms | |
267 | ) : | |
1e59de90 | 268 | m_shared_memory(), |
7c673cae FG |
269 | m_region(), |
270 | m_overflow_policy(oflow_policy), | |
271 | m_block_size_mask(0u), | |
272 | m_block_size_log2(0u), | |
273 | m_stop(false), | |
274 | m_name(name) | |
275 | { | |
1e59de90 TL |
276 | BOOST_ASSERT(block_size >= block_header::get_header_overhead()); |
277 | ||
278 | // We need to know for certain whether we create the shared memory segment or open an existing one. | |
279 | // This is to ensure that only one thread initializes the segment and all other threads wait until completion. | |
280 | // Since shared_memory_object(open_or_create) constructor does not report whether the segment was actually created, | |
281 | // we have to loop trying to create or open the segment. https://github.com/boostorg/interprocess/issues/151 | |
282 | boost::interprocess::permissions ipc_perms(perms.get_native()); | |
283 | bool created = false; | |
284 | unsigned int i = 0u; | |
285 | std::time_t start_time = std::time(NULL); | |
286 | while (true) | |
287 | { | |
288 | while (true) | |
289 | { | |
290 | try | |
291 | { | |
292 | boost::interprocess::shared_memory_object shared_memory(boost::interprocess::create_only, name.c_str(), boost::interprocess::read_write, ipc_perms); | |
293 | m_shared_memory.swap(shared_memory); | |
294 | created = true; | |
295 | goto done; | |
296 | } | |
297 | catch (boost::interprocess::interprocess_exception& e) | |
298 | { | |
299 | if (e.get_error_code() == boost::interprocess::already_exists_error) | |
300 | break; | |
301 | ||
302 | // shared_memory_object does not handle EINTR returned from shm_open internally. | |
303 | // https://github.com/boostorg/interprocess/issues/152 | |
304 | if (e.get_native_error() != EINTR) | |
305 | throw; | |
306 | } | |
307 | } | |
308 | ||
309 | while (true) | |
310 | { | |
311 | try | |
312 | { | |
313 | boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write); | |
314 | m_shared_memory.swap(shared_memory); | |
315 | created = false; | |
316 | goto done; | |
317 | } | |
318 | catch (boost::interprocess::interprocess_exception& e) | |
319 | { | |
320 | if (e.get_error_code() == boost::interprocess::not_found_error) | |
321 | break; | |
322 | ||
323 | if (e.get_native_error() != EINTR) | |
324 | throw; | |
325 | } | |
326 | } | |
327 | ||
328 | std::time_t now = std::time(NULL); | |
329 | if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout)) | |
330 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be created or opened: shared memory segment failed to be created or opened until timeout (possible livelock)"); | |
331 | ||
332 | if (i < region_open_or_create_short_yield_loops) | |
333 | short_yield(); | |
334 | else | |
335 | long_yield(); | |
336 | ||
337 | ++i; | |
338 | } | |
339 | ||
340 | done: | |
341 | if (created) | |
7c673cae FG |
342 | create_region(capacity, block_size); |
343 | else | |
1e59de90 | 344 | adopt_region(); |
7c673cae FG |
345 | } |
346 | ||
347 | //! The constructor opens the existing shared memory segment | |
348 | implementation | |
349 | ( | |
350 | open_mode::open_only_tag, | |
351 | object_name const& name, | |
352 | overflow_policy oflow_policy | |
353 | ) : | |
1e59de90 | 354 | m_shared_memory(), |
7c673cae FG |
355 | m_region(), |
356 | m_overflow_policy(oflow_policy), | |
357 | m_block_size_mask(0u), | |
358 | m_block_size_log2(0u), | |
359 | m_stop(false), | |
360 | m_name(name) | |
361 | { | |
1e59de90 TL |
362 | while (true) |
363 | { | |
364 | try | |
365 | { | |
366 | boost::interprocess::shared_memory_object shared_memory(boost::interprocess::open_only, name.c_str(), boost::interprocess::read_write); | |
367 | m_shared_memory.swap(shared_memory); | |
368 | break; | |
369 | } | |
370 | catch (boost::interprocess::interprocess_exception& e) | |
371 | { | |
372 | // shared_memory_object does not handle EINTR returned from shm_open internally. | |
373 | // https://github.com/boostorg/interprocess/issues/152 | |
374 | if (e.get_native_error() != EINTR) | |
375 | throw; | |
376 | } | |
377 | } | |
7c673cae | 378 | |
1e59de90 | 379 | adopt_region(); |
7c673cae FG |
380 | } |
381 | ||
382 | ~implementation() | |
383 | { | |
384 | close_region(); | |
385 | } | |
386 | ||
387 | object_name const& name() const BOOST_NOEXCEPT | |
388 | { | |
389 | return m_name; | |
390 | } | |
391 | ||
392 | uint32_t capacity() const BOOST_NOEXCEPT | |
393 | { | |
394 | return get_header()->m_capacity; | |
395 | } | |
396 | ||
397 | size_type block_size() const BOOST_NOEXCEPT | |
398 | { | |
399 | return get_header()->m_block_size; | |
400 | } | |
401 | ||
402 | operation_result send(void const* message_data, size_type message_size) | |
403 | { | |
404 | const uint32_t block_count = estimate_block_count(message_size); | |
405 | ||
406 | header* const hdr = get_header(); | |
407 | ||
408 | if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) | |
409 | BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); | |
410 | ||
1e59de90 | 411 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
412 | return aborted; |
413 | ||
414 | lock_queue(); | |
415 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex); | |
416 | ||
417 | while (true) | |
418 | { | |
1e59de90 | 419 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
420 | return aborted; |
421 | ||
422 | if ((hdr->m_capacity - hdr->m_size) >= block_count) | |
423 | break; | |
424 | ||
425 | const overflow_policy oflow_policy = m_overflow_policy; | |
426 | if (oflow_policy == fail_on_overflow) | |
427 | return no_space; | |
428 | else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow)) | |
429 | BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full"); | |
430 | ||
431 | hdr->m_nonfull_queue.wait(hdr->m_mutex); | |
432 | } | |
433 | ||
434 | enqueue_message(message_data, message_size, block_count); | |
435 | ||
436 | return succeeded; | |
437 | } | |
438 | ||
439 | bool try_send(void const* message_data, size_type message_size) | |
440 | { | |
441 | const uint32_t block_count = estimate_block_count(message_size); | |
442 | ||
443 | header* const hdr = get_header(); | |
444 | ||
445 | if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) | |
446 | BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); | |
447 | ||
1e59de90 | 448 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
449 | return false; |
450 | ||
451 | lock_queue(); | |
452 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex); | |
453 | ||
1e59de90 | 454 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
455 | return false; |
456 | ||
457 | if ((hdr->m_capacity - hdr->m_size) < block_count) | |
458 | return false; | |
459 | ||
460 | enqueue_message(message_data, message_size, block_count); | |
461 | ||
462 | return true; | |
463 | } | |
464 | ||
465 | operation_result receive(receive_handler handler, void* state) | |
466 | { | |
1e59de90 | 467 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
468 | return aborted; |
469 | ||
470 | lock_queue(); | |
471 | header* const hdr = get_header(); | |
472 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex); | |
473 | ||
474 | while (true) | |
475 | { | |
1e59de90 | 476 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
477 | return aborted; |
478 | ||
479 | if (hdr->m_size > 0u) | |
480 | break; | |
481 | ||
482 | hdr->m_nonempty_queue.wait(hdr->m_mutex); | |
483 | } | |
484 | ||
485 | dequeue_message(handler, state); | |
486 | ||
487 | return succeeded; | |
488 | } | |
489 | ||
490 | bool try_receive(receive_handler handler, void* state) | |
491 | { | |
1e59de90 | 492 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
493 | return false; |
494 | ||
495 | lock_queue(); | |
496 | header* const hdr = get_header(); | |
497 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex); | |
498 | ||
499 | if (hdr->m_size == 0u) | |
500 | return false; | |
501 | ||
502 | dequeue_message(handler, state); | |
503 | ||
504 | return true; | |
505 | } | |
506 | ||
507 | void stop_local() | |
508 | { | |
1e59de90 | 509 | if (m_stop.load(boost::memory_order_relaxed)) |
7c673cae FG |
510 | return; |
511 | ||
512 | lock_queue(); | |
513 | header* const hdr = get_header(); | |
514 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex); | |
515 | ||
1e59de90 | 516 | m_stop.store(true, boost::memory_order_relaxed); |
7c673cae FG |
517 | |
518 | hdr->m_nonempty_queue.notify_all(); | |
519 | hdr->m_nonfull_queue.notify_all(); | |
520 | } | |
521 | ||
522 | void reset_local() | |
523 | { | |
1e59de90 | 524 | m_stop.store(false, boost::memory_order_relaxed); |
7c673cae FG |
525 | } |
526 | ||
527 | void clear() | |
528 | { | |
529 | lock_queue(); | |
530 | header* const hdr = get_header(); | |
531 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(hdr->m_mutex); | |
532 | clear_queue(); | |
533 | } | |
534 | ||
535 | private: | |
536 | header* get_header() const BOOST_NOEXCEPT | |
537 | { | |
538 | return static_cast< header* >(m_region.get_address()); | |
539 | } | |
540 | ||
541 | static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT | |
542 | { | |
543 | return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size); | |
544 | } | |
545 | ||
546 | void create_region(uint32_t capacity, size_type block_size) | |
547 | { | |
548 | const std::size_t shmem_size = estimate_region_size(capacity, block_size); | |
549 | m_shared_memory.truncate(shmem_size); | |
550 | boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0u, shmem_size).swap(m_region); | |
551 | ||
552 | new (m_region.get_address()) header(capacity, block_size); | |
553 | ||
554 | init_block_size(block_size); | |
555 | } | |
556 | ||
1e59de90 | 557 | void adopt_region() |
7c673cae | 558 | { |
1e59de90 TL |
559 | std::size_t shmem_size = 0u; |
560 | unsigned int i = 0u; | |
561 | std::time_t start_time = std::time(NULL); | |
562 | while (true) | |
563 | { | |
564 | boost::interprocess::offset_t shm_size = 0; | |
565 | const bool get_size_result = m_shared_memory.get_size(shm_size); | |
566 | if (BOOST_LIKELY(get_size_result && shm_size > 0)) | |
567 | { | |
568 | shmem_size = static_cast< std::size_t >(shm_size); | |
569 | break; | |
570 | } | |
571 | ||
572 | std::time_t now = std::time(NULL); | |
573 | if (BOOST_UNLIKELY((now - start_time) >= region_open_or_create_timeout)) | |
574 | { | |
575 | if (get_size_result) | |
576 | goto shmem_size_too_small; | |
577 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size could not be determined until timeout"); | |
578 | } | |
579 | ||
580 | if (i < region_open_or_create_short_yield_loops) | |
581 | short_yield(); | |
582 | else | |
583 | long_yield(); | |
584 | ||
585 | ++i; | |
586 | } | |
587 | ||
588 | if (BOOST_UNLIKELY(shmem_size < sizeof(header))) | |
589 | { | |
590 | shmem_size_too_small: | |
7c673cae | 591 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small"); |
1e59de90 | 592 | } |
7c673cae | 593 | |
1e59de90 | 594 | boost::interprocess::mapped_region(m_shared_memory, boost::interprocess::read_write, 0, shmem_size).swap(m_region); |
7c673cae FG |
595 | |
596 | // Wait until the mapped region becomes initialized | |
597 | header* const hdr = get_header(); | |
1e59de90 | 598 | for (i = 0u; i < region_init_wait_loops; ++i) |
7c673cae FG |
599 | { |
600 | uint32_t ref_count = hdr->m_ref_count.load(boost::memory_order_acquire); | |
601 | while (ref_count > 0u) | |
602 | { | |
603 | if (hdr->m_ref_count.compare_exchange_weak(ref_count, ref_count + 1u, boost::memory_order_acq_rel, boost::memory_order_acquire)) | |
604 | goto done; | |
605 | } | |
606 | ||
1e59de90 TL |
607 | if (i < region_init_wait_loops_pause) |
608 | boost::log::aux::pause(); | |
609 | else if (i < region_init_wait_loops_short_yield) | |
610 | short_yield(); | |
7c673cae | 611 | else |
1e59de90 | 612 | long_yield(); |
7c673cae FG |
613 | } |
614 | ||
615 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long"); | |
616 | ||
617 | done: | |
618 | try | |
619 | { | |
620 | // Check that the queue layout matches the current process ABI | |
621 | if (hdr->m_abi_tag != header::get_abi_tag()) | |
622 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible"); | |
623 | ||
624 | if (!boost::log::aux::is_power_of_2(hdr->m_block_size)) | |
625 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2"); | |
626 | ||
627 | init_block_size(hdr->m_block_size); | |
628 | } | |
629 | catch (...) | |
630 | { | |
631 | close_region(); | |
632 | throw; | |
633 | } | |
634 | } | |
635 | ||
636 | void close_region() BOOST_NOEXCEPT | |
637 | { | |
638 | header* const hdr = get_header(); | |
639 | ||
640 | if (hdr->m_ref_count.fetch_sub(1u, boost::memory_order_acq_rel) == 1u) | |
641 | { | |
642 | boost::interprocess::shared_memory_object::remove(m_shared_memory.get_name()); | |
643 | ||
644 | hdr->~header(); | |
645 | ||
646 | boost::interprocess::mapped_region().swap(m_region); | |
647 | boost::interprocess::shared_memory_object().swap(m_shared_memory); | |
648 | ||
649 | m_block_size_mask = 0u; | |
650 | m_block_size_log2 = 0u; | |
651 | } | |
652 | } | |
653 | ||
654 | void init_block_size(size_type block_size) | |
655 | { | |
656 | m_block_size_mask = block_size - 1u; | |
657 | ||
658 | uint32_t block_size_log2 = 0u; | |
659 | if ((block_size & 0x0000ffff) == 0u) | |
660 | { | |
661 | block_size >>= 16u; | |
662 | block_size_log2 += 16u; | |
663 | } | |
664 | if ((block_size & 0x000000ff) == 0u) | |
665 | { | |
666 | block_size >>= 8u; | |
667 | block_size_log2 += 8u; | |
668 | } | |
669 | if ((block_size & 0x0000000f) == 0u) | |
670 | { | |
671 | block_size >>= 4u; | |
672 | block_size_log2 += 4u; | |
673 | } | |
674 | if ((block_size & 0x00000003) == 0u) | |
675 | { | |
676 | block_size >>= 2u; | |
677 | block_size_log2 += 2u; | |
678 | } | |
679 | if ((block_size & 0x00000001) == 0u) | |
680 | { | |
681 | ++block_size_log2; | |
682 | } | |
683 | m_block_size_log2 = block_size_log2; | |
684 | } | |
685 | ||
686 | void lock_queue() | |
687 | { | |
688 | header* const hdr = get_header(); | |
689 | ||
690 | #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST) | |
691 | try | |
692 | { | |
693 | #endif | |
694 | hdr->m_mutex.lock(); | |
695 | #if defined(BOOST_LOG_HAS_PTHREAD_MUTEX_ROBUST) | |
696 | } | |
697 | catch (boost::log::ipc::aux::lock_owner_dead&) | |
698 | { | |
699 | // The mutex is locked by the current thread, but the previous owner terminated without releasing the lock | |
700 | try | |
701 | { | |
702 | clear_queue(); | |
703 | hdr->m_mutex.recover(); | |
704 | } | |
705 | catch (...) | |
706 | { | |
707 | hdr->m_mutex.unlock(); | |
708 | throw; | |
709 | } | |
710 | } | |
711 | #endif | |
712 | } | |
713 | ||
714 | void clear_queue() | |
715 | { | |
716 | header* const hdr = get_header(); | |
717 | hdr->m_size = 0u; | |
718 | hdr->m_put_pos = 0u; | |
719 | hdr->m_get_pos = 0u; | |
720 | hdr->m_nonfull_queue.notify_all(); | |
721 | } | |
722 | ||
723 | //! Returns the number of allocation blocks that are required to store user's payload of the specified size | |
724 | uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT | |
725 | { | |
726 | // ceil((size + get_header_overhead()) / block_size) | |
727 | return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2); | |
728 | } | |
729 | ||
730 | //! Puts the message to the back of the queue | |
731 | void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count) | |
732 | { | |
733 | header* const hdr = get_header(); | |
734 | ||
735 | const uint32_t capacity = hdr->m_capacity; | |
736 | const size_type block_size = hdr->m_block_size; | |
737 | uint32_t pos = hdr->m_put_pos; | |
1e59de90 | 738 | BOOST_ASSERT(pos < capacity); |
7c673cae FG |
739 | |
740 | block_header* block = hdr->get_block(pos); | |
741 | block->m_size = message_size; | |
742 | ||
743 | size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); | |
744 | std::memcpy(block->get_data(), message_data, write_size); | |
745 | ||
746 | pos += block_count; | |
747 | if (BOOST_UNLIKELY(pos >= capacity)) | |
748 | { | |
749 | // Write the rest of the message at the beginning of the queue | |
750 | pos -= capacity; | |
751 | message_data = static_cast< const unsigned char* >(message_data) + write_size; | |
752 | write_size = message_size - write_size; | |
753 | if (write_size > 0u) | |
754 | std::memcpy(hdr->get_block(0u), message_data, write_size); | |
755 | } | |
756 | ||
757 | hdr->m_put_pos = pos; | |
758 | ||
759 | const uint32_t old_queue_size = hdr->m_size; | |
760 | hdr->m_size = old_queue_size + block_count; | |
761 | if (old_queue_size == 0u) | |
762 | hdr->m_nonempty_queue.notify_one(); | |
763 | } | |
764 | ||
765 | //! Retrieves the next message and invokes the handler to store the message contents | |
766 | void dequeue_message(receive_handler handler, void* state) | |
767 | { | |
768 | header* const hdr = get_header(); | |
769 | ||
770 | const uint32_t capacity = hdr->m_capacity; | |
771 | const size_type block_size = hdr->m_block_size; | |
772 | uint32_t pos = hdr->m_get_pos; | |
1e59de90 | 773 | BOOST_ASSERT(pos < capacity); |
7c673cae FG |
774 | |
775 | block_header* block = hdr->get_block(pos); | |
776 | size_type message_size = block->m_size; | |
777 | uint32_t block_count = estimate_block_count(message_size); | |
778 | ||
779 | BOOST_ASSERT(block_count <= hdr->m_size); | |
780 | ||
781 | size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); | |
782 | handler(state, block->get_data(), read_size); | |
783 | ||
784 | pos += block_count; | |
785 | if (BOOST_UNLIKELY(pos >= capacity)) | |
786 | { | |
787 | // Read the tail of the message | |
788 | pos -= capacity; | |
789 | read_size = message_size - read_size; | |
790 | if (read_size > 0u) | |
791 | handler(state, hdr->get_block(0u), read_size); | |
792 | } | |
793 | ||
794 | hdr->m_get_pos = pos; | |
795 | hdr->m_size -= block_count; | |
796 | ||
797 | hdr->m_nonfull_queue.notify_all(); | |
798 | } | |
1e59de90 TL |
799 | |
800 | static void short_yield() BOOST_NOEXCEPT | |
801 | { | |
802 | #if defined(BOOST_HAS_SCHED_YIELD) | |
803 | sched_yield(); | |
804 | #elif defined(BOOST_HAS_PTHREAD_YIELD) | |
805 | pthread_yield(); | |
806 | #else | |
807 | long_yield(); | |
808 | #endif | |
809 | } | |
810 | ||
811 | static void long_yield() BOOST_NOEXCEPT | |
812 | { | |
813 | #if defined(BOOST_HAS_NANOSLEEP) | |
814 | timespec ts = {}; | |
815 | ts.tv_sec = 0; | |
816 | ts.tv_nsec = 1000; | |
817 | nanosleep(&ts, NULL); | |
818 | #else | |
819 | usleep(1); | |
820 | #endif | |
821 | } | |
7c673cae FG |
822 | }; |
823 | ||
824 | BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) | |
825 | { | |
826 | BOOST_ASSERT(m_impl == NULL); | |
827 | if (!boost::log::aux::is_power_of_2(block_size)) | |
828 | BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2")); | |
829 | try | |
830 | { | |
831 | m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); | |
832 | } | |
833 | catch (boost::exception& e) | |
834 | { | |
835 | e << boost::log::ipc::object_name_info(name); | |
836 | throw; | |
837 | } | |
838 | catch (boost::interprocess::interprocess_exception& e) | |
839 | { | |
840 | BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name)); | |
841 | } | |
842 | } | |
843 | ||
844 | BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) | |
845 | { | |
846 | BOOST_ASSERT(m_impl == NULL); | |
847 | if (!boost::log::aux::is_power_of_2(block_size)) | |
848 | BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2")); | |
849 | try | |
850 | { | |
851 | m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); | |
852 | } | |
853 | catch (boost::exception& e) | |
854 | { | |
855 | e << boost::log::ipc::object_name_info(name); | |
856 | throw; | |
857 | } | |
858 | catch (boost::interprocess::interprocess_exception& e) | |
859 | { | |
860 | BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name)); | |
861 | } | |
862 | } | |
863 | ||
864 | BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const&) | |
865 | { | |
866 | BOOST_ASSERT(m_impl == NULL); | |
867 | try | |
868 | { | |
869 | m_impl = new implementation(open_mode::open_only, name, oflow_policy); | |
870 | } | |
871 | catch (boost::exception& e) | |
872 | { | |
873 | e << boost::log::ipc::object_name_info(name); | |
874 | throw; | |
875 | } | |
876 | catch (boost::interprocess::interprocess_exception& e) | |
877 | { | |
878 | BOOST_THROW_EXCEPTION(boost::enable_error_info(system_error(boost::system::error_code(e.get_native_error(), boost::system::system_category()), e.what())) << boost::log::ipc::object_name_info(name)); | |
879 | } | |
880 | } | |
881 | ||
882 | BOOST_LOG_API void reliable_message_queue::clear() | |
883 | { | |
884 | BOOST_ASSERT(m_impl != NULL); | |
885 | try | |
886 | { | |
887 | m_impl->clear(); | |
888 | } | |
889 | catch (boost::exception& e) | |
890 | { | |
891 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
892 | throw; | |
893 | } | |
894 | } | |
895 | ||
896 | BOOST_LOG_API object_name const& reliable_message_queue::name() const | |
897 | { | |
898 | BOOST_ASSERT(m_impl != NULL); | |
899 | return m_impl->name(); | |
900 | } | |
901 | ||
902 | BOOST_LOG_API uint32_t reliable_message_queue::capacity() const | |
903 | { | |
904 | BOOST_ASSERT(m_impl != NULL); | |
905 | return m_impl->capacity(); | |
906 | } | |
907 | ||
908 | BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const | |
909 | { | |
910 | BOOST_ASSERT(m_impl != NULL); | |
911 | return m_impl->block_size(); | |
912 | } | |
913 | ||
914 | BOOST_LOG_API void reliable_message_queue::stop_local() | |
915 | { | |
916 | BOOST_ASSERT(m_impl != NULL); | |
917 | try | |
918 | { | |
919 | m_impl->stop_local(); | |
920 | } | |
921 | catch (boost::exception& e) | |
922 | { | |
923 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
924 | throw; | |
925 | } | |
926 | } | |
927 | ||
928 | BOOST_LOG_API void reliable_message_queue::reset_local() | |
929 | { | |
930 | BOOST_ASSERT(m_impl != NULL); | |
931 | try | |
932 | { | |
933 | m_impl->reset_local(); | |
934 | } | |
935 | catch (boost::exception& e) | |
936 | { | |
937 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
938 | throw; | |
939 | } | |
940 | } | |
941 | ||
942 | BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT | |
943 | { | |
944 | delete m_impl; | |
945 | m_impl = NULL; | |
946 | } | |
947 | ||
948 | BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size) | |
949 | { | |
950 | BOOST_ASSERT(m_impl != NULL); | |
951 | try | |
952 | { | |
953 | return m_impl->send(message_data, message_size); | |
954 | } | |
955 | catch (boost::exception& e) | |
956 | { | |
957 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
958 | throw; | |
959 | } | |
960 | } | |
961 | ||
962 | BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size) | |
963 | { | |
964 | BOOST_ASSERT(m_impl != NULL); | |
965 | try | |
966 | { | |
967 | return m_impl->try_send(message_data, message_size); | |
968 | } | |
969 | catch (boost::exception& e) | |
970 | { | |
971 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
972 | throw; | |
973 | } | |
974 | } | |
975 | ||
976 | BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state) | |
977 | { | |
978 | BOOST_ASSERT(m_impl != NULL); | |
979 | try | |
980 | { | |
981 | return m_impl->receive(handler, state); | |
982 | } | |
983 | catch (boost::exception& e) | |
984 | { | |
985 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
986 | throw; | |
987 | } | |
988 | } | |
989 | ||
990 | BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state) | |
991 | { | |
992 | BOOST_ASSERT(m_impl != NULL); | |
993 | try | |
994 | { | |
995 | return m_impl->try_receive(handler, state); | |
996 | } | |
997 | catch (boost::exception& e) | |
998 | { | |
999 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
1000 | throw; | |
1001 | } | |
1002 | } | |
1003 | ||
1004 | //! Fixed buffer receive handler | |
1005 | BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size) | |
1006 | { | |
1007 | fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state); | |
1008 | if (BOOST_UNLIKELY(size > p->size)) | |
1009 | BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message")); | |
1010 | ||
1011 | std::memcpy(p->data, data, size); | |
1012 | p->data += size; | |
1013 | p->size -= size; | |
1014 | } | |
1015 | ||
1016 | BOOST_LOG_API void reliable_message_queue::remove(object_name const& name) | |
1017 | { | |
1018 | boost::interprocess::shared_memory_object::remove(name.c_str()); | |
1019 | } | |
1020 | ||
1021 | } // namespace ipc | |
1022 | ||
1023 | BOOST_LOG_CLOSE_NAMESPACE // namespace log | |
1024 | ||
1025 | } // namespace boost | |
1026 | ||
1027 | #include <boost/log/detail/footer.hpp> |