]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/BackpressureStrategy.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / main / java / org / apache / arrow / flight / BackpressureStrategy.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.flight;
19
20 import org.apache.arrow.vector.VectorSchemaRoot;
21
22 import com.google.common.base.Preconditions;
23
24 /**
25 * Helper interface to dynamically handle backpressure when implementing FlightProducers.
26 * This must only be used in FlightProducer implementations that are non-blocking.
27 */
28 public interface BackpressureStrategy {
29 /**
30 * The state of the client after a call to waitForListener.
31 */
32 enum WaitResult {
33 /**
34 * Listener is ready.
35 */
36 READY,
37
38 /**
39 * Listener was cancelled by the client.
40 */
41 CANCELLED,
42
43 /**
44 * Timed out waiting for the listener to change state.
45 */
46 TIMEOUT,
47
48 /**
49 * Indicates that the wait was interrupted for a reason
50 * unrelated to the listener itself.
51 */
52 OTHER
53 }
54
55 /**
56 * Set up operations to work against the given listener.
57 *
58 * This must be called exactly once and before any calls to {@link #waitForListener(long)} and
59 * {@link OutboundStreamListener#start(VectorSchemaRoot)}
60 * @param listener The listener this strategy applies to.
61 */
62 void register(FlightProducer.ServerStreamListener listener);
63
64 /**
65 * Waits for the listener to be ready or cancelled up to the given timeout.
66 *
67 * @param timeout The timeout in milliseconds. Infinite if timeout is <= 0.
68 * @return The result of the wait.
69 */
70 WaitResult waitForListener(long timeout);
71
72 /**
73 * A back pressure strategy that uses callbacks to notify when the client is ready or cancelled.
74 */
75 class CallbackBackpressureStrategy implements BackpressureStrategy {
76 private final Object lock = new Object();
77 private FlightProducer.ServerStreamListener listener;
78
79 @Override
80 public void register(FlightProducer.ServerStreamListener listener) {
81 this.listener = listener;
82 listener.setOnReadyHandler(this::onReady);
83 listener.setOnCancelHandler(this::onCancel);
84 }
85
86 @Override
87 public WaitResult waitForListener(long timeout) {
88 Preconditions.checkNotNull(listener);
89 long remainingTimeout = timeout;
90 final long startTime = System.currentTimeMillis();
91 synchronized (lock) {
92 while (!listener.isReady() && !listener.isCancelled()) {
93 try {
94 lock.wait(remainingTimeout);
95 if (timeout != 0) { // If timeout was zero explicitly, we should never report timeout.
96 remainingTimeout = startTime + timeout - System.currentTimeMillis();
97 if (remainingTimeout <= 0) {
98 return WaitResult.TIMEOUT;
99 }
100 }
101 if (!shouldContinueWaiting(listener, remainingTimeout)) {
102 return WaitResult.OTHER;
103 }
104 } catch (InterruptedException ex) {
105 Thread.currentThread().interrupt();
106 return WaitResult.OTHER;
107 }
108 }
109
110 if (listener.isReady()) {
111 return WaitResult.READY;
112 } else if (listener.isCancelled()) {
113 return WaitResult.CANCELLED;
114 } else if (System.currentTimeMillis() > startTime + timeout) {
115 return WaitResult.TIMEOUT;
116 }
117 throw new RuntimeException("Invalid state when waiting for listener.");
118 }
119 }
120
121 /**
122 * Interrupt waiting on the listener to change state.
123 *
124 * This method can be used in conjunction with
125 * {@link #shouldContinueWaiting(FlightProducer.ServerStreamListener, long)} to allow FlightProducers to
126 * terminate streams internally and notify clients.
127 */
128 public void interruptWait() {
129 synchronized (lock) {
130 lock.notifyAll();
131 }
132 }
133
134 /**
135 * Callback function to run to check if the listener should continue
136 * to be waited on if it leaves the waiting state without being cancelled,
137 * ready, or timed out.
138 *
139 * This method should be used to determine if the wait on the listener was interrupted explicitly using a
140 * call to {@link #interruptWait()} or if it was woken up due to a spurious wake.
141 */
142 protected boolean shouldContinueWaiting(FlightProducer.ServerStreamListener listener, long remainingTimeout) {
143 return true;
144 }
145
146 /**
147 * Callback to execute when the listener becomes ready.
148 */
149 protected void readyCallback() {
150 }
151
152 /**
153 * Callback to execute when the listener is cancelled.
154 */
155 protected void cancelCallback() {
156 }
157
158 private void onReady() {
159 synchronized (lock) {
160 readyCallback();
161 lock.notifyAll();
162 }
163 }
164
165 private void onCancel() {
166 synchronized (lock) {
167 cancelCallback();
168 lock.notifyAll();
169 }
170 }
171 }
172 }