]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | #include <functional> | |
21 | #include <memory> | |
22 | ||
23 | #include <thrift/qt/TQTcpServer.h> | |
24 | #include <thrift/qt/TQIODeviceTransport.h> | |
25 | ||
26 | #include <QMetaType> | |
27 | #include <QTcpSocket> | |
28 | ||
29 | #include <thrift/protocol/TProtocol.h> | |
30 | #include <thrift/async/TAsyncProcessor.h> | |
31 | ||
32 | using apache::thrift::protocol::TProtocol; | |
33 | using apache::thrift::protocol::TProtocolFactory; | |
34 | using apache::thrift::transport::TTransport; | |
35 | using apache::thrift::transport::TTransportException; | |
36 | using apache::thrift::transport::TQIODeviceTransport; | |
37 | using std::bind; | |
38 | using std::function; | |
39 | using std::placeholders::_1; | |
40 | using std::shared_ptr; | |
41 | ||
42 | QT_USE_NAMESPACE | |
43 | ||
44 | namespace apache { | |
45 | namespace thrift { | |
46 | namespace async { | |
47 | ||
48 | struct TQTcpServer::ConnectionContext { | |
49 | shared_ptr<QTcpSocket> connection_; | |
50 | shared_ptr<TTransport> transport_; | |
51 | shared_ptr<TProtocol> iprot_; | |
52 | shared_ptr<TProtocol> oprot_; | |
53 | ||
54 | explicit ConnectionContext(shared_ptr<QTcpSocket> connection, | |
55 | shared_ptr<TTransport> transport, | |
56 | shared_ptr<TProtocol> iprot, | |
57 | shared_ptr<TProtocol> oprot) | |
58 | : connection_(connection), transport_(transport), iprot_(iprot), oprot_(oprot) {} | |
59 | }; | |
60 | ||
61 | TQTcpServer::TQTcpServer(shared_ptr<QTcpServer> server, | |
62 | shared_ptr<TAsyncProcessor> processor, | |
63 | shared_ptr<TProtocolFactory> pfact, | |
64 | QObject* parent) | |
65 | : QObject(parent), server_(server), processor_(processor), pfact_(pfact) { | |
66 | qRegisterMetaType<QTcpSocket*>("QTcpSocket*"); | |
67 | connect(server.get(), SIGNAL(newConnection()), SLOT(processIncoming())); | |
68 | } | |
69 | ||
70 | TQTcpServer::~TQTcpServer() = default; | |
71 | ||
72 | void TQTcpServer::processIncoming() { | |
73 | while (server_->hasPendingConnections()) { | |
74 | // take ownership of the QTcpSocket; technically it could be deleted | |
75 | // when the QTcpServer is destroyed, but any real app should delete this | |
76 | // class before deleting the QTcpServer that we are using | |
77 | shared_ptr<QTcpSocket> connection(server_->nextPendingConnection()); | |
78 | ||
79 | shared_ptr<TTransport> transport; | |
80 | shared_ptr<TProtocol> iprot; | |
81 | shared_ptr<TProtocol> oprot; | |
82 | ||
83 | try { | |
84 | transport = shared_ptr<TTransport>(new TQIODeviceTransport(connection)); | |
85 | iprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); | |
86 | oprot = shared_ptr<TProtocol>(pfact_->getProtocol(transport)); | |
87 | } catch (...) { | |
88 | qWarning("[TQTcpServer] Failed to initialize transports/protocols"); | |
89 | continue; | |
90 | } | |
91 | ||
92 | ctxMap_[connection.get()] | |
93 | = std::make_shared<ConnectionContext>(connection, transport, iprot, oprot); | |
94 | ||
95 | connect(connection.get(), SIGNAL(readyRead()), SLOT(beginDecode())); | |
96 | ||
97 | connect(connection.get(), SIGNAL(disconnected()), SLOT(socketClosed())); | |
98 | } | |
99 | } | |
100 | ||
101 | void TQTcpServer::beginDecode() { | |
102 | auto* connection(qobject_cast<QTcpSocket*>(sender())); | |
103 | Q_ASSERT(connection); | |
104 | ||
105 | if (ctxMap_.find(connection) == ctxMap_.end()) { | |
106 | qWarning("[TQTcpServer] Got data on an unknown QTcpSocket"); | |
107 | return; | |
108 | } | |
109 | ||
110 | shared_ptr<ConnectionContext> ctx = ctxMap_[connection]; | |
111 | ||
112 | try { | |
113 | processor_ | |
114 | ->process(bind(&TQTcpServer::finish, this, ctx, _1), | |
115 | ctx->iprot_, | |
116 | ctx->oprot_); | |
117 | } catch (const TTransportException& ex) { | |
118 | qWarning("[TQTcpServer] TTransportException during processing: '%s'", ex.what()); | |
119 | scheduleDeleteConnectionContext(connection); | |
120 | } catch (...) { | |
121 | qWarning("[TQTcpServer] Unknown processor exception"); | |
122 | scheduleDeleteConnectionContext(connection); | |
123 | } | |
124 | } | |
125 | ||
126 | void TQTcpServer::socketClosed() { | |
127 | auto* connection(qobject_cast<QTcpSocket*>(sender())); | |
128 | Q_ASSERT(connection); | |
129 | scheduleDeleteConnectionContext(connection); | |
130 | } | |
131 | ||
132 | void TQTcpServer::deleteConnectionContext(QTcpSocket* connection) { | |
133 | const ConnectionContextMap::size_type deleted = ctxMap_.erase(connection); | |
134 | if (0 == deleted) { | |
135 | qWarning("[TQTcpServer] Unknown QTcpSocket"); | |
136 | } | |
137 | } | |
138 | ||
139 | void TQTcpServer::scheduleDeleteConnectionContext(QTcpSocket* connection) { | |
140 | QMetaObject::invokeMethod(this, "deleteConnectionContext", Qt::QueuedConnection, Q_ARG(QTcpSocket*, connection)); | |
141 | } | |
142 | ||
143 | void TQTcpServer::finish(shared_ptr<ConnectionContext> ctx, bool healthy) { | |
144 | if (!healthy) { | |
145 | qWarning("[TQTcpServer] Processor failed to process data successfully"); | |
146 | deleteConnectionContext(ctx->connection_.get()); | |
147 | } | |
148 | } | |
149 | } | |
150 | } | |
151 | } // apache::thrift::async |