]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/jaeger-client-cpp/src/jaegertracing/ThriftSender.cpp
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / jaeger-client-cpp / src / jaegertracing / ThriftSender.cpp
CommitLineData
f67539c2
TL
1/*
2 * Copyright (c) 2017-2018 Uber Technologies, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#include "jaegertracing/ThriftSender.h"
18
19#include "jaegertracing/Span.h"
20#include "jaegertracing/Tag.h"
21#include "jaegertracing/Tracer.h"
22#include <algorithm>
23#include <cstdint>
24#include <iostream>
25#include <iterator>
26#include <thrift/transport/TBufferTransports.h>
27
28#ifdef _MSC_VER
29#pragma warning(disable : 4267) // Conversion from unsigned to signed. It
30 // should not be a problem here.
31#endif
32
33namespace jaegertracing {
34namespace net {
35class IPAddress;
36} // namespace net
37
38namespace {
39
40constexpr auto kEmitBatchOverhead = 30;
41
42} // anonymous namespace
43
44ThriftSender::ThriftSender(std::unique_ptr<utils::Transport>&& transporter)
45 : _transporter(std::move(transporter))
46 , _maxSpanBytes(0)
47 , _byteBufferSize(0)
48 , _processByteSize(0)
49 , _protocolFactory(_transporter->protocolFactory())
50 , _thriftBuffer(new apache::thrift::transport::TMemoryBuffer())
51{
52}
53
54int ThriftSender::append(const Span& span)
55{
56 if (_process.serviceName.empty()) {
57 const auto& tracer = static_cast<const Tracer&>(span.tracer());
58 _process.serviceName = tracer.serviceName();
59
60 const auto& tracerTags = tracer.tags();
61 std::vector<thrift::Tag> thriftTags;
62 thriftTags.reserve(tracerTags.size());
63 std::transform(std::begin(tracerTags),
64 std::end(tracerTags),
65 std::back_inserter(thriftTags),
66 [](const Tag& tag) {
67 thrift::Tag thriftTag;
68 tag.thrift(thriftTag);
69 return thriftTag;
70 });
71 _process.__set_tags(thriftTags);
72
73 _processByteSize = calcSizeOfSerializedThrift(_process);
74 _maxSpanBytes =
75 _transporter->maxPacketSize() - _processByteSize - kEmitBatchOverhead;
76 }
77 thrift::Span jaegerSpan;
78 span.thrift(jaegerSpan);
79 const auto spanSize = calcSizeOfSerializedThrift(jaegerSpan);
80 if (spanSize > _maxSpanBytes) {
81 std::ostringstream oss;
82 throw Sender::Exception("Span is too large", 1);
83 }
84
85 _byteBufferSize += spanSize;
86 if (_byteBufferSize <= _maxSpanBytes) {
87 _spanBuffer.push_back(jaegerSpan);
88 if (_byteBufferSize < _maxSpanBytes) {
89 return 0;
90 }
91 return flush();
92 }
93
94 // Flush currently full buffer, then append this span to buffer.
95 const auto flushed = flush();
96 _spanBuffer.push_back(jaegerSpan);
97 _byteBufferSize = spanSize + _processByteSize;
98 return flushed;
99}
100
101int ThriftSender::flush()
102{
103 if (_spanBuffer.empty()) {
104 return 0;
105 }
106
107 thrift::Batch batch;
108 batch.__set_process(_process);
109 batch.__set_spans(_spanBuffer);
110
111 try {
112 _transporter->emitBatch(batch);
113 } catch (const std::system_error& ex) {
114 std::ostringstream oss;
115 oss << "Could not send span " << ex.what()
116 << ", code=" << ex.code().value();
117 throw Sender::Exception(oss.str(), _spanBuffer.size());
118 } catch (const std::exception& ex) {
119 std::ostringstream oss;
120 oss << "Could not send span " << ex.what();
121 throw Sender::Exception(oss.str(), _spanBuffer.size());
122 } catch (...) {
123 throw Sender::Exception("Could not send span, unknown error",
124 _spanBuffer.size());
125 }
126
127 resetBuffers();
128
129 return batch.spans.size();
130}
131
132} // namespace jaegertracing