]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/opentelemetry-cpp/exporters/otlp/src/otlp_http_client.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / jaegertracing / opentelemetry-cpp / exporters / otlp / src / otlp_http_client.cc
1 // Copyright The OpenTelemetry Authors
2 // SPDX-License-Identifier: Apache-2.0
3
4 #include "opentelemetry/exporters/otlp/otlp_http_client.h"
5
6 #if defined(HAVE_GSL)
7 # include <gsl/gsl>
8 #else
9 # include <assert.h>
10 #endif
11
12 #include "opentelemetry/ext/http/client/http_client_factory.h"
13 #include "opentelemetry/ext/http/common/url_parser.h"
14
15 #include "opentelemetry/exporters/otlp/protobuf_include_prefix.h"
16
17 #include <mutex>
18 #include "google/protobuf/message.h"
19 #include "google/protobuf/reflection.h"
20 #include "google/protobuf/stubs/common.h"
21 #include "google/protobuf/stubs/stringpiece.h"
22 #include "nlohmann/json.hpp"
23
24 #if defined(GOOGLE_PROTOBUF_VERSION) && GOOGLE_PROTOBUF_VERSION >= 3007000
25 # include "google/protobuf/stubs/strutil.h"
26 #else
27 # include "google/protobuf/stubs/port.h"
28 namespace google
29 {
30 namespace protobuf
31 {
32 LIBPROTOBUF_EXPORT void Base64Escape(StringPiece src, std::string *dest);
33 } // namespace protobuf
34 } // namespace google
35 #endif
36
37 #include "opentelemetry/exporters/otlp/protobuf_include_suffix.h"
38
39 #include "opentelemetry/sdk/common/global_log_handler.h"
40 #include "opentelemetry/sdk_config.h"
41
42 #include <condition_variable>
43 #include <fstream>
44 #include <mutex>
45 #include <sstream>
46 #include <string>
47 #include <vector>
48
49 #ifdef GetMessage
50 # undef GetMessage
51 #endif
52
53 namespace nostd = opentelemetry::nostd;
54 namespace http_client = opentelemetry::ext::http::client;
55
56 OPENTELEMETRY_BEGIN_NAMESPACE
57 namespace exporter
58 {
59 namespace otlp
60 {
61
62 namespace
63 {
64
65 /**
66 * This class handles the response message from the Elasticsearch request
67 */
68 class ResponseHandler : public http_client::EventHandler
69 {
70 public:
71 /**
72 * Creates a response handler, that by default doesn't display to console
73 */
74 ResponseHandler(bool console_debug = false) : console_debug_{console_debug} {}
75
76 /**
77 * Automatically called when the response is received, store the body into a string and notify any
78 * threads blocked on this result
79 */
80 void OnResponse(http_client::Response &response) noexcept override
81 {
82 // Lock the private members so they can't be read while being modified
83 {
84 std::unique_lock<std::mutex> lk(mutex_);
85
86 // Store the body of the request
87 body_ = std::string(response.GetBody().begin(), response.GetBody().end());
88
89 if (console_debug_)
90 {
91 std::stringstream ss;
92 ss << "[OTLP HTTP Client] Status:" << response.GetStatusCode() << "Header:";
93 response.ForEachHeader([&ss](opentelemetry::nostd::string_view header_name,
94 opentelemetry::nostd::string_view header_value) {
95 ss << "\t" << header_name.data() << " : " << header_value.data() << ",";
96 return true;
97 });
98 ss << "Body:" << body_;
99 OTEL_INTERNAL_LOG_DEBUG(ss.str());
100 }
101
102 // Set the response_received_ flag to true and notify any threads waiting on this result
103 response_received_ = true;
104 stop_waiting_ = true;
105 }
106 cv_.notify_all();
107 }
108
109 /**resource
110 * A method the user calls to block their thread until the response is received. The longest
111 * duration is the timeout of the request, set by SetTimeoutMs()
112 */
113 bool waitForResponse()
114 {
115 std::unique_lock<std::mutex> lk(mutex_);
116 cv_.wait(lk, [this] { return stop_waiting_; });
117 return response_received_;
118 }
119
120 /**
121 * Returns the body of the response
122 */
123 std::string GetResponseBody()
124 {
125 // Lock so that body_ can't be written to while returning it
126 std::unique_lock<std::mutex> lk(mutex_);
127 return body_;
128 }
129
130 // Callback method when an http event occurs
131 void OnEvent(http_client::SessionState state,
132 opentelemetry::nostd::string_view reason) noexcept override
133 {
134 // need to modify stop_waiting_ under lock before calling notify_all
135 switch (state)
136 {
137 case http_client::SessionState::CreateFailed:
138 case http_client::SessionState::ConnectFailed:
139 case http_client::SessionState::SendFailed:
140 case http_client::SessionState::SSLHandshakeFailed:
141 case http_client::SessionState::TimedOut:
142 case http_client::SessionState::NetworkError:
143 case http_client::SessionState::Cancelled: {
144 std::unique_lock<std::mutex> lk(mutex_);
145 stop_waiting_ = true;
146 }
147 break;
148
149 default:
150 break;
151 }
152
153 // If any failure event occurs, release the condition variable to unblock main thread
154 switch (state)
155 {
156 case http_client::SessionState::CreateFailed:
157 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: session create failed");
158 cv_.notify_all();
159 break;
160
161 case http_client::SessionState::Created:
162 if (console_debug_)
163 {
164 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: session created");
165 }
166 break;
167
168 case http_client::SessionState::Destroyed:
169 if (console_debug_)
170 {
171 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: session destroyed");
172 }
173 break;
174
175 case http_client::SessionState::Connecting:
176 if (console_debug_)
177 {
178 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: connecting to peer");
179 }
180 break;
181
182 case http_client::SessionState::ConnectFailed:
183 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: connection failed");
184 cv_.notify_all();
185 break;
186
187 case http_client::SessionState::Connected:
188 if (console_debug_)
189 {
190 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: connected");
191 }
192 break;
193
194 case http_client::SessionState::Sending:
195 if (console_debug_)
196 {
197 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: sending request");
198 }
199 break;
200
201 case http_client::SessionState::SendFailed:
202 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request send failed");
203 cv_.notify_all();
204 break;
205
206 case http_client::SessionState::Response:
207 if (console_debug_)
208 {
209 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: response received");
210 }
211 break;
212
213 case http_client::SessionState::SSLHandshakeFailed:
214 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: SSL handshake failed");
215 cv_.notify_all();
216 break;
217
218 case http_client::SessionState::TimedOut:
219 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: request time out");
220 cv_.notify_all();
221 break;
222
223 case http_client::SessionState::NetworkError:
224 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: network error");
225 cv_.notify_all();
226 break;
227
228 case http_client::SessionState::ReadError:
229 if (console_debug_)
230 {
231 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Session state: error reading response");
232 }
233 break;
234
235 case http_client::SessionState::WriteError:
236 if (console_debug_)
237 {
238 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] DEBUG:Session state: error writing request");
239 }
240 break;
241
242 case http_client::SessionState::Cancelled:
243 OTEL_INTERNAL_LOG_ERROR("[OTLP HTTP Client] Session state: (manually) cancelled\n");
244 cv_.notify_all();
245 break;
246
247 default:
248 break;
249 }
250 }
251
252 private:
253 // Define a condition variable and mutex
254 std::condition_variable cv_;
255 std::mutex mutex_;
256
257 // Whether notify has been called
258 bool stop_waiting_ = false;
259
260 // Whether the response has been received
261 bool response_received_ = false;
262
263 // A string to store the response body
264 std::string body_ = "";
265
266 // Whether to print the results from the callback
267 bool console_debug_ = false;
268 };
269
/**
 * Converts a single nibble to its lowercase hexadecimal digit.
 * @param byte a value in the range [0, 15]
 * @return '0'-'9' for values below 10, 'a'-'f' otherwise
 */
static inline char HexEncode(unsigned char byte)
{
  // A nibble is at most 15; accepting 16 would yield 'g', which is not a hexadecimal digit.
#if defined(HAVE_GSL)
  Expects(byte < 16);
#else
  assert(byte < 16);
#endif
  if (byte >= 10)
  {
    return static_cast<char>(byte - 10 + 'a');
  }
  return static_cast<char>(byte + '0');
}

/**
 * Encodes every byte of the input as two lowercase hexadecimal digits, high nibble first.
 * @param bytes raw bytes to encode (may contain NUL bytes, may be empty)
 * @return a string of exactly twice the input length
 */
static std::string HexEncode(const std::string &bytes)
{
  std::string ret;
  ret.reserve(bytes.size() * 2);
  for (const char raw : bytes)
  {
    // Go through unsigned char so bytes >= 0x80 are not sign-extended before shifting
    const unsigned char byte = static_cast<unsigned char>(raw);
    ret.push_back(HexEncode(static_cast<unsigned char>(byte >> 4)));
    ret.push_back(HexEncode(static_cast<unsigned char>(byte & 0x0f)));
  }
  return ret;
}
299
300 static std::string BytesMapping(const std::string &bytes,
301 const google::protobuf::FieldDescriptor *field_descriptor,
302 JsonBytesMappingKind kind)
303 {
304 switch (kind)
305 {
306 case JsonBytesMappingKind::kHexId: {
307 if (field_descriptor->lowercase_name() == "trace_id" ||
308 field_descriptor->lowercase_name() == "span_id" ||
309 field_descriptor->lowercase_name() == "parent_span_id")
310 {
311 return HexEncode(bytes);
312 }
313 else
314 {
315 std::string base64_value;
316 google::protobuf::Base64Escape(bytes, &base64_value);
317 return base64_value;
318 }
319 }
320 case JsonBytesMappingKind::kBase64: {
321 // Base64 is the default bytes mapping of protobuf
322 std::string base64_value;
323 google::protobuf::Base64Escape(bytes, &base64_value);
324 return base64_value;
325 }
326 case JsonBytesMappingKind::kHex:
327 return HexEncode(bytes);
328 default:
329 return bytes;
330 }
331 }
332
333 static void ConvertGenericFieldToJson(nlohmann::json &value,
334 const google::protobuf::Message &message,
335 const google::protobuf::FieldDescriptor *field_descriptor,
336 const OtlpHttpClientOptions &options);
337
338 static void ConvertListFieldToJson(nlohmann::json &value,
339 const google::protobuf::Message &message,
340 const google::protobuf::FieldDescriptor *field_descriptor,
341 const OtlpHttpClientOptions &options);
342
343 static void ConvertGenericMessageToJson(nlohmann::json &value,
344 const google::protobuf::Message &message,
345 const OtlpHttpClientOptions &options)
346 {
347 std::vector<const google::protobuf::FieldDescriptor *> fields_with_data;
348 message.GetReflection()->ListFields(message, &fields_with_data);
349 for (std::size_t i = 0; i < fields_with_data.size(); ++i)
350 {
351 const google::protobuf::FieldDescriptor *field_descriptor = fields_with_data[i];
352 nlohmann::json &child_value = options.use_json_name ? value[field_descriptor->json_name()]
353 : value[field_descriptor->name()];
354 if (field_descriptor->is_repeated())
355 {
356 ConvertListFieldToJson(child_value, message, field_descriptor, options);
357 }
358 else
359 {
360 ConvertGenericFieldToJson(child_value, message, field_descriptor, options);
361 }
362 }
363 }
364
365 bool SerializeToHttpBody(http_client::Body &output, const google::protobuf::Message &message)
366 {
367 auto body_size = message.ByteSizeLong();
368 if (body_size > 0)
369 {
370 output.resize(body_size);
371 return message.SerializeWithCachedSizesToArray(
372 reinterpret_cast<google::protobuf::uint8 *>(&output[0]));
373 }
374 return true;
375 }
376
377 void ConvertGenericFieldToJson(nlohmann::json &value,
378 const google::protobuf::Message &message,
379 const google::protobuf::FieldDescriptor *field_descriptor,
380 const OtlpHttpClientOptions &options)
381 {
382 switch (field_descriptor->cpp_type())
383 {
384 case google::protobuf::FieldDescriptor::CPPTYPE_INT32: {
385 value = message.GetReflection()->GetInt32(message, field_descriptor);
386 break;
387 }
388 case google::protobuf::FieldDescriptor::CPPTYPE_INT64: {
389 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as
390 // decimal strings, and either numbers or strings are accepted when decoding.
391 value = std::to_string(message.GetReflection()->GetInt64(message, field_descriptor));
392 break;
393 }
394 case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: {
395 value = message.GetReflection()->GetUInt32(message, field_descriptor);
396 break;
397 }
398 case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: {
399 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded as
400 // decimal strings, and either numbers or strings are accepted when decoding.
401 value = std::to_string(message.GetReflection()->GetUInt64(message, field_descriptor));
402 break;
403 }
404 case google::protobuf::FieldDescriptor::CPPTYPE_STRING: {
405 std::string empty;
406 if (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_BYTES)
407 {
408 value = BytesMapping(
409 message.GetReflection()->GetStringReference(message, field_descriptor, &empty),
410 field_descriptor, options.json_bytes_mapping);
411 }
412 else
413 {
414 value = message.GetReflection()->GetStringReference(message, field_descriptor, &empty);
415 }
416 break;
417 }
418 case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
419 ConvertGenericMessageToJson(
420 value, message.GetReflection()->GetMessage(message, field_descriptor, nullptr), options);
421 break;
422 }
423 case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: {
424 value = message.GetReflection()->GetDouble(message, field_descriptor);
425 break;
426 }
427 case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: {
428 value = message.GetReflection()->GetFloat(message, field_descriptor);
429 break;
430 }
431 case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: {
432 value = message.GetReflection()->GetBool(message, field_descriptor);
433 break;
434 }
435 case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: {
436 value = message.GetReflection()->GetEnumValue(message, field_descriptor);
437 break;
438 }
439 default: {
440 break;
441 }
442 }
443 }
444
445 void ConvertListFieldToJson(nlohmann::json &value,
446 const google::protobuf::Message &message,
447 const google::protobuf::FieldDescriptor *field_descriptor,
448 const OtlpHttpClientOptions &options)
449 {
450 auto field_size = message.GetReflection()->FieldSize(message, field_descriptor);
451
452 switch (field_descriptor->cpp_type())
453 {
454 case google::protobuf::FieldDescriptor::CPPTYPE_INT32: {
455 for (int i = 0; i < field_size; ++i)
456 {
457 value.push_back(message.GetReflection()->GetRepeatedInt32(message, field_descriptor, i));
458 }
459
460 break;
461 }
462 case google::protobuf::FieldDescriptor::CPPTYPE_INT64: {
463 for (int i = 0; i < field_size; ++i)
464 {
465 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded
466 // as decimal strings, and either numbers or strings are accepted when decoding.
467 value.push_back(std::to_string(
468 message.GetReflection()->GetRepeatedInt64(message, field_descriptor, i)));
469 }
470
471 break;
472 }
473 case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: {
474 for (int i = 0; i < field_size; ++i)
475 {
476 value.push_back(message.GetReflection()->GetRepeatedUInt32(message, field_descriptor, i));
477 }
478
479 break;
480 }
481 case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: {
482 for (int i = 0; i < field_size; ++i)
483 {
484 // According to Protobuf specs 64-bit integer numbers in JSON-encoded payloads are encoded
485 // as decimal strings, and either numbers or strings are accepted when decoding.
486 value.push_back(std::to_string(
487 message.GetReflection()->GetRepeatedUInt64(message, field_descriptor, i)));
488 }
489
490 break;
491 }
492 case google::protobuf::FieldDescriptor::CPPTYPE_STRING: {
493 std::string empty;
494 if (field_descriptor->type() == google::protobuf::FieldDescriptor::TYPE_BYTES)
495 {
496 for (int i = 0; i < field_size; ++i)
497 {
498 value.push_back(BytesMapping(message.GetReflection()->GetRepeatedStringReference(
499 message, field_descriptor, i, &empty),
500 field_descriptor, options.json_bytes_mapping));
501 }
502 }
503 else
504 {
505 for (int i = 0; i < field_size; ++i)
506 {
507 value.push_back(message.GetReflection()->GetRepeatedStringReference(
508 message, field_descriptor, i, &empty));
509 }
510 }
511 break;
512 }
513 case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
514 for (int i = 0; i < field_size; ++i)
515 {
516 nlohmann::json sub_value;
517 ConvertGenericMessageToJson(
518 sub_value, message.GetReflection()->GetRepeatedMessage(message, field_descriptor, i),
519 options);
520 value.push_back(std::move(sub_value));
521 }
522
523 break;
524 }
525 case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: {
526 for (int i = 0; i < field_size; ++i)
527 {
528 value.push_back(message.GetReflection()->GetRepeatedDouble(message, field_descriptor, i));
529 }
530
531 break;
532 }
533 case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: {
534 for (int i = 0; i < field_size; ++i)
535 {
536 value.push_back(message.GetReflection()->GetRepeatedFloat(message, field_descriptor, i));
537 }
538
539 break;
540 }
541 case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: {
542 for (int i = 0; i < field_size; ++i)
543 {
544 value.push_back(message.GetReflection()->GetRepeatedBool(message, field_descriptor, i));
545 }
546
547 break;
548 }
549 case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: {
550 for (int i = 0; i < field_size; ++i)
551 {
552 value.push_back(
553 message.GetReflection()->GetRepeatedEnumValue(message, field_descriptor, i));
554 }
555 break;
556 }
557 default: {
558 break;
559 }
560 }
561 }
562
563 } // namespace
564
565 OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options)
566 : options_(options), http_client_(http_client::HttpClientFactory::Create())
567 {}
568
569 OtlpHttpClient::OtlpHttpClient(OtlpHttpClientOptions &&options,
570 std::shared_ptr<ext::http::client::HttpClient> http_client)
571 : options_(options), http_client_(http_client)
572 {}
573
574 // ----------------------------- HTTP Client methods ------------------------------
575 opentelemetry::sdk::common::ExportResult OtlpHttpClient::Export(
576 const google::protobuf::Message &message) noexcept
577 {
578 // Return failure if this exporter has been shutdown
579 if (isShutdown())
580 {
581 const char *error_message = "[OTLP HTTP Client] Export failed, exporter is shutdown";
582 if (options_.console_debug)
583 {
584 std::cerr << error_message << std::endl;
585 }
586 OTEL_INTERNAL_LOG_ERROR(error_message);
587
588 return opentelemetry::sdk::common::ExportResult::kFailure;
589 }
590
591 // Parse uri and store it to cache
592 if (http_uri_.empty())
593 {
594 auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url));
595 if (!parse_url.success_)
596 {
597 std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url;
598 if (options_.console_debug)
599 {
600 std::cerr << error_message << std::endl;
601 }
602 OTEL_INTERNAL_LOG_ERROR(error_message.c_str());
603
604 return opentelemetry::sdk::common::ExportResult::kFailure;
605 }
606
607 if (!parse_url.path_.empty() && parse_url.path_[0] == '/')
608 {
609 http_uri_ = parse_url.path_.substr(1);
610 }
611 else
612 {
613 http_uri_ = parse_url.path_;
614 }
615 }
616
617 http_client::Body body_vec;
618 std::string content_type;
619 if (options_.content_type == HttpRequestContentType::kBinary)
620 {
621 if (SerializeToHttpBody(body_vec, message))
622 {
623 if (options_.console_debug)
624 {
625 OTEL_INTERNAL_LOG_DEBUG(
626 "[OTLP HTTP Client] Request body(Binary): " << message.Utf8DebugString());
627 }
628 }
629 else
630 {
631 if (options_.console_debug)
632 {
633 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Serialize body failed(Binary):"
634 << message.InitializationErrorString());
635 }
636 return opentelemetry::sdk::common::ExportResult::kFailure;
637 }
638 content_type = kHttpBinaryContentType;
639 }
640 else
641 {
642 nlohmann::json json_request;
643
644 // Convert from proto into json object
645 ConvertGenericMessageToJson(json_request, message, options_);
646
647 std::string post_body_json =
648 json_request.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace);
649 if (options_.console_debug)
650 {
651 OTEL_INTERNAL_LOG_DEBUG("[OTLP HTTP Client] Request body(Json)" << post_body_json);
652 }
653 body_vec.assign(post_body_json.begin(), post_body_json.end());
654 content_type = kHttpJsonContentType;
655 }
656
657 // Send the request
658 auto session = http_client_->CreateSession(options_.url);
659 auto request = session->CreateRequest();
660
661 for (auto &header : options_.http_headers)
662 {
663 request->AddHeader(header.first, header.second);
664 }
665 request->SetUri(http_uri_);
666 request->SetTimeoutMs(std::chrono::duration_cast<std::chrono::milliseconds>(options_.timeout));
667 request->SetMethod(http_client::Method::Post);
668 request->SetBody(body_vec);
669 request->ReplaceHeader("Content-Type", content_type);
670
671 // Send the request
672 std::unique_ptr<ResponseHandler> handler(new ResponseHandler(options_.console_debug));
673 session->SendRequest(*handler);
674
675 // Wait for the response to be received
676 if (options_.console_debug)
677 {
678 OTEL_INTERNAL_LOG_DEBUG(
679 "[OTLP HTTP Client] DEBUG: Waiting for response from "
680 << options_.url << " (timeout = "
681 << std::chrono::duration_cast<std::chrono::milliseconds>(options_.timeout).count()
682 << " milliseconds)");
683 }
684 bool write_successful = handler->waitForResponse();
685
686 // End the session
687 session->FinishSession();
688
689 // If an error occurred with the HTTP request
690 if (!write_successful)
691 {
692 // TODO: retry logic
693 return opentelemetry::sdk::common::ExportResult::kFailure;
694 }
695
696 return opentelemetry::sdk::common::ExportResult::kSuccess;
697 }
698
699 bool OtlpHttpClient::Shutdown(std::chrono::microseconds) noexcept
700 {
701 {
702 const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
703 is_shutdown_ = true;
704 }
705
706 // Shutdown the session manager
707 http_client_->CancelAllSessions();
708 http_client_->FinishAllSessions();
709
710 return true;
711 }
712
713 bool OtlpHttpClient::isShutdown() const noexcept
714 {
715 const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
716 return is_shutdown_;
717 }
718
719 } // namespace otlp
720 } // namespace exporter
721 OPENTELEMETRY_END_NAMESPACE