]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/opentelemetry-cpp/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / jaegertracing / opentelemetry-cpp / sdk / include / opentelemetry / sdk / logs / batch_log_processor.h
1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
3
4 #pragma once
5 #ifdef ENABLE_LOGS_PREVIEW
6
7 # include "opentelemetry/sdk/common/circular_buffer.h"
8 # include "opentelemetry/sdk/logs/exporter.h"
9 # include "opentelemetry/sdk/logs/processor.h"
10
11 # include <atomic>
12 # include <condition_variable>
13 # include <thread>
14
15 OPENTELEMETRY_BEGIN_NAMESPACE
16 namespace sdk
17 {
18
19 namespace logs
20 {
21
22 /**
23 * This is an implementation of the LogProcessor which creates batches of finished logs and passes
24 * the export-friendly log data representations to the configured LogExporter.
25 */
26 class BatchLogProcessor : public LogProcessor
27 {
28 public:
29 /**
30 * Creates a batch log processor by configuring the specified exporter and other parameters
31 * as per the official, language-agnostic opentelemetry specs.
32 *
33 * @param exporter - The backend exporter to pass the logs to
34 * @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are
35 * dropped.
36 * @param scheduled_delay_millis - The time interval between two consecutive exports.
37 * @param max_export_batch_size - The maximum batch size of every export. It must be smaller or
38 * equal to max_queue_size
39 */
40 explicit BatchLogProcessor(
41 std::unique_ptr<LogExporter> &&exporter,
42 const size_t max_queue_size = 2048,
43 const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
44 const size_t max_export_batch_size = 512);
45
46 /** Makes a new recordable **/
47 std::unique_ptr<Recordable> MakeRecordable() noexcept override;
48
49 /**
50 * Called when the Logger's log method creates a log record
51 * @param record the log record
52 */
53
54 void OnReceive(std::unique_ptr<Recordable> &&record) noexcept override;
55
56 /**
57 * Export all log records that have not been exported yet.
58 *
59 * NOTE: Timeout functionality not supported yet.
60 */
61 bool ForceFlush(
62 std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
63
64 /**
65 * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
66 * all its logs and passes them to the exporter. Any subsequent calls to
67 * ForceFlush or Shutdown will return immediately without doing anything.
68 *
69 * NOTE: Timeout functionality not supported yet.
70 */
71 bool Shutdown(
72 std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;
73
74 /**
75 * Class destructor which invokes the Shutdown() method.
76 */
77 virtual ~BatchLogProcessor() override;
78
79 private:
80 /**
81 * The background routine performed by the worker thread.
82 */
83 void DoBackgroundWork();
84
85 /**
86 * Exports all logs to the configured exporter.
87 *
88 * @param was_force_flush_called - A flag to check if the current export is the result
89 * of a call to ForceFlush method. If true, then we have to
90 * notify the main thread to wake it up in the ForceFlush
91 * method.
92 */
93 void Export(const bool was_for_flush_called);
94
95 /**
96 * Called when Shutdown() is invoked. Completely drains the queue of all log records and
97 * passes them to the exporter.
98 */
99 void DrainQueue();
100
101 /* The configured backend log exporter */
102 std::unique_ptr<LogExporter> exporter_;
103
104 /* Configurable parameters as per the official *trace* specs */
105 const size_t max_queue_size_;
106 const std::chrono::milliseconds scheduled_delay_millis_;
107 const size_t max_export_batch_size_;
108
109 /* Synchronization primitives */
110 std::condition_variable cv_, force_flush_cv_;
111 std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;
112
113 /* The buffer/queue to which the ended logs are added */
114 common::CircularBuffer<Recordable> buffer_;
115
116 /* Important boolean flags to handle the workflow of the processor */
117 std::atomic<bool> is_shutdown_{false};
118 std::atomic<bool> is_force_flush_{false};
119 std::atomic<bool> is_force_flush_notified_{false};
120
121 /* The background worker thread */
122 std::thread worker_thread_;
123 };
124
125 } // namespace logs
126 } // namespace sdk
127 OPENTELEMETRY_END_NAMESPACE
128 #endif