git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/server/TThreadedServer.cpp
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / server / TThreadedServer.cpp
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <string>
21 #include <memory>
22 #include <thrift/concurrency/ThreadFactory.h>
23 #include <thrift/server/TThreadedServer.h>
24
25 namespace apache {
26 namespace thrift {
27 namespace server {
28
29 using apache::thrift::concurrency::Runnable;
30 using apache::thrift::concurrency::Synchronized;
31 using apache::thrift::concurrency::Thread;
32 using apache::thrift::concurrency::ThreadFactory;
33 using apache::thrift::protocol::TProtocol;
34 using apache::thrift::protocol::TProtocolFactory;
35 using std::make_shared;
36 using std::shared_ptr;
37 using apache::thrift::transport::TServerTransport;
38 using apache::thrift::transport::TTransport;
39 using apache::thrift::transport::TTransportException;
40 using apache::thrift::transport::TTransportFactory;
41
42 TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorFactory,
43 const shared_ptr<TServerTransport>& serverTransport,
44 const shared_ptr<TTransportFactory>& transportFactory,
45 const shared_ptr<TProtocolFactory>& protocolFactory,
46 const shared_ptr<ThreadFactory>& threadFactory)
47 : TServerFramework(processorFactory, serverTransport, transportFactory, protocolFactory),
48 threadFactory_(threadFactory) {
49 }
50
51 TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
52 const shared_ptr<TServerTransport>& serverTransport,
53 const shared_ptr<TTransportFactory>& transportFactory,
54 const shared_ptr<TProtocolFactory>& protocolFactory,
55 const shared_ptr<ThreadFactory>& threadFactory)
56 : TServerFramework(processor, serverTransport, transportFactory, protocolFactory),
57 threadFactory_(threadFactory) {
58 }
59
60 TThreadedServer::TThreadedServer(const shared_ptr<TProcessorFactory>& processorFactory,
61 const shared_ptr<TServerTransport>& serverTransport,
62 const shared_ptr<TTransportFactory>& inputTransportFactory,
63 const shared_ptr<TTransportFactory>& outputTransportFactory,
64 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
65 const shared_ptr<TProtocolFactory>& outputProtocolFactory,
66 const shared_ptr<ThreadFactory>& threadFactory)
67 : TServerFramework(processorFactory,
68 serverTransport,
69 inputTransportFactory,
70 outputTransportFactory,
71 inputProtocolFactory,
72 outputProtocolFactory),
73 threadFactory_(threadFactory) {
74 }
75
76 TThreadedServer::TThreadedServer(const shared_ptr<TProcessor>& processor,
77 const shared_ptr<TServerTransport>& serverTransport,
78 const shared_ptr<TTransportFactory>& inputTransportFactory,
79 const shared_ptr<TTransportFactory>& outputTransportFactory,
80 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
81 const shared_ptr<TProtocolFactory>& outputProtocolFactory,
82 const shared_ptr<ThreadFactory>& threadFactory)
83 : TServerFramework(processor,
84 serverTransport,
85 inputTransportFactory,
86 outputTransportFactory,
87 inputProtocolFactory,
88 outputProtocolFactory),
89 threadFactory_(threadFactory) {
90 }
91
92 TThreadedServer::~TThreadedServer() = default;
93
94 void TThreadedServer::serve() {
95 TServerFramework::serve();
96
97 // Ensure post-condition of no active clients
98 Synchronized s(clientMonitor_);
99 while (!activeClientMap_.empty()) {
100 clientMonitor_.wait();
101 }
102
103 drainDeadClients();
104 }
105
106 void TThreadedServer::drainDeadClients() {
107 // we're in a monitor here
108 while (!deadClientMap_.empty()) {
109 auto it = deadClientMap_.begin();
110 it->second->join();
111 deadClientMap_.erase(it);
112 }
113 }
114
115 void TThreadedServer::onClientConnected(const shared_ptr<TConnectedClient>& pClient) {
116 Synchronized sync(clientMonitor_);
117 shared_ptr<TConnectedClientRunner> pRunnable = make_shared<TConnectedClientRunner>(pClient);
118 shared_ptr<Thread> pThread = threadFactory_->newThread(pRunnable);
119 pRunnable->thread(pThread);
120 activeClientMap_.insert(ClientMap::value_type(pClient.get(), pThread));
121 pThread->start();
122 }
123
124 void TThreadedServer::onClientDisconnected(TConnectedClient* pClient) {
125 Synchronized sync(clientMonitor_);
126 drainDeadClients(); // use the outgoing thread to do some maintenance on our dead client backlog
127 auto it = activeClientMap_.find(pClient);
128 if (it != activeClientMap_.end()) {
129 auto end = it;
130 deadClientMap_.insert(it, ++end);
131 activeClientMap_.erase(it);
132 }
133 if (activeClientMap_.empty()) {
134 clientMonitor_.notify();
135 }
136 }
137
138 TThreadedServer::TConnectedClientRunner::TConnectedClientRunner(const shared_ptr<TConnectedClient>& pClient)
139 : pClient_(pClient) {
140 }
141
142 TThreadedServer::TConnectedClientRunner::~TConnectedClientRunner() = default;
143
144 void TThreadedServer::TConnectedClientRunner::run() /* override */ {
145 pClient_->run(); // Run the client
146 pClient_.reset(); // The client is done - release it here rather than in the destructor for safety
147 }
148
149 }
150 }
151 } // apache::thrift::server