1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
5 #ifdef ENABLE_LOGS_PREVIEW
7 # include "opentelemetry/sdk/common/circular_buffer.h"
8 # include "opentelemetry/sdk/logs/exporter.h"
9 # include "opentelemetry/sdk/logs/processor.h"
12 # include <condition_variable>
15 OPENTELEMETRY_BEGIN_NAMESPACE
23 * This is an implementation of the LogProcessor which creates batches of finished logs and passes
24 * the export-friendly log data representations to the configured LogExporter.
26 class BatchLogProcessor
: public LogProcessor
30 * Creates a batch log processor by configuring the specified exporter and other parameters
31 * as per the official, language-agnostic opentelemetry specs.
33 * @param exporter - The backend exporter to pass the logs to
34 * @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are
36 * @param scheduled_delay_millis - The time interval between two consecutive exports.
37 * @param max_export_batch_size - The maximum batch size of every export. It must be smaller or
38 * equal to max_queue_size
40 explicit BatchLogProcessor(
41 std::unique_ptr
<LogExporter
> &&exporter
,
42 const size_t max_queue_size
= 2048,
43 const std::chrono::milliseconds scheduled_delay_millis
= std::chrono::milliseconds(5000),
44 const size_t max_export_batch_size
= 512);
46 /** Makes a new recordable **/
47 std::unique_ptr
<Recordable
> MakeRecordable() noexcept override
;
50 * Called when the Logger's log method creates a log record
51 * @param record the log record
54 void OnReceive(std::unique_ptr
<Recordable
> &&record
) noexcept override
;
57 * Export all log records that have not been exported yet.
59 * NOTE: Timeout functionality not supported yet.
62 std::chrono::microseconds timeout
= std::chrono::microseconds::max()) noexcept override
;
65 * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
66 * all its logs and passes them to the exporter. Any subsequent calls to
67 * ForceFlush or Shutdown will return immediately without doing anything.
69 * NOTE: Timeout functionality not supported yet.
72 std::chrono::microseconds timeout
= std::chrono::microseconds::max()) noexcept override
;
75 * Class destructor which invokes the Shutdown() method.
77 virtual ~BatchLogProcessor() override
;
81 * The background routine performed by the worker thread.
83 void DoBackgroundWork();
86 * Exports all logs to the configured exporter.
88 * @param was_force_flush_called - A flag to check if the current export is the result
89 * of a call to ForceFlush method. If true, then we have to
90 * notify the main thread to wake it up in the ForceFlush
93 void Export(const bool was_for_flush_called
);
96 * Called when Shutdown() is invoked. Completely drains the queue of all log records and
97 * passes them to the exporter.
101 /* The configured backend log exporter */
102 std::unique_ptr
<LogExporter
> exporter_
;
104 /* Configurable parameters as per the official *trace* specs */
105 const size_t max_queue_size_
;
106 const std::chrono::milliseconds scheduled_delay_millis_
;
107 const size_t max_export_batch_size_
;
109 /* Synchronization primitives */
110 std::condition_variable cv_
, force_flush_cv_
;
111 std::mutex cv_m_
, force_flush_cv_m_
, shutdown_m_
;
113 /* The buffer/queue to which the ended logs are added */
114 common::CircularBuffer
<Recordable
> buffer_
;
116 /* Important boolean flags to handle the workflow of the processor */
117 std::atomic
<bool> is_shutdown_
{false};
118 std::atomic
<bool> is_force_flush_
{false};
119 std::atomic
<bool> is_force_flush_notified_
{false};
121 /* The background worker thread */
122 std::thread worker_thread_
;
127 OPENTELEMETRY_END_NAMESPACE