]> git.proxmox.com Git - ceph.git/blame - ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/auth/ClientAuthWrapper.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / main / java / org / apache / arrow / flight / auth / ClientAuthWrapper.java
CommitLineData
1d09f67e
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package org.apache.arrow.flight.auth;
19
20import java.util.Iterator;
21import java.util.concurrent.CompletableFuture;
22import java.util.concurrent.ExecutionException;
23import java.util.concurrent.LinkedBlockingQueue;
24
25import org.apache.arrow.flight.auth.ClientAuthHandler.ClientAuthSender;
26import org.apache.arrow.flight.grpc.StatusUtils;
27import org.apache.arrow.flight.impl.Flight.HandshakeRequest;
28import org.apache.arrow.flight.impl.Flight.HandshakeResponse;
29import org.apache.arrow.flight.impl.FlightServiceGrpc.FlightServiceStub;
30
31import com.google.protobuf.ByteString;
32
33import io.grpc.StatusRuntimeException;
34import io.grpc.stub.StreamObserver;
35
36/**
37 * Utility class for performing authorization over using a GRPC stub.
38 */
39public class ClientAuthWrapper {
40
41 /**
42 * Do client auth for a client. The stub will be authenticated after this method returns.
43 *
44 * @param authHandler The handler to use.
45 * @param stub The service stub.
46 */
47 public static void doClientAuth(ClientAuthHandler authHandler, FlightServiceStub stub) {
48 AuthObserver observer = new AuthObserver();
49 try {
50 observer.responseObserver = stub.handshake(observer);
51 authHandler.authenticate(observer.sender, observer.iter);
52 if (!observer.sender.errored) {
53 observer.responseObserver.onCompleted();
54 }
55 } catch (StatusRuntimeException sre) {
56 throw StatusUtils.fromGrpcRuntimeException(sre);
57 }
58 try {
59 if (!observer.completed.get()) {
60 // TODO: ARROW-5681
61 throw new RuntimeException("Unauthenticated");
62 }
63 } catch (InterruptedException e) {
64 throw new RuntimeException(e);
65 } catch (ExecutionException e) {
66 throw StatusUtils.fromThrowable(e.getCause());
67 }
68 }
69
70 private static class AuthObserver implements StreamObserver<HandshakeResponse> {
71
72 private volatile StreamObserver<HandshakeRequest> responseObserver;
73 private final LinkedBlockingQueue<byte[]> messages = new LinkedBlockingQueue<>();
74 private final AuthSender sender = new AuthSender();
75 private CompletableFuture<Boolean> completed;
76
77 public AuthObserver() {
78 super();
79 completed = new CompletableFuture<>();
80 }
81
82 @Override
83 public void onNext(HandshakeResponse value) {
84 ByteString payload = value.getPayload();
85 if (payload != null) {
86 messages.add(payload.toByteArray());
87 }
88 }
89
90 private Iterator<byte[]> iter = new Iterator<byte[]>() {
91
92 @Override
93 public byte[] next() {
94 while (!completed.isDone() || !messages.isEmpty()) {
95 byte[] bytes = messages.poll();
96 if (bytes == null) {
97 // busy wait.
98 continue;
99 } else {
100 return bytes;
101 }
102 }
103
104 if (completed.isCompletedExceptionally()) {
105 // Preserve prior exception behavior
106 // TODO: with ARROW-5681, throw an appropriate Flight exception if gRPC raised an exception
107 try {
108 completed.get();
109 } catch (InterruptedException e) {
110 throw new RuntimeException(e);
111 } catch (ExecutionException e) {
112 if (e.getCause() instanceof StatusRuntimeException) {
113 throw (StatusRuntimeException) e.getCause();
114 }
115 throw new RuntimeException(e);
116 }
117 }
118
119 throw new IllegalStateException("You attempted to retrieve messages after there were none.");
120 }
121
122 @Override
123 public boolean hasNext() {
124 return !messages.isEmpty();
125 }
126 };
127
128 @Override
129 public void onError(Throwable t) {
130 completed.completeExceptionally(t);
131 }
132
133 private class AuthSender implements ClientAuthSender {
134
135 private boolean errored = false;
136
137 @Override
138 public void send(byte[] payload) {
139 try {
140 responseObserver.onNext(HandshakeRequest.newBuilder()
141 .setPayload(ByteString.copyFrom(payload))
142 .build());
143 } catch (StatusRuntimeException sre) {
144 throw StatusUtils.fromGrpcRuntimeException(sre);
145 }
146 }
147
148 @Override
149 public void onError(Throwable cause) {
150 this.errored = true;
151 responseObserver.onError(StatusUtils.toGrpcException(cause));
152 }
153
154 }
155
156 @Override
157 public void onCompleted() {
158 completed.complete(true);
159 }
160 }
161
162}