2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
18 package org
.apache
.arrow
.flight
;
20 import org
.apache
.arrow
.vector
.VectorSchemaRoot
;
22 import com
.google
.common
.base
.Preconditions
;
25 * Helper interface to dynamically handle backpressure when implementing FlightProducers.
26 * This must only be used in FlightProducer implementations that are non-blocking.
28 public interface BackpressureStrategy
{
30 * The state of the client after a call to waitForListener.
39 * Listener was cancelled by the client.
44 * Timed out waiting for the listener to change state.
49 * Indicates that the wait was interrupted for a reason
50 * unrelated to the listener itself.
56 * Set up operations to work against the given listener.
58 * This must be called exactly once and before any calls to {@link #waitForListener(long)} and
59 * {@link OutboundStreamListener#start(VectorSchemaRoot)}
60 * @param listener The listener this strategy applies to.
62 void register(FlightProducer
.ServerStreamListener listener
);
65 * Waits for the listener to be ready or cancelled up to the given timeout.
67 * @param timeout The timeout in milliseconds. Infinite if timeout is <= 0.
68 * @return The result of the wait.
70 WaitResult
waitForListener(long timeout
);
73 * A back pressure strategy that uses callbacks to notify when the client is ready or cancelled.
75 class CallbackBackpressureStrategy
implements BackpressureStrategy
{
76 private final Object lock
= new Object();
77 private FlightProducer
.ServerStreamListener listener
;
80 public void register(FlightProducer
.ServerStreamListener listener
) {
81 this.listener
= listener
;
82 listener
.setOnReadyHandler(this::onReady
);
83 listener
.setOnCancelHandler(this::onCancel
);
87 public WaitResult
waitForListener(long timeout
) {
88 Preconditions
.checkNotNull(listener
);
89 long remainingTimeout
= timeout
;
90 final long startTime
= System
.currentTimeMillis();
92 while (!listener
.isReady() && !listener
.isCancelled()) {
94 lock
.wait(remainingTimeout
);
95 if (timeout
!= 0) { // If timeout was zero explicitly, we should never report timeout.
96 remainingTimeout
= startTime
+ timeout
- System
.currentTimeMillis();
97 if (remainingTimeout
<= 0) {
98 return WaitResult
.TIMEOUT
;
101 if (!shouldContinueWaiting(listener
, remainingTimeout
)) {
102 return WaitResult
.OTHER
;
104 } catch (InterruptedException ex
) {
105 Thread
.currentThread().interrupt();
106 return WaitResult
.OTHER
;
110 if (listener
.isReady()) {
111 return WaitResult
.READY
;
112 } else if (listener
.isCancelled()) {
113 return WaitResult
.CANCELLED
;
114 } else if (System
.currentTimeMillis() > startTime
+ timeout
) {
115 return WaitResult
.TIMEOUT
;
117 throw new RuntimeException("Invalid state when waiting for listener.");
122 * Interrupt waiting on the listener to change state.
124 * This method can be used in conjunction with
125 * {@link #shouldContinueWaiting(FlightProducer.ServerStreamListener, long)} to allow FlightProducers to
126 * terminate streams internally and notify clients.
128 public void interruptWait() {
129 synchronized (lock
) {
135 * Callback function to run to check if the listener should continue
136 * to be waited on if it leaves the waiting state without being cancelled,
137 * ready, or timed out.
139 * This method should be used to determine if the wait on the listener was interrupted explicitly using a
140 * call to {@link #interruptWait()} or if it was woken up due to a spurious wake.
142 protected boolean shouldContinueWaiting(FlightProducer
.ServerStreamListener listener
, long remainingTimeout
) {
147 * Callback to execute when the listener becomes ready.
149 protected void readyCallback() {
153 * Callback to execute when the listener is cancelled.
155 protected void cancelCallback() {
158 private void onReady() {
159 synchronized (lock
) {
165 private void onCancel() {
166 synchronized (lock
) {