2 * Copyright (c) 2017-2018 Uber Technologies, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
25 #include <nlohmann/json.hpp>
27 #include "jaegertracing/Tracer.h"
28 #include "jaegertracing/net/IPAddress.h"
29 #include "jaegertracing/net/Socket.h"
30 #include "jaegertracing/net/http/Request.h"
31 #include "jaegertracing/net/http/Response.h"
33 namespace jaegertracing
{
37 #define JSON_FROM_FIELD(var, field) \
39 json[#field] = var.field; \
42 #define FIELD_FROM_JSON(var, field) \
44 var.__set_##field(json.at(#field)); \
47 void to_json(nlohmann::json
& json
, const Transport::type
& transport
)
49 json
= _Transport_VALUES_TO_NAMES
.at(static_cast<int>(transport
));
52 void from_json(const nlohmann::json
& json
, Transport::type
& transport
)
54 const auto str
= json
.get
<std::string
>();
56 transport
= Transport::HTTP
;
59 if (str
== "TCHANNEL") {
60 transport
= Transport::TCHANNEL
;
64 transport
= Transport::DUMMY
;
67 std::ostringstream oss
;
68 oss
<< "Invalid transport value " << str
;
69 throw std::invalid_argument(oss
.str());
72 void to_json(nlohmann::json
& json
, const Downstream
& downstream
)
74 JSON_FROM_FIELD(downstream
, serviceName
);
75 JSON_FROM_FIELD(downstream
, serverRole
);
76 JSON_FROM_FIELD(downstream
, host
);
77 JSON_FROM_FIELD(downstream
, port
);
78 JSON_FROM_FIELD(downstream
, transport
);
79 if (downstream
.downstream
) {
80 json
["downstream"] = *downstream
.downstream
;
84 void from_json(const nlohmann::json
& json
, Downstream
& downstream
)
86 FIELD_FROM_JSON(downstream
, serviceName
);
87 FIELD_FROM_JSON(downstream
, serverRole
);
88 FIELD_FROM_JSON(downstream
, host
);
89 FIELD_FROM_JSON(downstream
, port
);
90 downstream
.__set_transport(json
.at("transport").get
<Transport::type
>());
91 auto itr
= json
.find("downstream");
92 if (itr
!= std::end(json
) && !itr
->is_null()) {
93 downstream
.__set_downstream(
94 std::make_shared
<Downstream
>(itr
->get
<Downstream
>()));
98 void to_json(nlohmann::json
& json
, const StartTraceRequest
& request
)
100 JSON_FROM_FIELD(request
, serverRole
);
101 JSON_FROM_FIELD(request
, sampled
);
102 JSON_FROM_FIELD(request
, baggage
);
103 JSON_FROM_FIELD(request
, downstream
);
106 void from_json(const nlohmann::json
& json
, StartTraceRequest
& request
)
108 FIELD_FROM_JSON(request
, serverRole
);
109 FIELD_FROM_JSON(request
, sampled
);
110 FIELD_FROM_JSON(request
, baggage
);
111 FIELD_FROM_JSON(request
, downstream
);
114 void to_json(nlohmann::json
& json
, const JoinTraceRequest
& request
)
116 JSON_FROM_FIELD(request
, serverRole
);
117 if (request
.__isset
.downstream
) {
118 json
["downstream"] = request
.downstream
;
122 void from_json(const nlohmann::json
& json
, JoinTraceRequest
& request
)
124 FIELD_FROM_JSON(request
, serverRole
);
125 auto itr
= json
.find("downstream");
126 if (itr
!= std::end(json
) && !itr
->is_null()) {
127 request
.__set_downstream(itr
->get
<Downstream
>());
131 void to_json(nlohmann::json
& json
, const ObservedSpan
& observedSpan
)
133 JSON_FROM_FIELD(observedSpan
, traceId
);
134 JSON_FROM_FIELD(observedSpan
, sampled
);
135 JSON_FROM_FIELD(observedSpan
, baggage
);
138 void from_json(const nlohmann::json
& json
, ObservedSpan
& observedSpan
)
140 FIELD_FROM_JSON(observedSpan
, traceId
);
141 FIELD_FROM_JSON(observedSpan
, sampled
);
142 FIELD_FROM_JSON(observedSpan
, baggage
);
145 void to_json(nlohmann::json
& json
, const TraceResponse
& response
)
147 if (response
.__isset
.span
) {
148 JSON_FROM_FIELD(response
, span
);
150 if (response
.downstream
) {
151 json
["downstream"] = *response
.downstream
;
153 JSON_FROM_FIELD(response
, notImplementedError
);
156 void from_json(const nlohmann::json
& json
, TraceResponse
& response
)
158 auto itr
= json
.find("span");
159 if (itr
!= std::end(json
) && !itr
->is_null()) {
160 response
.__set_span(itr
->get
<ObservedSpan
>());
162 itr
= json
.find("downstream");
163 if (itr
!= std::end(json
) && !itr
->is_null()) {
164 response
.__set_downstream(
165 std::make_shared
<TraceResponse
>(itr
->get
<TraceResponse
>()));
167 FIELD_FROM_JSON(response
, notImplementedError
);
170 #undef FIELD_FROM_JSON
171 #undef JSON_FROM_FIELD
173 } // namespace thrift
177 constexpr auto kBaggageKey
= "crossdock-baggage-key";
178 constexpr auto kDefaultTracerServiceName
= "crossdock-cpp";
180 std::string
escape(const std::string
& str
)
183 result
.reserve(str
.size());
184 for (auto&& ch
: str
) {
200 std::string
bufferedRead(net::Socket
& socket
)
202 constexpr auto kBufferSize
= 256;
203 std::array
<char, kBufferSize
> buffer
;
205 auto numRead
= ::read(socket
.handle(), &buffer
[0], buffer
.size());
206 data
.append(&buffer
[0], numRead
);
207 while (numRead
== kBufferSize
) {
208 numRead
= ::read(socket
.handle(), &buffer
[0], buffer
.size());
209 data
.append(&buffer
[0], numRead
);
214 class RequestReader
: public opentracing::HTTPHeadersReader
{
216 explicit RequestReader(const net::http::Request
& request
)
221 opentracing::expected
<void> ForeachKey(
222 std::function
<opentracing::expected
<void>(opentracing::string_view
,
223 opentracing::string_view
)> f
)
226 for (auto&& header
: _request
.headers()) {
227 const auto result
= f(header
.key(), header
.value());
232 return opentracing::make_expected();
236 const net::http::Request
& _request
;
239 class RequestWriter
: public opentracing::HTTPHeadersWriter
{
241 explicit RequestWriter(std::ostream
& requestStream
)
242 : _requestStream(requestStream
)
246 opentracing::expected
<void>
247 Set(opentracing::string_view key
,
248 opentracing::string_view value
) const override
250 _requestStream
<< key
<< ": " << value
<< "\r\n";
251 return opentracing::make_expected();
255 std::ostream
& _requestStream
;
258 thrift::ObservedSpan
observeSpan(const opentracing::SpanContext
& ctx
)
260 const auto& sc
= static_cast<const SpanContext
&>(ctx
);
261 thrift::ObservedSpan observedSpan
;
262 std::ostringstream oss
;
264 observedSpan
.__set_traceId(oss
.str());
265 observedSpan
.__set_sampled(sc
.isSampled());
266 auto itr
= sc
.baggage().find(kBaggageKey
);
267 if (itr
!= std::end(sc
.baggage())) {
268 observedSpan
.__set_baggage(itr
->second
);
273 thrift::TraceResponse
callDownstreamHTTP(const opentracing::SpanContext
& ctx
,
274 const thrift::Downstream
& target
,
275 opentracing::Tracer
& tracer
,
276 logging::Logger
& logger
)
278 thrift::JoinTraceRequest request
;
279 request
.__set_serverRole(target
.serverRole
);
280 if (target
.downstream
) {
281 request
.__set_downstream(*target
.downstream
);
284 const auto requestJSON
= nlohmann::json(request
).dump();
286 socket
.open(AF_INET
, SOCK_STREAM
);
287 const auto authority
= target
.host
+ ':' + target
.port
;
288 socket
.connect("http://" + authority
);
289 std::ostringstream oss
;
290 oss
<< "POST /join_trace HTTP/1.1\r\n"
292 << authority
<< "\r\n";
293 RequestWriter
writer(oss
);
294 tracer
.Inject(ctx
, writer
);
295 oss
<< "Connection: close\r\n"
296 "Content-Type: application/json\r\n"
298 << requestJSON
.size() << "\r\n\r\n"
300 const auto message
= oss
.str();
301 logger
.info("Sending request downstream: " + escape(message
));
302 const auto numWritten
=
303 ::write(socket
.handle(), &message
[0], message
.size());
306 const auto responseStr
= bufferedRead(socket
);
307 logger
.info("Received downstream response: " + escape(responseStr
));
308 std::istringstream
iss(responseStr
);
309 auto response
= net::http::Response::parse(iss
);
310 return nlohmann::json::parse(response
.body());
313 thrift::TraceResponse
callDownstream(const opentracing::SpanContext
& ctx
,
314 const std::string
& /* role */,
315 const thrift::Downstream
& downstream
,
316 opentracing::Tracer
& tracer
,
317 logging::Logger
& logger
)
319 thrift::TraceResponse response
;
321 switch (downstream
.transport
) {
322 case thrift::Transport::HTTP
: {
323 response
= callDownstreamHTTP(ctx
, downstream
, tracer
, logger
);
325 case thrift::Transport::TCHANNEL
: {
326 response
.__set_notImplementedError(
327 "TCHANNEL transport not implemented");
329 case thrift::Transport::DUMMY
: {
330 response
.__set_notImplementedError("DUMMY transport not implemented");
333 throw std::invalid_argument("Unrecognized protocol " +
334 std::to_string(downstream
.transport
));
341 thrift::TraceResponse
prepareResponse(const opentracing::SpanContext
& ctx
,
342 const std::string
& role
,
343 const thrift::Downstream
* downstream
,
344 opentracing::Tracer
& tracer
,
345 logging::Logger
& logger
)
347 const auto observedSpan
= observeSpan(ctx
);
348 thrift::TraceResponse response
;
349 response
.__set_span(observedSpan
);
351 response
.__set_downstream(std::make_shared
<thrift::TraceResponse
>(
352 callDownstream(ctx
, role
, *downstream
, tracer
, logger
)));
357 struct GenerateTracesRequest
{
358 using StrMap
= std::unordered_map
<std::string
, std::string
>;
361 std::string _operation
;
366 void from_json(const nlohmann::json
& json
, GenerateTracesRequest
& request
)
368 request
._type
= json
.at("type");
369 request
._operation
= json
.at("operation");
370 request
._tags
= json
.at("tags").get
<GenerateTracesRequest::StrMap
>();
371 request
._count
= json
.at("count");
374 } // anonymous namespace
376 using Handler
= std::function
<std::string(const net::http::Request
&)>;
378 class Server::SocketListener
{
380 SocketListener(const net::IPAddress
& ip
,
381 const std::shared_ptr
<logging::Logger
>& logger
,
391 ~SocketListener() { stop(); }
395 std::promise
<void> started
;
396 _thread
= std::thread([this, &started
]() { start(_ip
, started
); });
397 started
.get_future().get();
410 void start(const net::IPAddress
& ip
, std::promise
<void>& started
)
412 _socket
.open(AF_INET
, SOCK_STREAM
);
413 const auto enable
= 1;
414 ::setsockopt(_socket
.handle(),
424 using TaskList
= std::deque
<std::future
<void>>;
428 auto client
= _socket
.accept();
429 auto future
= std::async(
431 [this](net::Socket
&& socket
) {
432 net::Socket
client(std::move(socket
));
433 auto requestStr
= bufferedRead(client
);
434 _logger
->info("Received request: " + escape(requestStr
));
437 std::istringstream
iss(requestStr
);
438 const auto request
= net::http::Request::parse(iss
);
439 const auto responseStr
= _handler(request
);
440 const auto numWritten
= ::write(client
.handle(),
444 static_cast<int>(responseStr
.size())) {
445 std::ostringstream oss
;
446 oss
<< "Unable to write entire response"
449 << ", responseSize=" << responseStr
.size();
450 _logger
->error(oss
.str());
453 utils::ErrorUtil::logError(*_logger
, "Server error");
454 constexpr auto message
=
455 "HTTP/1.1 500 Internal Server Error\r\n\r\n";
456 constexpr auto messageSize
= sizeof(message
) - 1;
457 const auto numWritten
=
458 ::write(client
.handle(), message
, messageSize
);
465 tasks
.emplace_back(std::move(future
));
468 std::for_each(std::begin(tasks
),
470 [](TaskList::value_type
& future
) { future
.get(); });
475 std::shared_ptr
<logging::Logger
> _logger
;
477 std::atomic
<bool> _running
;
481 class Server::EndToEndHandler
{
483 using TracerPtr
= std::shared_ptr
<opentracing::Tracer
>;
485 EndToEndHandler(const std::string
& agentHostPort
,
486 const std::string
& collectorEndpoint
,
487 const std::string
& samplingServerURL
)
488 : _agentHostPort(agentHostPort
)
489 , _collectorEndpoint(collectorEndpoint
)
490 , _samplingServerURL(samplingServerURL
)
494 TracerPtr
findOrMakeTracer(std::string samplerType
)
496 if (samplerType
.empty()) {
497 samplerType
= kSamplerTypeRemote
;
500 std::lock_guard
<std::mutex
> lock(_mutex
);
501 auto itr
= _tracers
.find(samplerType
);
502 if (itr
!= std::end(_tracers
)) {
505 return init(samplerType
);
509 Config
makeEndToEndConfig(const std::string
& samplerType
) const
512 samplers::Config(samplerType
,
515 samplers::Config::kDefaultMaxOperations
,
516 std::chrono::seconds(5)),
517 reporters::Config(reporters::Config::kDefaultQueueSize
,
518 std::chrono::seconds(1),
521 _collectorEndpoint
));
524 TracerPtr
init(const std::string
& samplerType
)
526 const auto config
= makeEndToEndConfig(samplerType
);
527 auto tracer
= Tracer::make(kDefaultTracerServiceName
, config
);
528 _tracers
[config
.sampler().type()] = tracer
;
532 std::string _agentHostPort
;
533 std::string _collectorEndpoint
;
534 std::string _samplingServerURL
;
535 std::unordered_map
<std::string
, TracerPtr
> _tracers
;
539 Server::Server(const net::IPAddress
& clientIP
,
540 const net::IPAddress
& serverIP
,
541 const std::string
& agentHostPort
,
542 const std::string
& collectorEndpoint
,
543 const std::string
& samplingServerURL
)
544 : _logger(logging::consoleLogger())
545 , _tracer(Tracer::make(kDefaultTracerServiceName
, Config(), _logger
))
547 new SocketListener(clientIP
,
549 [this](const net::http::Request
& request
) {
550 return handleRequest(request
);
553 new SocketListener(serverIP
,
555 [this](const net::http::Request
& request
) {
556 return handleRequest(request
);
558 , _handler(new EndToEndHandler(agentHostPort
, collectorEndpoint
, samplingServerURL
))
562 Server::~Server() = default;
566 _clientListener
->start();
567 _serverListener
->start();
570 template <typename RequestType
>
571 std::string
Server::handleJSON(
572 const net::http::Request
& request
,
573 std::function
<thrift::TraceResponse(
574 const RequestType
&, const opentracing::SpanContext
&)> handler
)
576 RequestReader
reader(request
);
577 auto result
= _tracer
->Extract(reader
);
579 std::ostringstream oss
;
580 oss
<< "Cannot read request body: opentracing error code "
581 << result
.error().value();
582 const auto message
= oss
.str();
585 oss
<< "HTTP/1.1 400 Bad Request\r\n"
587 << message
.size() << "\r\n\r\n"
591 std::unique_ptr
<opentracing::SpanContext
> ctx(result
->release());
592 opentracing::StartSpanOptions options
;
593 options
.start_system_timestamp
= std::chrono::system_clock::now();
594 options
.start_steady_timestamp
= std::chrono::steady_clock::now();
596 options
.references
.emplace_back(std::make_pair(
597 opentracing::SpanReferenceType::ChildOfRef
, ctx
.get()));
599 auto span
= _tracer
->StartSpanWithOptions("post", options
);
601 RequestType thriftRequest
;
603 thriftRequest
= nlohmann::json::parse(request
.body());
604 } catch (const std::exception
& ex
) {
605 std::ostringstream oss
;
606 oss
<< "Cannot parse request JSON: " << ex
.what()
607 << ", json: " << request
.body();
608 const auto message
= oss
.str();
611 oss
<< "HTTP/1.1 500 Internal Server Error\r\n"
613 << message
.size() << "\r\n\r\n"
617 std::ostringstream oss
;
618 oss
<< "Cannot parse request JSON, json: " << request
.body();
619 const auto message
= oss
.str();
622 oss
<< "HTTP/1.1 500 Internal Server Error\r\n"
624 << message
.size() << "\r\n\r\n"
629 const auto thriftResponse
= handler(thriftRequest
, span
->context());
631 const auto message
= nlohmann::json(thriftResponse
).dump();
632 std::ostringstream oss
;
633 oss
<< "HTTP/1.1 200 OK\r\n"
634 "Content-Type: application/json\r\n"
636 << message
.size() << "\r\n\r\n"
639 } catch (const std::exception
& ex
) {
640 std::ostringstream oss
;
641 oss
<< "Cannot marshal response to JSON: " << ex
.what();
642 const auto message
= oss
.str();
645 oss
<< "HTTP/1.1 500 Internal Server Error\r\n"
647 << message
.size() << "\r\n\r\n"
651 std::ostringstream oss
;
652 oss
<< "Cannot marshal response to JSON";
653 const auto message
= oss
.str();
656 oss
<< "HTTP/1.1 500 Internal Server Error\r\n"
658 << message
.size() << "\r\n\r\n"
664 std::string
Server::handleRequest(const net::http::Request
& request
)
666 if (request
.target() == "/") {
667 return "HTTP/1.1 200 OK\r\n\r\n";
669 if (request
.target() == "/start_trace") {
670 return handleJSON
<thrift::StartTraceRequest
>(
672 [this](const thrift::StartTraceRequest
& request
,
673 const opentracing::SpanContext
& /* ctx */) {
674 return startTrace(request
);
677 if (request
.target() == "/join_trace") {
678 return handleJSON
<thrift::JoinTraceRequest
>(
680 [this](const thrift::JoinTraceRequest
& request
,
681 const opentracing::SpanContext
& ctx
) {
682 return joinTrace(request
, ctx
);
685 if (request
.target() == "/create_traces") {
686 return generateTraces(request
);
688 return "HTTP/1.1 404 Not Found\r\n\r\n";
691 thrift::TraceResponse
692 Server::startTrace(const crossdock::thrift::StartTraceRequest
& request
)
694 auto span
= _tracer
->StartSpan(request
.serverRole
);
695 if (request
.sampled
) {
696 span
->SetTag("sampling.priority", 1);
698 span
->SetBaggageItem(kBaggageKey
, request
.baggage
);
700 return prepareResponse(span
->context(),
707 thrift::TraceResponse
708 Server::joinTrace(const crossdock::thrift::JoinTraceRequest
& request
,
709 const opentracing::SpanContext
& ctx
)
711 return prepareResponse(ctx
,
713 request
.__isset
.downstream
? &request
.downstream
719 std::string
Server::generateTraces(const net::http::Request
& requestHTTP
)
721 GenerateTracesRequest request
;
723 request
= nlohmann::json::parse(requestHTTP
.body());
724 } catch (const std::exception
& ex
) {
725 std::ostringstream oss
;
726 oss
<< "JSON payload is invalid: " << ex
.what();
727 const auto message
= oss
.str();
730 oss
<< "HTTP/1.1 400 Bad Request\r\n"
732 << message
.size() << "\r\n\r\n"
736 const std::string
message("JSON payload is invalid");
737 std::ostringstream oss
;
738 oss
<< "HTTP/1.1 400 Bad Request\r\n"
740 << message
.size() << "\r\n\r\n"
745 auto tracer
= _handler
->findOrMakeTracer(request
._type
);
747 const std::string
message("Tracer is not initialized");
748 std::ostringstream oss
;
749 oss
<< "HTTP/1.1 500 Internal Server Error\r\n"
751 << message
.size() << "\r\n"
756 for (auto i
= 0; i
< request
._count
; ++i
) {
757 auto span
= tracer
->StartSpan(request
._operation
);
758 for (auto&& pair
: request
._tags
) {
759 span
->SetTag(pair
.first
, pair
.second
);
764 return "HTTP/1.1 200 OK\r\n\r\n";
767 } // namespace crossdock
768 } // namespace jaegertracing
772 const auto rawSenderType
= std::getenv("SENDER");
773 const std::string
senderType(rawSenderType
? rawSenderType
: "");
775 if (senderType
.empty()) {
776 std::cerr
<< "env SENDER is not specified!\n";
780 const auto rawAgentHostPort
= std::getenv("AGENT_HOST_PORT");
781 const std::string
agentHostPort(rawAgentHostPort
? rawAgentHostPort
: "");
783 if (agentHostPort
.empty() && senderType
== "udp") {
784 std::cerr
<< "env AGENT_HOST_PORT is not specified!\n";
788 const std::string
collectorEndpoint(senderType
== "http" ? "http://jaeger-collector:14268/api/traces" : "");
790 const auto rawSamplingServerURL
= std::getenv("SAMPLING_SERVER_URL");
791 const std::string
samplingServerURL(
792 rawSamplingServerURL
? rawSamplingServerURL
: "");
793 if (samplingServerURL
.empty()) {
794 std::cerr
<< "env SAMPLING_SERVER_URL is not specified!\n";
798 jaegertracing::crossdock::Server
server(
799 jaegertracing::net::IPAddress::v4("0.0.0.0:8080"),
800 jaegertracing::net::IPAddress::v4("0.0.0.0:8081"),
806 std::this_thread::sleep_for(std::chrono::hours(1));