]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
import quincy beta 17.1.0
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / src / org / apache / thrift / TNonblockingMultiFetchClient.java
diff --git a/ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java b/ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/TNonblockingMultiFetchClient.java
deleted file mode 100644 (file)
index 382d978..0000000
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.thrift;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-
-/**
- * This class uses a single thread to set up non-blocking sockets to a set
- * of remote servers (hostname and port pairs), and sends a same request to
- * all these servers. It then fetches responses from servers.
- *
- * Parameters:
- *   int maxRecvBufBytesPerServer - an upper limit for receive buffer size
- * per server (in byte). If a response from a server exceeds this limit, the
- * client will not allocate memory or read response data for it.
- *
- *   int fetchTimeoutSeconds - time limit for fetching responses from all
- * servers (in second). After the timeout, the fetch job is stopped and
- * available responses are returned.
- *
- *   ByteBuffer requestBuf - request message that is sent to all servers.
- *
- * Output:
- *   Responses are stored in an array of ByteBuffers. Index of elements in
- * this array corresponds to index of servers in the server list. Content in
- * a ByteBuffer may be in one of the following forms:
- *   1. First 4 bytes form an integer indicating length of following data,
- * then followed by the data.
- *   2. First 4 bytes form an integer indicating length of following data,
- * then followed by nothing - this happens when the response data size
- * exceeds maxRecvBufBytesPerServer, and the client will not read any
- * response data.
- *   3. No data in the ByteBuffer - this happens when the server does not
- * return any response within fetchTimeoutSeconds.
- *
- *   In some special cases (no servers are given, fetchTimeoutSeconds less
- * than or equal to 0, requestBuf is null), the return is null.
- *
- * Note:
- *   It assumes all remote servers are TNonblockingServers and use
- * TFramedTransport.
- *
- */
-public class TNonblockingMultiFetchClient {
-  
-  private static final Logger LOGGER = LoggerFactory.getLogger(
-    TNonblockingMultiFetchClient.class.getName()
-  );
-
-  // if the size of the response msg exceeds this limit (in byte), we will
-  // not read the msg
-  private int maxRecvBufBytesPerServer;
-
-  // time limit for fetching data from all servers (in second)
-  private int fetchTimeoutSeconds;
-
-  // store request that will be sent to servers  
-  private ByteBuffer requestBuf;
-  private ByteBuffer requestBufDuplication;
-
-  // a list of remote servers
-  private List<InetSocketAddress> servers;
-
-  // store fetch results
-  private TNonblockingMultiFetchStats stats;
-  private ByteBuffer[] recvBuf;
-
-  public TNonblockingMultiFetchClient(int maxRecvBufBytesPerServer,
-    int fetchTimeoutSeconds, ByteBuffer requestBuf,
-    List<InetSocketAddress> servers) {
-    this.maxRecvBufBytesPerServer = maxRecvBufBytesPerServer;
-    this.fetchTimeoutSeconds = fetchTimeoutSeconds;
-    this.requestBuf = requestBuf;
-    this.servers = servers;
-      
-    stats = new TNonblockingMultiFetchStats();
-    recvBuf = null;
-  }
-
-  public synchronized int getMaxRecvBufBytesPerServer() {
-    return maxRecvBufBytesPerServer;
-  }
-
-  public synchronized int getFetchTimeoutSeconds() {
-    return fetchTimeoutSeconds;
-  }
-
-  /**
-   * return a duplication of requestBuf, so that requestBuf will not
-   * be modified by others.
-   */
-  public synchronized ByteBuffer getRequestBuf() {
-    if (requestBuf == null) {
-      return null;
-    } else {
-      if (requestBufDuplication == null) {
-        requestBufDuplication = requestBuf.duplicate();
-      }
-      return requestBufDuplication;  
-    }
-  }
-
-  public synchronized List<InetSocketAddress> getServerList() {
-    if (servers == null) {
-      return null;
-    }
-    return Collections.unmodifiableList(servers);
-  }
-
-  public synchronized TNonblockingMultiFetchStats getFetchStats() {
-    return stats;
-  }
-
-  /**
-   * main entry function for fetching from servers
-   */
-  public synchronized ByteBuffer[] fetch() {
-    // clear previous results
-    recvBuf = null;
-    stats.clear();
-
-    if (servers == null || servers.size() == 0 ||
-        requestBuf == null || fetchTimeoutSeconds <= 0) {
-      return recvBuf;
-    }
-
-    ExecutorService executor = Executors.newSingleThreadExecutor();
-    MultiFetch multiFetch = new MultiFetch();
-    FutureTask<?> task = new FutureTask(multiFetch, null);
-    executor.execute(task);
-    try {
-      task.get(fetchTimeoutSeconds, TimeUnit.SECONDS);
-    } catch(InterruptedException ie) {
-      // attempt to cancel execution of the task.
-      task.cancel(true);
-      LOGGER.error("interrupted during fetch: "+ie.toString());
-    } catch(ExecutionException ee) {
-      // attempt to cancel execution of the task.
-      task.cancel(true);
-      LOGGER.error("exception during fetch: "+ee.toString());
-    } catch(TimeoutException te) {
-      // attempt to cancel execution of the task.  
-      task.cancel(true);
-      LOGGER.error("timeout for fetch: "+te.toString());
-    }
-
-    executor.shutdownNow();
-    multiFetch.close();
-    return recvBuf;
-  }
-
-  /**
-   * Private class that does real fetch job.
-   * Users are not allowed to directly use this class, as its run()
-   * function may run forever.
-   */
-  private class MultiFetch implements Runnable {
-    private Selector selector;
-
-    /**
-     * main entry function for fetching.
-     *
-     * Server responses are stored in TNonblocingMultiFetchClient.recvBuf,
-     * and fetch statistics is in TNonblockingMultiFetchClient.stats.
-     *
-     * Sanity check for parameters has been done in
-     * TNonblockingMultiFetchClient before calling this function.
-     */
-    public void run() {
-      long t1 = System.currentTimeMillis();
-
-      int numTotalServers = servers.size();
-      stats.setNumTotalServers(numTotalServers);
-
-      // buffer for receiving response from servers
-      recvBuf                     = new ByteBuffer[numTotalServers];
-      // buffer for sending request
-      ByteBuffer sendBuf[]        = new ByteBuffer[numTotalServers];
-      long numBytesRead[]         = new long[numTotalServers];
-      int frameSize[]             = new int[numTotalServers];
-      boolean hasReadFrameSize[]  = new boolean[numTotalServers];
-
-      try {
-        selector = Selector.open();
-      } catch (IOException e) {
-        LOGGER.error("selector opens error: "+e.toString());
-        return;
-      }
-
-      for (int i = 0; i < numTotalServers; i++) {
-        // create buffer to send request to server.
-        sendBuf[i] = requestBuf.duplicate();
-        // create buffer to read response's frame size from server
-        recvBuf[i] = ByteBuffer.allocate(4);
-        stats.incTotalRecvBufBytes(4);
-
-        InetSocketAddress server = servers.get(i);
-        SocketChannel s = null;
-        SelectionKey key = null;
-        try {
-          s = SocketChannel.open();
-          s.configureBlocking(false);
-          // now this method is non-blocking
-          s.connect(server);
-          key = s.register(selector, s.validOps());
-          // attach index of the key
-          key.attach(i);
-        } catch (Exception e) {
-          stats.incNumConnectErrorServers();  
-          String err = String.format("set up socket to server %s error: %s",
-            server.toString(), e.toString());
-          LOGGER.error(err);
-          // free resource
-          if (s != null) {
-            try {s.close();} catch (Exception ex) {}
-          }            
-          if (key != null) {
-             key.cancel();
-          }
-        }
-      }
-
-      // wait for events
-      while (stats.getNumReadCompletedServers() +
-        stats.getNumConnectErrorServers() < stats.getNumTotalServers()) {
-        // if the thread is interrupted (e.g., task is cancelled)  
-        if (Thread.currentThread().isInterrupted()) {
-          return;
-        }
-
-        try{
-          selector.select();
-        } catch (Exception e) {
-          LOGGER.error("selector selects error: "+e.toString());
-          continue;
-        }
-
-        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
-        while (it.hasNext()) {
-          SelectionKey selKey = it.next();
-          it.remove();
-
-          // get previously attached index
-          int index = (Integer)selKey.attachment();
-
-          if (selKey.isValid() && selKey.isConnectable()) {
-            // if this socket throws an exception (e.g., connection refused),
-            // print error msg and skip it.
-            try {
-              SocketChannel sChannel = (SocketChannel)selKey.channel();
-              sChannel.finishConnect();
-            } catch (Exception e) {
-              stats.incNumConnectErrorServers();
-              String err = String.format("socket %d connects to server %s " +
-                "error: %s",
-                index, servers.get(index).toString(), e.toString());
-              LOGGER.error(err);
-            }
-          }
-
-          if (selKey.isValid() && selKey.isWritable()) {
-            if (sendBuf[index].hasRemaining()) {
-              // if this socket throws an exception, print error msg and
-              // skip it.
-              try {
-                SocketChannel sChannel = (SocketChannel)selKey.channel();
-                sChannel.write(sendBuf[index]);
-              } catch (Exception e) {
-                String err = String.format("socket %d writes to server %s " +
-                  "error: %s",
-                  index, servers.get(index).toString(), e.toString());
-                LOGGER.error(err);
-              }
-            }
-          }
-
-          if (selKey.isValid() && selKey.isReadable()) {
-            // if this socket throws an exception, print error msg and
-            // skip it.
-            try {
-              SocketChannel sChannel = (SocketChannel)selKey.channel();
-              int bytesRead = sChannel.read(recvBuf[index]);
-
-              if (bytesRead > 0) {
-                numBytesRead[index] += bytesRead;
-
-                if (!hasReadFrameSize[index] &&
-                    recvBuf[index].remaining()==0) {
-                  // if the frame size has been read completely, then prepare
-                  // to read the actual frame.
-                  frameSize[index] = recvBuf[index].getInt(0);
-
-                  if (frameSize[index] <= 0) {
-                    stats.incNumInvalidFrameSize();
-                    String err = String.format("Read an invalid frame size %d"
-                      + " from %s. Does the server use TFramedTransport? ",
-                      frameSize[index], servers.get(index).toString());
-                    LOGGER.error(err);
-                    sChannel.close();
-                    continue;
-                  }
-
-                  if (frameSize[index] + 4 > stats.getMaxResponseBytes()) {
-                    stats.setMaxResponseBytes(frameSize[index]+4);
-                  }
-
-                  if (frameSize[index] + 4 > maxRecvBufBytesPerServer) {
-                    stats.incNumOverflowedRecvBuf();
-                    String err = String.format("Read frame size %d from %s,"
-                      + " total buffer size would exceed limit %d",
-                      frameSize[index], servers.get(index).toString(),
-                      maxRecvBufBytesPerServer);
-                    LOGGER.error(err);                      
-                    sChannel.close();
-                    continue;
-                  }
-
-                  // reallocate buffer for actual frame data
-                  recvBuf[index] = ByteBuffer.allocate(frameSize[index] + 4);
-                  recvBuf[index].putInt(frameSize[index]);
-
-                  stats.incTotalRecvBufBytes(frameSize[index]);
-                  hasReadFrameSize[index] = true;
-                }
-
-                if (hasReadFrameSize[index] &&
-                  numBytesRead[index] >= frameSize[index]+4) {
-                  // has read all data
-                  sChannel.close();
-                  stats.incNumReadCompletedServers();
-                  long t2 = System.currentTimeMillis();
-                  stats.setReadTime(t2-t1);
-                }
-              }
-            } catch (Exception e) {
-              String err = String.format("socket %d reads from server %s " +
-                "error: %s",
-                index, servers.get(index).toString(), e.toString());
-              LOGGER.error(err);
-            }
-          }
-        }
-      }
-    }
-
-    /**
-     * dispose any resource allocated
-     */
-    public void close() {
-      try {
-        if (selector.isOpen()) {
-          Iterator<SelectionKey> it = selector.keys().iterator();
-          while (it.hasNext()) {
-            SelectionKey selKey = it.next();
-            SocketChannel sChannel = (SocketChannel)selKey.channel();
-            sChannel.close();
-          }
-
-          selector.close();
-        }
-      } catch (IOException e) {
-        LOGGER.error("free resource error: "+e.toString());
-      }
-    }
-  }
-}
\ No newline at end of file