]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/GetReadableBuffer.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / main / java / org / apache / arrow / flight / grpc / GetReadableBuffer.java
diff --git a/ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/GetReadableBuffer.java b/ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/GetReadableBuffer.java
new file mode 100644 (file)
index 0000000..5f8a715
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.grpc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+
+import org.apache.arrow.memory.ArrowBuf;
+
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteStreams;
+
+import io.grpc.internal.ReadableBuffer;
+
+/**
+ * Enable access to ReadableBuffer directly to copy data from a BufferInputStream into a target
+ * ByteBuffer/ByteBuf.
+ *
+ * <p>This could be solved by BufferInputStream exposing Drainable.
+ */
+public class GetReadableBuffer {
+
+  private static final Field READABLE_BUFFER;
+  private static final Class<?> BUFFER_INPUT_STREAM;
+
+  static {
+    Field tmpField = null;
+    Class<?> tmpClazz = null;
+    try {
+      Class<?> clazz = Class.forName("io.grpc.internal.ReadableBuffers$BufferInputStream");
+
+      Field f = clazz.getDeclaredField("buffer");
+      f.setAccessible(true);
+      // don't set until we've gotten past all exception cases.
+      tmpField = f;
+      tmpClazz = clazz;
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    READABLE_BUFFER = tmpField;
+    BUFFER_INPUT_STREAM = tmpClazz;
+  }
+
+  /**
+   * Extracts the ReadableBuffer for the given input stream.
+   *
+   * @param is Must be an instance of io.grpc.internal.ReadableBuffers$BufferInputStream or
+   *     null will be returned.
+   */
+  public static ReadableBuffer getReadableBuffer(InputStream is) {
+
+    if (BUFFER_INPUT_STREAM == null || !is.getClass().equals(BUFFER_INPUT_STREAM)) {
+      return null;
+    }
+
+    try {
+      return (ReadableBuffer) READABLE_BUFFER.get(is);
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  /**
+   * Helper method to read a gRPC-provided InputStream into an ArrowBuf.
+   * @param stream The stream to read from. Should be an instance of {@link #BUFFER_INPUT_STREAM}.
+   * @param buf The buffer to read into.
+   * @param size The number of bytes to read.
+   * @param fastPath Whether to enable the fast path (i.e. detect whether the stream is a {@link #BUFFER_INPUT_STREAM}).
+   * @throws IOException if there is an error reading form the stream
+   */
+  public static void readIntoBuffer(final InputStream stream, final ArrowBuf buf, final int size,
+      final boolean fastPath) throws IOException {
+    ReadableBuffer readableBuffer = fastPath ? getReadableBuffer(stream) : null;
+    if (readableBuffer != null) {
+      readableBuffer.readBytes(buf.nioBuffer(0, size));
+    } else {
+      byte[] heapBytes = new byte[size];
+      ByteStreams.readFully(stream, heapBytes);
+      buf.writeBytes(heapBytes);
+    }
+    buf.writerIndex(size);
+  }
+}