1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
4 #include "opentelemetry/exporters/otlp/otlp_http_client.h"
12 #include "opentelemetry/ext/http/client/http_client_factory.h"
13 #include "opentelemetry/ext/http/common/url_parser.h"
15 #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"
18 #include "google/protobuf/message.h"
19 #include "google/protobuf/reflection.h"
20 #include "google/protobuf/stubs/common.h"
21 #include "google/protobuf/stubs/stringpiece.h"
22 #include "nlohmann/json.hpp"
24 #if defined(GOOGLE_PROTOBUF_VERSION) && GOOGLE_PROTOBUF_VERSION >= 3007000
25 # include "google/protobuf/stubs/strutil.h"
27 # include "google/protobuf/stubs/port.h"
32 LIBPROTOBUF_EXPORT
void Base64Escape(StringPiece src
, std::string
*dest
);
33 } // namespace protobuf
37 #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"
39 #include "opentelemetry/sdk/common/global_log_handler.h"
40 #include "opentelemetry/sdk_config.h"
42 #include <condition_variable>
53 namespace nostd
= opentelemetry::nostd
;
54 namespace http_client
= opentelemetry::ext::http::client
;
56 OPENTELEMETRY_BEGIN_NAMESPACE
66 * This class handles the response message from the Elasticsearch request
68 class ResponseHandler
: public http_client::EventHandler
72 * Creates a response handler, that by default doesn't display to console
74 ResponseHandler(bool console_debug
= false) : console_debug_
{console_debug
} {}
77 * Automatically called when the response is received, store the body into a string and notify any
78 * threads blocked on this result
80 void OnResponse(http_client::Response
&response
) noexcept override
82 // Lock the private members so they can't be read while being modified
84 std::unique_lock
<std::mutex
> lk(mutex_
);
86 // Store the body of the request
87 body_
= std::string(response
.GetBody().begin(), response
.GetBody().end());
92 ss
<< "[OTLP HTTP Client] Status:" << response
.GetStatusCode() << "Header:";
93 response
.ForEachHeader([&ss
](opentelemetry::nostd::string_view header_name
,
94 opentelemetry::nostd::string_view header_value
) {
95 ss
<< "\t" << header_name
.data() << " : " << header_value
.data() << ",";
98 ss
<< "Body:" << body_
;
99 OTEL_INTERNAL_LOG_DEBUG(ss
.str());
102 // Set the response_received_ flag to true and notify any threads waiting on this result
103 response_received_
= true;
104 stop_waiting_
= true;
110 * A method the user calls to block their thread until the response is received. The longest
111 * duration is the timeout of the request, set by SetTimeoutMs()
113 bool waitForResponse()
115 std::unique_lock
<std::mutex
> lk(mutex_
);
116 cv_
.wait(lk
, [this] { return stop_waiting_
; });
117 return response_received_
;
121 * Returns the body of the response
123 std::string
GetResponseBody()
125 // Lock so that body_ can't be written to while returning it
126 std::unique_lock
<std::mutex
> lk(mutex_
);
130 // Callback method when an http event occurs
131 void OnEvent(http_client::SessionState state
,
132 opentelemetry::nostd::string_view reason
) noexcept override
134 // need to modify stop_waiting_ under lock before calling notify_all
137 case http_client::SessionState::CreateFailed
:
138 case http_client::SessionState::ConnectFailed
:
139 case http_client::SessionState::SendFailed
:
140 case http_client::SessionState::SSLHandshakeFailed
:
141 case http_client::SessionState::TimedOut
:
142 case http_client::SessionState::NetworkError
:
143 case http_client::SessionState::Cancelled
: {
144 std::unique_lock
<std::mutex
> lk(mutex_
);
145 stop_waiting_
= true;
153 // If any failure event occurs, release the condition variable to unblock main thread
156 case http_client::SessionState::CreateFailed
:
157 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: session create failed");
161 case http_client::SessionState::Created
:
164 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: session created");
168 case http_client::SessionState::Destroyed
:
171 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: session destroyed");
175 case http_client::SessionState::Connecting
:
178 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: connecting to peer");
182 case http_client::SessionState::ConnectFailed
:
183 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: connection failed");
187 case http_client::SessionState::Connected
:
190 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: connected");
194 case http_client::SessionState::Sending
:
197 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: sending request");
201 case http_client::SessionState::SendFailed
:
202 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request send failed");
206 case http_client::SessionState::Response
:
209 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: response received");
213 case http_client::SessionState::SSLHandshakeFailed
:
214 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: SSL handshake failed");
218 case http_client::SessionState::TimedOut
:
219 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request time out");
223 case http_client::SessionState::NetworkError
:
224 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: network error");
228 case http_client::SessionState::ReadError
:
231 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: error reading response");
235 case http_client::SessionState::WriteError
:
238 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] DEBUG:Session state: error writing request");
242 case http_client::SessionState::Cancelled
:
243 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: (manually) cancelled\n");
253 // Define a condition variable and mutex
254 std::condition_variable cv_
;
257 // Whether notify has been called
258 bool stop_waiting_
= false;
260 // Whether the response has been received
261 bool response_received_
= false;
263 // A string to store the response body
264 std::string body_
= "";
266 // Whether to print the results from the callback
267 bool console_debug_
= false;
270 static inline char HexEncode(unsigned char byte
)
272 #if defined(HAVE_GSL)
279 return byte
- 10 + 'a';
287 static std::string
HexEncode(const std::string
&bytes
)
290 ret
.reserve(bytes
.size() * 2);
291 for (std::string::size_type i
= 0; i
< bytes
.size(); ++i
)
293 unsigned char byte
= static_cast<unsigned char>(bytes
[i
]);
294 ret
.push_back(HexEncode(byte
>> 4));
295 ret
.push_back(HexEncode(byte
& 0x0f));
300 static std::string
BytesMapping(const std::string
&bytes
,
301 const google::protobuf::FieldDescriptor
*field_descriptor
,
302 JsonBytesMappingKind kind
)
306 case JsonBytesMappingKind::kHexId
: {
307 if (field_descriptor
->lowercase_name() == "trace_id" ||
308 field_descriptor
->lowercase_name() == "span_id" ||
309 field_descriptor
->lowercase_name() == "parent_span_id")
311 return HexEncode(bytes
);
315 std::string base64_value
;
316 google::protobuf::Base64Escape(bytes
, &base64_value
);
320 case JsonBytesMappingKind::kBase64
: {
321 // Base64 is the default bytes mapping of protobuf
322 std::string base64_value
;
323 google::protobuf::Base64Escape(bytes
, &base64_value
);
326 case JsonBytesMappingKind::kHex
:
327 return HexEncode(bytes
);
333 static void ConvertGenericFieldToJson(nlohmann::json
&value
,
334 const google::protobuf::Message
&message
,
335 const google::protobuf::FieldDescriptor
*field_descriptor
,
336 const OtlpHttpClientOptions
&options
);
338 static void ConvertListFieldToJson(nlohmann::json
&value
,
339 const google::protobuf::Message
&message
,
340 const google::protobuf::FieldDescriptor
*field_descriptor
,
341 const OtlpHttpClientOptions
&options
);
343 static void ConvertGenericMessageToJson(nlohmann::json
&value
,
344 const google::protobuf::Message
&message
,
345 const OtlpHttpClientOptions
&options
)
347 std::vector
<const google::protobuf::FieldDescriptor
*> fields_with_data
;
348 message
.GetReflection()->ListFields(message
, &fields_with_data
);
349 for (std::size_t i
= 0; i
< fields_with_data
.size(); ++i
)
351 const google::protobuf::FieldDescriptor
*field_descriptor
= fields_with_data
[i
];
352 nlohmann::json
&child_value
= options
.use_json_name
? value
[field_descriptor
->json_name()]
353 : value
[field_descriptor
->name()];
354 if (field_descriptor
->is_repeated())
356 ConvertListFieldToJson(child_value
, message
, field_descriptor
, options
);
360 ConvertGenericFieldToJson(child_value
, message
, field_descriptor
, options
);
365 bool SerializeToHttpBody(http_client::Body
&output
, const google::protobuf::Message
&message
)
367 auto body_size
= message
.ByteSizeLong();
370 output
.resize(body_size
);
371 return message
.SerializeWithCachedSizesToArray(
372 reinterpret_cast<google::protobuf::uint8
*>(&output
[0]));
377 void ConvertGenericFieldToJson(nlohmann::json
&value
,
378 const google::protobuf::Message
&message
,
379 const google::protobuf::FieldDescriptor
*field_descriptor
,
380 const OtlpHttpClientOptions
&options
)
382 switch (field_descriptor
->cpp_type())
384 case google::protobuf::FieldDescriptor::CPPTYPE_INT32
: {
385 value
= message
.GetReflection()->GetInt32(message
, field_descriptor
);
388 case google::protobuf::FieldDescriptor::CPPTYPE_INT64
: {
389 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as
390 // decimal strings, and either numbers or strings are accepted when decoding.
391 value
= std::to_string(message
.GetReflection()->GetInt64(message
, field_descriptor
));
394 case google::protobuf::FieldDescriptor::CPPTYPE_UINT32
: {
395 value
= message
.GetReflection()->GetUInt32(message
, field_descriptor
);
398 case google::protobuf::FieldDescriptor::CPPTYPE_UINT64
: {
399 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as
400 // decimal strings, and either numbers or strings are accepted when decoding.
401 value
= std::to_string(message
.GetReflection()->GetUInt64(message
, field_descriptor
));
404 case google::protobuf::FieldDescriptor::CPPTYPE_STRING
: {
406 if (field_descriptor
->type() == google::protobuf::FieldDescriptor::TYPE_BYTES
)
408 value
= BytesMapping(
409 message
.GetReflection()->GetStringReference(message
, field_descriptor
, &empty
),
410 field_descriptor
, options
.json_bytes_mapping
);
414 value
= message
.GetReflection()->GetStringReference(message
, field_descriptor
, &empty
);
418 case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE
: {
419 ConvertGenericMessageToJson(
420 value
, message
.GetReflection()->GetMessage(message
, field_descriptor
, nullptr), options
);
423 case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE
: {
424 value
= message
.GetReflection()->GetDouble(message
, field_descriptor
);
427 case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT
: {
428 value
= message
.GetReflection()->GetFloat(message
, field_descriptor
);
431 case google::protobuf::FieldDescriptor::CPPTYPE_BOOL
: {
432 value
= message
.GetReflection()->GetBool(message
, field_descriptor
);
435 case google::protobuf::FieldDescriptor::CPPTYPE_ENUM
: {
436 value
= message
.GetReflection()->GetEnumValue(message
, field_descriptor
);
445 void ConvertListFieldToJson(nlohmann::json
&value
,
446 const google::protobuf::Message
&message
,
447 const google::protobuf::FieldDescriptor
*field_descriptor
,
448 const OtlpHttpClientOptions
&options
)
450 auto field_size
= message
.GetReflection()->FieldSize(message
, field_descriptor
);
452 switch (field_descriptor
->cpp_type())
454 case google::protobuf::FieldDescriptor::CPPTYPE_INT32
: {
455 for (int i
= 0; i
< field_size
; ++i
)
457 value
.push_back(message
.GetReflection()->GetRepeatedInt32(message
, field_descriptor
, i
));
462 case google::protobuf::FieldDescriptor::CPPTYPE_INT64
: {
463 for (int i
= 0; i
< field_size
; ++i
)
465 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded
466 // as decimal strings, and either numbers or strings are accepted when decoding.
467 value
.push_back(std::to_string(
468 message
.GetReflection()->GetRepeatedInt64(message
, field_descriptor
, i
)));
473 case google::protobuf::FieldDescriptor::CPPTYPE_UINT32
: {
474 for (int i
= 0; i
< field_size
; ++i
)
476 value
.push_back(message
.GetReflection()->GetRepeatedUInt32(message
, field_descriptor
, i
));
481 case google::protobuf::FieldDescriptor::CPPTYPE_UINT64
: {
482 for (int i
= 0; i
< field_size
; ++i
)
484 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded
485 // as decimal strings, and either numbers or strings are accepted when decoding.
486 value
.push_back(std::to_string(
487 message
.GetReflection()->GetRepeatedUInt64(message
, field_descriptor
, i
)));
492 case google::protobuf::FieldDescriptor::CPPTYPE_STRING
: {
494 if (field_descriptor
->type() == google::protobuf::FieldDescriptor::TYPE_BYTES
)
496 for (int i
= 0; i
< field_size
; ++i
)
498 value
.push_back(BytesMapping(message
.GetReflection()->GetRepeatedStringReference(
499 message
, field_descriptor
, i
, &empty
),
500 field_descriptor
, options
.json_bytes_mapping
));
505 for (int i
= 0; i
< field_size
; ++i
)
507 value
.push_back(message
.GetReflection()->GetRepeatedStringReference(
508 message
, field_descriptor
, i
, &empty
));
513 case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE
: {
514 for (int i
= 0; i
< field_size
; ++i
)
516 nlohmann::json sub_value
;
517 ConvertGenericMessageToJson(
518 sub_value
, message
.GetReflection()->GetRepeatedMessage(message
, field_descriptor
, i
),
520 value
.push_back(std::move(sub_value
));
525 case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE
: {
526 for (int i
= 0; i
< field_size
; ++i
)
528 value
.push_back(message
.GetReflection()->GetRepeatedDouble(message
, field_descriptor
, i
));
533 case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT
: {
534 for (int i
= 0; i
< field_size
; ++i
)
536 value
.push_back(message
.GetReflection()->GetRepeatedFloat(message
, field_descriptor
, i
));
541 case google::protobuf::FieldDescriptor::CPPTYPE_BOOL
: {
542 for (int i
= 0; i
< field_size
; ++i
)
544 value
.push_back(message
.GetReflection()->GetRepeatedBool(message
, field_descriptor
, i
));
549 case google::protobuf::FieldDescriptor::CPPTYPE_ENUM
: {
550 for (int i
= 0; i
< field_size
; ++i
)
553 message
.GetReflection()->GetRepeatedEnumValue(message
, field_descriptor
, i
));
565 OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions
&&options
)
566 : options_(options
), http_client_(http_client::HttpClientFactory::Create())
569 OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions
&&options
,
570 std::shared_ptr
<ext::http::client::HttpClient
> http_client
)
571 : options_(options
), http_client_(http_client
)
574 // ----------------------------- HTTP Client methods ------------------------------
575 opentelemetry::sdk::common::ExportResult
OtlpHttpClient::Export(
576 const google::protobuf::Message
&message
) noexcept
578 // Return failure if this exporter has been shutdown
581 const char *error_message
= "[OTLP HTTP Client] Export failed, exporter is shutdown";
582 if (options_
.console_debug
)
584 std::cerr
<< error_message
<< std::endl
;
586 OTEL_INTERNAL_LOG_ERROR(error_message
);
588 return opentelemetry::sdk::common::ExportResult::kFailure
;
591 // Parse uri and store it to cache
592 if (http_uri_
.empty())
594 auto parse_url
= opentelemetry::ext::http::common::UrlParser(std::string(options_
.url
));
595 if (!parse_url
.success_
)
597 std::string error_message
= "[OTLP HTTP Client] Export failed, invalid url: " + options_
.url
;
598 if (options_
.console_debug
)
600 std::cerr
<< error_message
<< std::endl
;
602 OTEL_INTERNAL_LOG_ERROR(error_message
.c_str());
604 return opentelemetry::sdk::common::ExportResult::kFailure
;
607 if (!parse_url
.path_
.empty() && parse_url
.path_
[0] == '/')
609 http_uri_
= parse_url
.path_
.substr(1);
613 http_uri_
= parse_url
.path_
;
617 http_client::Body body_vec
;
618 std::string content_type
;
619 if (options_
.content_type
== HttpRequestContentType::kBinary
)
621 if (SerializeToHttpBody(body_vec
, message
))
623 if (options_
.console_debug
)
625 OTEL_INTERNAL_LOG_DEBUG(
626 "[OTLP HTTP Client] Request body(Binary): " << message
.Utf8DebugString());
631 if (options_
.console_debug
)
633 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Serialize body failed(Binary):"
634 << message
.InitializationErrorString());
636 return opentelemetry::sdk::common::ExportResult::kFailure
;
638 content_type
= kHttpBinaryContentType
;
642 nlohmann::json json_request
;
644 // Convert from proto into json object
645 ConvertGenericMessageToJson(json_request
, message
, options_
);
647 std::string post_body_json
=
648 json_request
.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace
);
649 if (options_
.console_debug
)
651 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Request body(Json)" << post_body_json
);
653 body_vec
.assign(post_body_json
.begin(), post_body_json
.end());
654 content_type
= kHttpJsonContentType
;
658 auto session
= http_client_
->CreateSession(options_
.url
);
659 auto request
= session
->CreateRequest();
661 for (auto &header
: options_
.http_headers
)
663 request
->AddHeader(header
.first
, header
.second
);
665 request
->SetUri(http_uri_
);
666 request
->SetTimeoutMs(std::chrono::duration_cast
<std::chrono::milliseconds
>(options_
.timeout
));
667 request
->SetMethod(http_client::Method::Post
);
668 request
->SetBody(body_vec
);
669 request
->ReplaceHeader("Content-Type", content_type
);
672 std::unique_ptr
<ResponseHandler
> handler(new ResponseHandler(options_
.console_debug
));
673 session
->SendRequest(*handler
);
675 // Wait for the response to be received
676 if (options_
.console_debug
)
678 OTEL_INTERNAL_LOG_DEBUG(
679 "[OTLP HTTP Client] DEBUG: Waiting for response from "
680 << options_
.url
<< " (timeout = "
681 << std::chrono::duration_cast
<std::chrono::milliseconds
>(options_
.timeout
).count()
682 << " milliseconds)");
684 bool write_successful
= handler
->waitForResponse();
687 session
->FinishSession();
689 // If an error occurred with the HTTP request
690 if (!write_successful
)
693 return opentelemetry::sdk::common::ExportResult::kFailure
;
696 return opentelemetry::sdk::common::ExportResult::kSuccess
;
699 bool OtlpHttpClient::Shutdown(std::chrono::microseconds
) noexcept
702 const std::lock_guard
<opentelemetry::common::SpinLockMutex
> locked(lock_
);
706 // Shutdown the session manager
707 http_client_
->CancelAllSessions();
708 http_client_
->FinishAllSessions();
713 bool OtlpHttpClient::isShutdown() const noexcept
715 const std::lock_guard
<opentelemetry::common::SpinLockMutex
> locked(lock_
);
720 } // namespace exporter
721 OPENTELEMETRY_END_NAMESPACE