]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | /* |
2 | * Copyright Lingxi Li 2015. | |
3 | * Copyright Andrey Semashev 2016. | |
4 | * Distributed under the Boost Software License, Version 1.0. | |
5 | * (See accompanying file LICENSE_1_0.txt or copy at | |
6 | * http://www.boost.org/LICENSE_1_0.txt) | |
7 | */ | |
8 | /*! | |
9 | * \file ipc_reliable_message_queue_win.hpp | |
10 | * \author Lingxi Li | |
11 | * \author Andrey Semashev | |
12 | * \date 28.10.2015 | |
13 | * | |
14 | * \brief This header is the Boost.Log library implementation, see the library documentation | |
15 | * at http://www.boost.org/doc/libs/release/libs/log/doc/html/index.html. | |
16 | * | |
17 | * This file provides an interprocess message queue implementation on Windows platforms. | |
18 | */ | |
19 | ||
20 | #include <boost/log/detail/config.hpp> | |
21 | #include <cstddef> | |
22 | #include <cstring> | |
23 | #include <new> | |
24 | #include <limits> | |
25 | #include <string> | |
26 | #include <algorithm> | |
27 | #include <stdexcept> | |
28 | #include <boost/assert.hpp> | |
29 | #include <boost/static_assert.hpp> | |
30 | #include <boost/cstdint.hpp> | |
31 | #include <boost/atomic/atomic.hpp> | |
32 | #include <boost/atomic/capabilities.hpp> | |
33 | #include <boost/log/exceptions.hpp> | |
34 | #include <boost/log/utility/ipc/reliable_message_queue.hpp> | |
35 | #include <boost/log/support/exception.hpp> | |
36 | #include <boost/log/detail/pause.hpp> | |
37 | #include <boost/exception/info.hpp> | |
38 | #include <boost/exception/enable_error_info.hpp> | |
39 | #include <boost/align/align_up.hpp> | |
b32b8144 | 40 | #include <boost/winapi/thread.hpp> // SwitchToThread |
7c673cae FG |
41 | #include "windows/ipc_sync_wrappers.hpp" |
42 | #include "windows/mapped_shared_memory.hpp" | |
43 | #include "windows/utf_code_conversion.hpp" | |
44 | #include "murmur3.hpp" | |
45 | #include "bit_tools.hpp" | |
46 | #include <windows.h> | |
47 | #include <boost/log/detail/header.hpp> | |
48 | ||
49 | #if BOOST_ATOMIC_INT32_LOCK_FREE != 2 | |
50 | // 32-bit atomic ops are required to be able to place atomic<uint32_t> in the process-shared memory | |
51 | #error Boost.Log: Native 32-bit atomic operations are required but not supported by Boost.Atomic on the target platform | |
52 | #endif | |
53 | ||
54 | //! A suffix used in names of interprocess objects created by the queue. | |
55 | //! Used as a protection against clashing with user-supplied names of interprocess queues and also to resolve conflicts between queues of different types. | |
56 | #define BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".3010b9950926463398eee00b35b44651" | |
57 | ||
58 | namespace boost { | |
59 | ||
60 | BOOST_LOG_OPEN_NAMESPACE | |
61 | ||
62 | namespace ipc { | |
63 | ||
64 | //! Message queue implementation data | |
65 | struct reliable_message_queue::implementation | |
66 | { | |
67 | private: | |
68 | //! Header of an allocation block within the message queue. Placed at the beginning of the block within the shared memory segment. | |
69 | struct block_header | |
70 | { | |
71 | // Element data alignment, in bytes | |
72 | enum { data_alignment = 32u }; | |
73 | ||
74 | //! Size of the element data, in bytes | |
75 | size_type m_size; | |
76 | ||
77 | //! Returns the block header overhead, in bytes | |
78 | static BOOST_CONSTEXPR size_type get_header_overhead() BOOST_NOEXCEPT | |
79 | { | |
80 | return static_cast< size_type >(boost::alignment::align_up(sizeof(block_header), data_alignment)); | |
81 | } | |
82 | ||
83 | //! Returns a pointer to the element data | |
84 | void* get_data() const BOOST_NOEXCEPT | |
85 | { | |
86 | return const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + get_header_overhead(); | |
87 | } | |
88 | }; | |
89 | ||
90 | //! Header of the message queue. Placed at the beginning of the shared memory segment. | |
91 | struct header | |
92 | { | |
93 | // Increment this constant whenever you change the binary layout of the queue (apart from this header structure) | |
94 | enum { abi_version = 0 }; | |
95 | ||
96 | // !!! Whenever you add/remove members in this structure, also modify get_abi_tag() function accordingly !!! | |
97 | ||
98 | //! A tag value to ensure the correct binary layout of the message queue data structures. Must be placed first and always have a fixed size and alignment. | |
99 | uint32_t m_abi_tag; | |
100 | //! Padding to protect against alignment changes in Boost.Atomic. Don't use BOOST_ALIGNMENT to ensure portability. | |
101 | unsigned char m_padding[BOOST_LOG_CPU_CACHE_LINE_SIZE - sizeof(uint32_t)]; | |
102 | //! A flag indicating that the queue is constructed (i.e. the queue is constructed when the value is not 0). | |
103 | boost::atomic< uint32_t > m_initialized; | |
104 | //! Number of allocation blocks in the queue. | |
105 | const uint32_t m_capacity; | |
106 | //! Size of an allocation block, in bytes. | |
107 | const size_type m_block_size; | |
108 | //! Shared state of the mutex for protecting queue data structures. | |
109 | boost::log::ipc::aux::interprocess_mutex::shared_state m_mutex_state; | |
110 | //! Shared state of the condition variable used to block writers when the queue is full. | |
111 | boost::log::ipc::aux::interprocess_condition_variable::shared_state m_nonfull_queue_state; | |
112 | //! The current number of allocated blocks in the queue. | |
113 | uint32_t m_size; | |
114 | //! The current writing position (allocation block index). | |
115 | uint32_t m_put_pos; | |
116 | //! The current reading position (allocation block index). | |
117 | uint32_t m_get_pos; | |
118 | ||
119 | header(uint32_t capacity, size_type block_size) : | |
120 | m_abi_tag(get_abi_tag()), | |
121 | m_capacity(capacity), | |
122 | m_block_size(block_size), | |
123 | m_size(0u), | |
124 | m_put_pos(0u), | |
125 | m_get_pos(0u) | |
126 | { | |
127 | // Must be initialized last. m_initialized is zero-initialized initially. | |
128 | m_initialized.fetch_add(1u, boost::memory_order_release); | |
129 | } | |
130 | ||
131 | //! Returns the header structure ABI tag | |
132 | static uint32_t get_abi_tag() BOOST_NOEXCEPT | |
133 | { | |
134 | // This FOURCC identifies the queue type | |
135 | boost::log::aux::murmur3_32 hash(boost::log::aux::make_fourcc('r', 'e', 'l', 'q')); | |
136 | ||
137 | // This FOURCC identifies the queue implementation | |
138 | hash.mix(boost::log::aux::make_fourcc('w', 'n', 't', '5')); | |
139 | hash.mix(abi_version); | |
140 | ||
141 | // We will use these constants to align pointers | |
142 | hash.mix(BOOST_LOG_CPU_CACHE_LINE_SIZE); | |
143 | hash.mix(block_header::data_alignment); | |
144 | ||
145 | // The members in the sequence below must be enumerated in the same order as they are declared in the header structure. | |
146 | // The ABI tag is supposed change whenever a member changes size or offset from the beginning of the header. | |
147 | ||
148 | #define BOOST_LOG_MIX_HEADER_MEMBER(name)\ | |
149 | hash.mix(static_cast< uint32_t >(sizeof(((header*)NULL)->name)));\ | |
150 | hash.mix(static_cast< uint32_t >(offsetof(header, name))) | |
151 | ||
152 | BOOST_LOG_MIX_HEADER_MEMBER(m_abi_tag); | |
153 | BOOST_LOG_MIX_HEADER_MEMBER(m_padding); | |
154 | BOOST_LOG_MIX_HEADER_MEMBER(m_initialized); | |
155 | BOOST_LOG_MIX_HEADER_MEMBER(m_capacity); | |
156 | BOOST_LOG_MIX_HEADER_MEMBER(m_block_size); | |
157 | BOOST_LOG_MIX_HEADER_MEMBER(m_mutex_state); | |
158 | BOOST_LOG_MIX_HEADER_MEMBER(m_nonfull_queue_state); | |
159 | BOOST_LOG_MIX_HEADER_MEMBER(m_size); | |
160 | BOOST_LOG_MIX_HEADER_MEMBER(m_put_pos); | |
161 | BOOST_LOG_MIX_HEADER_MEMBER(m_get_pos); | |
162 | ||
163 | #undef BOOST_LOG_MIX_HEADER_MEMBER | |
164 | ||
165 | return hash.finalize(); | |
166 | } | |
167 | ||
168 | //! Returns an element header at the specified index | |
169 | block_header* get_block(uint32_t index) const BOOST_NOEXCEPT | |
170 | { | |
171 | BOOST_ASSERT(index < m_capacity); | |
172 | unsigned char* p = const_cast< unsigned char* >(reinterpret_cast< const unsigned char* >(this)) + boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE); | |
173 | p += static_cast< std::size_t >(m_block_size) * static_cast< std::size_t >(index); | |
174 | return reinterpret_cast< block_header* >(p); | |
175 | } | |
176 | ||
177 | BOOST_DELETED_FUNCTION(header(header const&)) | |
178 | BOOST_DELETED_FUNCTION(header& operator=(header const&)) | |
179 | }; | |
180 | ||
181 | private: | |
182 | //! Shared memory object and mapping | |
183 | boost::log::ipc::aux::mapped_shared_memory m_shared_memory; | |
184 | //! Queue overflow handling policy | |
185 | const overflow_policy m_overflow_policy; | |
186 | //! The mask for selecting bits that constitute size values from 0 to (block_size - 1) | |
187 | size_type m_block_size_mask; | |
188 | //! The number of the bit set in block_size (i.e. log base 2 of block_size) | |
189 | uint32_t m_block_size_log2; | |
190 | ||
191 | //! Mutex for protecting queue data structures. | |
192 | boost::log::ipc::aux::interprocess_mutex m_mutex; | |
193 | //! Event used to block readers when the queue is empty. | |
194 | boost::log::ipc::aux::interprocess_event m_nonempty_queue; | |
195 | //! Condition variable used to block writers when the queue is full. | |
196 | boost::log::ipc::aux::interprocess_condition_variable m_nonfull_queue; | |
197 | //! The event indicates that stop has been requested | |
198 | boost::log::ipc::aux::auto_handle m_stop; | |
199 | ||
200 | //! The queue name, as specified by the user | |
201 | const object_name m_name; | |
202 | ||
203 | public: | |
204 | //! The constructor creates a new shared memory segment | |
205 | implementation | |
206 | ( | |
207 | open_mode::create_only_tag, | |
208 | object_name const& name, | |
209 | uint32_t capacity, | |
210 | size_type block_size, | |
211 | overflow_policy oflow_policy, | |
212 | permissions const& perms | |
213 | ) : | |
214 | m_overflow_policy(oflow_policy), | |
215 | m_block_size_mask(0u), | |
216 | m_block_size_log2(0u), | |
217 | m_name(name) | |
218 | { | |
219 | const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str()); | |
220 | const std::size_t shmem_size = estimate_region_size(capacity, block_size); | |
221 | m_shared_memory.create(wname.c_str(), shmem_size, perms); | |
222 | m_shared_memory.map(); | |
223 | ||
224 | create_queue(wname, capacity, block_size, perms); | |
225 | } | |
226 | ||
227 | //! The constructor creates a new shared memory segment or opens the existing one | |
228 | implementation | |
229 | ( | |
230 | open_mode::open_or_create_tag, | |
231 | object_name const& name, | |
232 | uint32_t capacity, | |
233 | size_type block_size, | |
234 | overflow_policy oflow_policy, | |
235 | permissions const& perms | |
236 | ) : | |
237 | m_overflow_policy(oflow_policy), | |
238 | m_block_size_mask(0u), | |
239 | m_block_size_log2(0u), | |
240 | m_name(name) | |
241 | { | |
242 | const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str()); | |
243 | const std::size_t shmem_size = estimate_region_size(capacity, block_size); | |
244 | const bool created = m_shared_memory.create_or_open(wname.c_str(), shmem_size, perms); | |
245 | m_shared_memory.map(); | |
246 | ||
247 | if (created) | |
248 | create_queue(wname, capacity, block_size, perms); | |
249 | else | |
250 | adopt_queue(wname, m_shared_memory.size(), perms); | |
251 | } | |
252 | ||
253 | //! The constructor opens the existing shared memory segment | |
254 | implementation | |
255 | ( | |
256 | open_mode::open_only_tag, | |
257 | object_name const& name, | |
258 | overflow_policy oflow_policy, | |
259 | permissions const& perms | |
260 | ) : | |
261 | m_overflow_policy(oflow_policy), | |
262 | m_block_size_mask(0u), | |
263 | m_block_size_log2(0u), | |
264 | m_name(name) | |
265 | { | |
266 | const std::wstring wname = boost::log::aux::utf8_to_utf16(name.c_str()); | |
267 | m_shared_memory.open(wname.c_str()); | |
268 | m_shared_memory.map(); | |
269 | ||
270 | adopt_queue(wname, m_shared_memory.size(), perms); | |
271 | } | |
272 | ||
273 | object_name const& name() const BOOST_NOEXCEPT | |
274 | { | |
275 | return m_name; | |
276 | } | |
277 | ||
278 | uint32_t capacity() const BOOST_NOEXCEPT | |
279 | { | |
280 | return get_header()->m_capacity; | |
281 | } | |
282 | ||
283 | size_type block_size() const BOOST_NOEXCEPT | |
284 | { | |
285 | return get_header()->m_block_size; | |
286 | } | |
287 | ||
288 | operation_result send(void const* message_data, size_type message_size) | |
289 | { | |
290 | const uint32_t block_count = estimate_block_count(message_size); | |
291 | ||
292 | header* const hdr = get_header(); | |
293 | ||
294 | if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) | |
295 | BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); | |
296 | ||
297 | if (!lock_queue()) | |
298 | return aborted; | |
299 | ||
300 | boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex); | |
301 | ||
302 | while (true) | |
303 | { | |
304 | if ((hdr->m_capacity - hdr->m_size) >= block_count) | |
305 | break; | |
306 | ||
307 | const overflow_policy oflow_policy = m_overflow_policy; | |
308 | if (oflow_policy == fail_on_overflow) | |
309 | return no_space; | |
310 | else if (BOOST_UNLIKELY(oflow_policy == throw_on_overflow)) | |
311 | BOOST_LOG_THROW_DESCR(capacity_limit_reached, "Interprocess queue is full"); | |
312 | ||
313 | if (!m_nonfull_queue.wait(unlock, m_stop.get())) | |
314 | return aborted; | |
315 | } | |
316 | ||
317 | enqueue_message(message_data, message_size, block_count); | |
318 | ||
319 | return succeeded; | |
320 | } | |
321 | ||
322 | bool try_send(void const* message_data, size_type message_size) | |
323 | { | |
324 | const uint32_t block_count = estimate_block_count(message_size); | |
325 | ||
326 | header* const hdr = get_header(); | |
327 | ||
328 | if (BOOST_UNLIKELY(block_count > hdr->m_capacity)) | |
329 | BOOST_LOG_THROW_DESCR(logic_error, "Message size exceeds the interprocess queue capacity"); | |
330 | ||
331 | if (!lock_queue()) | |
332 | return false; | |
333 | ||
334 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex); | |
335 | ||
336 | if ((hdr->m_capacity - hdr->m_size) < block_count) | |
337 | return false; | |
338 | ||
339 | enqueue_message(message_data, message_size, block_count); | |
340 | ||
341 | return true; | |
342 | } | |
343 | ||
344 | operation_result receive(receive_handler handler, void* state) | |
345 | { | |
346 | if (!lock_queue()) | |
347 | return aborted; | |
348 | ||
349 | boost::log::ipc::aux::interprocess_mutex::optional_unlock unlock(m_mutex); | |
350 | ||
351 | header* const hdr = get_header(); | |
352 | ||
353 | while (true) | |
354 | { | |
355 | if (hdr->m_size > 0u) | |
356 | break; | |
357 | ||
358 | m_mutex.unlock(); | |
359 | unlock.disengage(); | |
360 | ||
361 | if (!m_nonempty_queue.wait(m_stop.get()) || !lock_queue()) | |
362 | return aborted; | |
363 | ||
364 | unlock.engage(m_mutex); | |
365 | } | |
366 | ||
367 | dequeue_message(handler, state); | |
368 | ||
369 | return succeeded; | |
370 | } | |
371 | ||
372 | bool try_receive(receive_handler handler, void* state) | |
373 | { | |
374 | if (!lock_queue()) | |
375 | return false; | |
376 | ||
377 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex); | |
378 | ||
379 | header* const hdr = get_header(); | |
380 | if (hdr->m_size == 0u) | |
381 | return false; | |
382 | ||
383 | dequeue_message(handler, state); | |
384 | ||
385 | return true; | |
386 | } | |
387 | ||
388 | void stop_local() | |
389 | { | |
b32b8144 | 390 | BOOST_VERIFY(boost::winapi::SetEvent(m_stop.get()) != 0); |
7c673cae FG |
391 | } |
392 | ||
393 | void reset_local() | |
394 | { | |
b32b8144 | 395 | BOOST_VERIFY(boost::winapi::ResetEvent(m_stop.get()) != 0); |
7c673cae FG |
396 | } |
397 | ||
398 | void clear() | |
399 | { | |
400 | m_mutex.lock(); | |
401 | boost::log::ipc::aux::interprocess_mutex::auto_unlock unlock(m_mutex); | |
402 | clear_queue(); | |
403 | } | |
404 | ||
405 | private: | |
406 | header* get_header() const BOOST_NOEXCEPT | |
407 | { | |
408 | return static_cast< header* >(m_shared_memory.address()); | |
409 | } | |
410 | ||
411 | static std::size_t estimate_region_size(uint32_t capacity, size_type block_size) BOOST_NOEXCEPT | |
412 | { | |
413 | return boost::alignment::align_up(sizeof(header), BOOST_LOG_CPU_CACHE_LINE_SIZE) + static_cast< std::size_t >(capacity) * static_cast< std::size_t >(block_size); | |
414 | } | |
415 | ||
416 | void create_stop_event() | |
417 | { | |
418 | #if BOOST_USE_WINAPI_VERSION >= BOOST_WINAPI_VERSION_WIN6 | |
b32b8144 | 419 | boost::winapi::HANDLE_ h = boost::winapi::CreateEventExW |
7c673cae FG |
420 | ( |
421 | NULL, // permissions | |
422 | NULL, // name | |
b32b8144 FG |
423 | boost::winapi::CREATE_EVENT_MANUAL_RESET_, |
424 | boost::winapi::SYNCHRONIZE_ | boost::winapi::EVENT_MODIFY_STATE_ | |
7c673cae FG |
425 | ); |
426 | #else | |
b32b8144 | 427 | boost::winapi::HANDLE_ h = boost::winapi::CreateEventW |
7c673cae FG |
428 | ( |
429 | NULL, // permissions | |
430 | true, // manual reset | |
431 | false, // initial state | |
432 | NULL // name | |
433 | ); | |
434 | #endif | |
435 | if (BOOST_UNLIKELY(h == NULL)) | |
436 | { | |
b32b8144 | 437 | boost::winapi::DWORD_ err = boost::winapi::GetLastError(); |
7c673cae FG |
438 | BOOST_LOG_THROW_DESCR_PARAMS(boost::log::system_error, "Failed to create an stop event object", (err)); |
439 | } | |
440 | ||
441 | m_stop.init(h); | |
442 | } | |
443 | ||
444 | void create_queue(std::wstring const& name, uint32_t capacity, size_type block_size, permissions const& perms) | |
445 | { | |
446 | // Initialize synchronization primitives before initializing the header as the openers will wait for it to be initialized | |
447 | header* const hdr = get_header(); | |
448 | m_mutex.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state, perms); | |
449 | m_nonempty_queue.create((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str(), false, perms); | |
450 | m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms); | |
451 | create_stop_event(); | |
452 | ||
453 | new (hdr) header(capacity, block_size); | |
454 | ||
455 | init_block_size(block_size); | |
456 | } | |
457 | ||
458 | void adopt_queue(std::wstring const& name, std::size_t shmem_size, permissions const& perms) | |
459 | { | |
460 | if (shmem_size < sizeof(header)) | |
461 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment size too small"); | |
462 | ||
463 | // Wait until the mapped region becomes initialized | |
464 | header* const hdr = get_header(); | |
465 | BOOST_CONSTEXPR_OR_CONST unsigned int wait_loops = 1000u, spin_loops = 16u, spins = 16u; | |
466 | for (unsigned int i = 0; i < wait_loops; ++i) | |
467 | { | |
468 | uint32_t initialized = hdr->m_initialized.load(boost::memory_order_acquire); | |
469 | if (initialized) | |
470 | { | |
471 | goto done; | |
472 | } | |
473 | ||
474 | if (i < spin_loops) | |
475 | { | |
476 | for (unsigned int j = 0; j < spins; ++j) | |
477 | { | |
478 | boost::log::aux::pause(); | |
479 | } | |
480 | } | |
481 | else | |
482 | { | |
b32b8144 | 483 | boost::winapi::SwitchToThread(); |
7c673cae FG |
484 | } |
485 | } | |
486 | ||
487 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: shared memory segment is not initialized by creator for too long"); | |
488 | ||
489 | done: | |
490 | // Check that the queue layout matches the current process ABI | |
491 | if (hdr->m_abi_tag != header::get_abi_tag()) | |
492 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue ABI is incompatible"); | |
493 | ||
494 | if (!boost::log::aux::is_power_of_2(hdr->m_block_size)) | |
495 | BOOST_LOG_THROW_DESCR(setup_error, "Boost.Log interprocess message queue cannot be opened: the queue block size is not a power of 2"); | |
496 | ||
497 | m_mutex.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".mutex").c_str(), &hdr->m_mutex_state); | |
498 | m_nonempty_queue.open((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonempty_queue_event").c_str()); | |
499 | m_nonfull_queue.init((name + BOOST_LOG_IPC_NAMES_AUX_SUFFIX L".nonfull_queue_cond_var").c_str(), &hdr->m_nonfull_queue_state, perms); | |
500 | create_stop_event(); | |
501 | ||
502 | init_block_size(hdr->m_block_size); | |
503 | } | |
504 | ||
505 | void init_block_size(size_type block_size) | |
506 | { | |
507 | m_block_size_mask = block_size - 1u; | |
508 | ||
509 | uint32_t block_size_log2 = 0u; | |
510 | if ((block_size & 0x0000ffff) == 0u) | |
511 | { | |
512 | block_size >>= 16u; | |
513 | block_size_log2 += 16u; | |
514 | } | |
515 | if ((block_size & 0x000000ff) == 0u) | |
516 | { | |
517 | block_size >>= 8u; | |
518 | block_size_log2 += 8u; | |
519 | } | |
520 | if ((block_size & 0x0000000f) == 0u) | |
521 | { | |
522 | block_size >>= 4u; | |
523 | block_size_log2 += 4u; | |
524 | } | |
525 | if ((block_size & 0x00000003) == 0u) | |
526 | { | |
527 | block_size >>= 2u; | |
528 | block_size_log2 += 2u; | |
529 | } | |
530 | if ((block_size & 0x00000001) == 0u) | |
531 | { | |
532 | ++block_size_log2; | |
533 | } | |
534 | m_block_size_log2 = block_size_log2; | |
535 | } | |
536 | ||
537 | bool lock_queue() | |
538 | { | |
539 | return m_mutex.lock(m_stop.get()); | |
540 | } | |
541 | ||
542 | void clear_queue() | |
543 | { | |
544 | header* const hdr = get_header(); | |
545 | hdr->m_size = 0u; | |
546 | hdr->m_put_pos = 0u; | |
547 | hdr->m_get_pos = 0u; | |
548 | m_nonfull_queue.notify_all(); | |
549 | } | |
550 | ||
551 | //! Returns the number of allocation blocks that are required to store user's payload of the specified size | |
552 | uint32_t estimate_block_count(size_type size) const BOOST_NOEXCEPT | |
553 | { | |
554 | // ceil((size + get_header_overhead()) / block_size) | |
555 | return static_cast< uint32_t >((size + block_header::get_header_overhead() + m_block_size_mask) >> m_block_size_log2); | |
556 | } | |
557 | ||
558 | //! Puts the message to the back of the queue | |
559 | void enqueue_message(void const* message_data, size_type message_size, uint32_t block_count) | |
560 | { | |
561 | header* const hdr = get_header(); | |
562 | ||
563 | const uint32_t capacity = hdr->m_capacity; | |
564 | const size_type block_size = hdr->m_block_size; | |
565 | uint32_t pos = hdr->m_put_pos; | |
566 | ||
567 | block_header* block = hdr->get_block(pos); | |
568 | block->m_size = message_size; | |
569 | ||
570 | size_type write_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); | |
571 | std::memcpy(block->get_data(), message_data, write_size); | |
572 | ||
573 | pos += block_count; | |
574 | if (BOOST_UNLIKELY(pos >= capacity)) | |
575 | { | |
576 | // Write the rest of the message at the beginning of the queue | |
577 | pos -= capacity; | |
578 | message_data = static_cast< const unsigned char* >(message_data) + write_size; | |
579 | write_size = message_size - write_size; | |
580 | if (write_size > 0u) | |
581 | std::memcpy(hdr->get_block(0u), message_data, write_size); | |
582 | } | |
583 | ||
584 | hdr->m_put_pos = pos; | |
585 | ||
586 | const uint32_t old_queue_size = hdr->m_size; | |
587 | hdr->m_size = old_queue_size + block_count; | |
588 | if (old_queue_size == 0u) | |
589 | m_nonempty_queue.set(); | |
590 | } | |
591 | ||
592 | //! Retrieves the next message and invokes the handler to store the message contents | |
593 | void dequeue_message(receive_handler handler, void* state) | |
594 | { | |
595 | header* const hdr = get_header(); | |
596 | ||
597 | const uint32_t capacity = hdr->m_capacity; | |
598 | const size_type block_size = hdr->m_block_size; | |
599 | uint32_t pos = hdr->m_get_pos; | |
600 | ||
601 | block_header* block = hdr->get_block(pos); | |
602 | size_type message_size = block->m_size; | |
603 | uint32_t block_count = estimate_block_count(message_size); | |
604 | ||
605 | BOOST_ASSERT(block_count <= hdr->m_size); | |
606 | ||
607 | size_type read_size = (std::min)(static_cast< size_type >((capacity - pos) * block_size - block_header::get_header_overhead()), message_size); | |
608 | handler(state, block->get_data(), read_size); | |
609 | ||
610 | pos += block_count; | |
611 | if (BOOST_UNLIKELY(pos >= capacity)) | |
612 | { | |
613 | // Read the tail of the message | |
614 | pos -= capacity; | |
615 | read_size = message_size - read_size; | |
616 | if (read_size > 0u) | |
617 | handler(state, hdr->get_block(0u), read_size); | |
618 | } | |
619 | ||
620 | hdr->m_get_pos = pos; | |
621 | hdr->m_size -= block_count; | |
622 | ||
623 | m_nonfull_queue.notify_all(); | |
624 | } | |
625 | }; | |
626 | ||
627 | BOOST_LOG_API void reliable_message_queue::create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) | |
628 | { | |
629 | BOOST_ASSERT(m_impl == NULL); | |
630 | if (!boost::log::aux::is_power_of_2(block_size)) | |
631 | BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2")); | |
632 | try | |
633 | { | |
634 | m_impl = new implementation(open_mode::create_only, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); | |
635 | } | |
636 | catch (boost::exception& e) | |
637 | { | |
638 | e << boost::log::ipc::object_name_info(name); | |
639 | throw; | |
640 | } | |
641 | } | |
642 | ||
643 | BOOST_LOG_API void reliable_message_queue::open_or_create(object_name const& name, uint32_t capacity, size_type block_size, overflow_policy oflow_policy, permissions const& perms) | |
644 | { | |
645 | BOOST_ASSERT(m_impl == NULL); | |
646 | if (!boost::log::aux::is_power_of_2(block_size)) | |
647 | BOOST_THROW_EXCEPTION(std::invalid_argument("Interprocess message queue block size is not a power of 2")); | |
648 | try | |
649 | { | |
650 | m_impl = new implementation(open_mode::open_or_create, name, capacity, static_cast< size_type >(boost::alignment::align_up(block_size, BOOST_LOG_CPU_CACHE_LINE_SIZE)), oflow_policy, perms); | |
651 | } | |
652 | catch (boost::exception& e) | |
653 | { | |
654 | e << boost::log::ipc::object_name_info(name); | |
655 | throw; | |
656 | } | |
657 | } | |
658 | ||
659 | BOOST_LOG_API void reliable_message_queue::open(object_name const& name, overflow_policy oflow_policy, permissions const& perms) | |
660 | { | |
661 | BOOST_ASSERT(m_impl == NULL); | |
662 | try | |
663 | { | |
664 | m_impl = new implementation(open_mode::open_only, name, oflow_policy, perms); | |
665 | } | |
666 | catch (boost::exception& e) | |
667 | { | |
668 | e << boost::log::ipc::object_name_info(name); | |
669 | throw; | |
670 | } | |
671 | } | |
672 | ||
673 | BOOST_LOG_API void reliable_message_queue::clear() | |
674 | { | |
675 | BOOST_ASSERT(m_impl != NULL); | |
676 | try | |
677 | { | |
678 | m_impl->clear(); | |
679 | } | |
680 | catch (boost::exception& e) | |
681 | { | |
682 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
683 | throw; | |
684 | } | |
685 | } | |
686 | ||
687 | BOOST_LOG_API object_name const& reliable_message_queue::name() const | |
688 | { | |
689 | BOOST_ASSERT(m_impl != NULL); | |
690 | return m_impl->name(); | |
691 | } | |
692 | ||
693 | BOOST_LOG_API uint32_t reliable_message_queue::capacity() const | |
694 | { | |
695 | BOOST_ASSERT(m_impl != NULL); | |
696 | return m_impl->capacity(); | |
697 | } | |
698 | ||
699 | BOOST_LOG_API reliable_message_queue::size_type reliable_message_queue::block_size() const | |
700 | { | |
701 | BOOST_ASSERT(m_impl != NULL); | |
702 | return m_impl->block_size(); | |
703 | } | |
704 | ||
705 | BOOST_LOG_API void reliable_message_queue::stop_local() | |
706 | { | |
707 | BOOST_ASSERT(m_impl != NULL); | |
708 | try | |
709 | { | |
710 | m_impl->stop_local(); | |
711 | } | |
712 | catch (boost::exception& e) | |
713 | { | |
714 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
715 | throw; | |
716 | } | |
717 | } | |
718 | ||
719 | BOOST_LOG_API void reliable_message_queue::reset_local() | |
720 | { | |
721 | BOOST_ASSERT(m_impl != NULL); | |
722 | try | |
723 | { | |
724 | m_impl->reset_local(); | |
725 | } | |
726 | catch (boost::exception& e) | |
727 | { | |
728 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
729 | throw; | |
730 | } | |
731 | } | |
732 | ||
733 | BOOST_LOG_API void reliable_message_queue::do_close() BOOST_NOEXCEPT | |
734 | { | |
735 | delete m_impl; | |
736 | m_impl = NULL; | |
737 | } | |
738 | ||
739 | BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::send(void const* message_data, size_type message_size) | |
740 | { | |
741 | BOOST_ASSERT(m_impl != NULL); | |
742 | try | |
743 | { | |
744 | return m_impl->send(message_data, message_size); | |
745 | } | |
746 | catch (boost::exception& e) | |
747 | { | |
748 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
749 | throw; | |
750 | } | |
751 | } | |
752 | ||
753 | BOOST_LOG_API bool reliable_message_queue::try_send(void const* message_data, size_type message_size) | |
754 | { | |
755 | BOOST_ASSERT(m_impl != NULL); | |
756 | try | |
757 | { | |
758 | return m_impl->try_send(message_data, message_size); | |
759 | } | |
760 | catch (boost::exception& e) | |
761 | { | |
762 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
763 | throw; | |
764 | } | |
765 | } | |
766 | ||
767 | BOOST_LOG_API reliable_message_queue::operation_result reliable_message_queue::do_receive(receive_handler handler, void* state) | |
768 | { | |
769 | BOOST_ASSERT(m_impl != NULL); | |
770 | try | |
771 | { | |
772 | return m_impl->receive(handler, state); | |
773 | } | |
774 | catch (boost::exception& e) | |
775 | { | |
776 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
777 | throw; | |
778 | } | |
779 | } | |
780 | ||
781 | BOOST_LOG_API bool reliable_message_queue::do_try_receive(receive_handler handler, void* state) | |
782 | { | |
783 | BOOST_ASSERT(m_impl != NULL); | |
784 | try | |
785 | { | |
786 | return m_impl->try_receive(handler, state); | |
787 | } | |
788 | catch (boost::exception& e) | |
789 | { | |
790 | e << boost::log::ipc::object_name_info(m_impl->name()); | |
791 | throw; | |
792 | } | |
793 | } | |
794 | ||
795 | //! Fixed buffer receive handler | |
796 | BOOST_LOG_API void reliable_message_queue::fixed_buffer_receive_handler(void* state, const void* data, size_type size) | |
797 | { | |
798 | fixed_buffer_state* p = static_cast< fixed_buffer_state* >(state); | |
799 | if (BOOST_UNLIKELY(size > p->size)) | |
800 | BOOST_THROW_EXCEPTION(bad_alloc("Buffer too small to receive the message")); | |
801 | ||
802 | std::memcpy(p->data, data, size); | |
803 | p->data += size; | |
804 | p->size -= size; | |
805 | } | |
806 | ||
807 | BOOST_LOG_API void reliable_message_queue::remove(object_name const&) | |
808 | { | |
809 | // System objects are reference counted on Windows, nothing to do here | |
810 | } | |
811 | ||
812 | } // namespace ipc | |
813 | ||
814 | BOOST_LOG_CLOSE_NAMESPACE // namespace log | |
815 | ||
816 | } // namespace boost | |
817 | ||
818 | #include <boost/log/detail/footer.hpp> |