]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Copyright (c) 2017-2018 Uber Technologies, Inc. | |
3 | * | |
4 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | * you may not use this file except in compliance with the License. | |
6 | * You may obtain a copy of the License at | |
7 | * | |
8 | * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | * | |
10 | * Unless required by applicable law or agreed to in writing, software | |
11 | * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | * See the License for the specific language governing permissions and | |
14 | * limitations under the License. | |
15 | */ | |
16 | ||
17 | #include "jaegertracing/ThriftSender.h" | |
18 | ||
19 | #include "jaegertracing/Span.h" | |
20 | #include "jaegertracing/Tag.h" | |
21 | #include "jaegertracing/Tracer.h" | |
22 | #include <algorithm> | |
23 | #include <cstdint> | |
24 | #include <iostream> | |
25 | #include <iterator> | |
26 | #include <thrift/transport/TBufferTransports.h> | |
27 | ||
28 | #ifdef _MSC_VER | |
29 | #pragma warning(disable : 4267) // Conversion from unsigned to signed. It | |
30 | // should not be a problem here. | |
31 | #endif | |
32 | ||
33 | namespace jaegertracing { | |
// Forward declaration only. NOTE(review): nothing in the visible part of this
// file refers to net::IPAddress - confirm whether it is still needed.
namespace net { class IPAddress; }  // namespace net
37 | ||
namespace {

// Fixed number of bytes that ThriftSender::append subtracts from the
// transport's maximum packet size (together with the serialized process size)
// when it computes the largest span it will accept.
constexpr int kEmitBatchOverhead = 30;

}  // anonymous namespace
43 | ||
44 | ThriftSender::ThriftSender(std::unique_ptr<utils::Transport>&& transporter) | |
45 | : _transporter(std::move(transporter)) | |
46 | , _maxSpanBytes(0) | |
47 | , _byteBufferSize(0) | |
48 | , _processByteSize(0) | |
49 | , _protocolFactory(_transporter->protocolFactory()) | |
50 | , _thriftBuffer(new apache::thrift::transport::TMemoryBuffer()) | |
51 | { | |
52 | } | |
53 | ||
54 | int ThriftSender::append(const Span& span) | |
55 | { | |
56 | if (_process.serviceName.empty()) { | |
57 | const auto& tracer = static_cast<const Tracer&>(span.tracer()); | |
58 | _process.serviceName = tracer.serviceName(); | |
59 | ||
60 | const auto& tracerTags = tracer.tags(); | |
61 | std::vector<thrift::Tag> thriftTags; | |
62 | thriftTags.reserve(tracerTags.size()); | |
63 | std::transform(std::begin(tracerTags), | |
64 | std::end(tracerTags), | |
65 | std::back_inserter(thriftTags), | |
66 | [](const Tag& tag) { | |
67 | thrift::Tag thriftTag; | |
68 | tag.thrift(thriftTag); | |
69 | return thriftTag; | |
70 | }); | |
71 | _process.__set_tags(thriftTags); | |
72 | ||
73 | _processByteSize = calcSizeOfSerializedThrift(_process); | |
74 | _maxSpanBytes = | |
75 | _transporter->maxPacketSize() - _processByteSize - kEmitBatchOverhead; | |
76 | } | |
77 | thrift::Span jaegerSpan; | |
78 | span.thrift(jaegerSpan); | |
79 | const auto spanSize = calcSizeOfSerializedThrift(jaegerSpan); | |
80 | if (spanSize > _maxSpanBytes) { | |
81 | std::ostringstream oss; | |
82 | throw Sender::Exception("Span is too large", 1); | |
83 | } | |
84 | ||
85 | _byteBufferSize += spanSize; | |
86 | if (_byteBufferSize <= _maxSpanBytes) { | |
87 | _spanBuffer.push_back(jaegerSpan); | |
88 | if (_byteBufferSize < _maxSpanBytes) { | |
89 | return 0; | |
90 | } | |
91 | return flush(); | |
92 | } | |
93 | ||
94 | // Flush currently full buffer, then append this span to buffer. | |
95 | const auto flushed = flush(); | |
96 | _spanBuffer.push_back(jaegerSpan); | |
97 | _byteBufferSize = spanSize + _processByteSize; | |
98 | return flushed; | |
99 | } | |
100 | ||
101 | int ThriftSender::flush() | |
102 | { | |
103 | if (_spanBuffer.empty()) { | |
104 | return 0; | |
105 | } | |
106 | ||
107 | thrift::Batch batch; | |
108 | batch.__set_process(_process); | |
109 | batch.__set_spans(_spanBuffer); | |
110 | ||
111 | try { | |
112 | _transporter->emitBatch(batch); | |
113 | } catch (const std::system_error& ex) { | |
114 | std::ostringstream oss; | |
115 | oss << "Could not send span " << ex.what() | |
116 | << ", code=" << ex.code().value(); | |
117 | throw Sender::Exception(oss.str(), _spanBuffer.size()); | |
118 | } catch (const std::exception& ex) { | |
119 | std::ostringstream oss; | |
120 | oss << "Could not send span " << ex.what(); | |
121 | throw Sender::Exception(oss.str(), _spanBuffer.size()); | |
122 | } catch (...) { | |
123 | throw Sender::Exception("Could not send span, unknown error", | |
124 | _spanBuffer.size()); | |
125 | } | |
126 | ||
127 | resetBuffers(); | |
128 | ||
129 | return batch.spans.size(); | |
130 | } | |
131 | ||
132 | } // namespace jaegertracing |