2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 package org
.apache
.thrift
.transport
;
22 import java
.io
.BufferedInputStream
;
23 import java
.io
.BufferedOutputStream
;
24 import java
.io
.InputStream
;
25 import java
.io
.OutputStream
;
26 import java
.io
.IOException
;
27 import java
.util
.Random
;
29 import org
.slf4j
.Logger
;
30 import org
.slf4j
.LoggerFactory
;
33 * FileTransport implementation of the TTransport interface.
34 * Currently this is a straightforward port of the cpp implementation
36 * It may make better sense to provide a basic stream access on top of the framed file format
37 * The FileTransport can then be a user of this framed file format with some additional logic
40 public class TFileTransport
extends TTransport
{
42 private static final Logger LOGGER
= LoggerFactory
.getLogger(TFileTransport
.class.getName());
44 public static class TruncableBufferedInputStream
extends BufferedInputStream
{
48 public TruncableBufferedInputStream(InputStream in
) {
51 public TruncableBufferedInputStream(InputStream in
, int size
) {
57 public static class Event
{
60 private int navailable_
;
63 * Initialize an event. Initially, it has no valid contents
65 * @param buf byte array buffer to store event
67 public Event(byte[] buf
) {
69 nread_
= navailable_
= 0;
72 public byte[] getBuf() { return buf_
;}
73 public int getSize() { return buf_
.length
; }
76 public void setAvailable(int sz
) { nread_
= 0; navailable_
=sz
;}
77 public int getRemaining() { return (navailable_
- nread_
); }
79 public int emit(byte[] buf
, int offset
, int ndesired
) {
80 if((ndesired
== 0) || (ndesired
> getRemaining()))
81 ndesired
= getRemaining();
86 System
.arraycopy(buf_
, nread_
, buf
, offset
, ndesired
);
93 public static class ChunkState
{
95 * Chunk Size. Must be same across all implementations
97 public static final int DEFAULT_CHUNK_SIZE
= 16 * 1024 * 1024;
99 private int chunk_size_
= DEFAULT_CHUNK_SIZE
;
100 private long offset_
= 0;
/**
 * Creates a chunk state that uses {@link #DEFAULT_CHUNK_SIZE} and starts
 * at offset zero.
 */
public ChunkState() {
  this(DEFAULT_CHUNK_SIZE);
}
/**
 * Creates a chunk state with a caller-supplied chunk size, starting at
 * offset zero.
 *
 * @param chunk_size chunk size in bytes; must be positive because it is
 *        used as the divisor in getChunkNum() and getRemaining()
 * @throws IllegalArgumentException if chunk_size is zero or negative
 */
public ChunkState(int chunk_size) {
  // Fail fast: a zero size would otherwise surface much later as an
  // ArithmeticException from the division/modulo in the accessors, and a
  // negative size would silently yield meaningless chunk numbers.
  if (chunk_size <= 0) {
    throw new IllegalArgumentException("chunk_size must be positive: " + chunk_size);
  }
  chunk_size_ = chunk_size;
}
105 public void skip(int size
) {offset_
+= size
; }
106 public void seek(long offset
) {offset_
= offset
;}
108 public int getChunkSize() { return chunk_size_
;}
109 public int getChunkNum() { return ((int)(offset_
/chunk_size_
));}
110 public int getRemaining() { return (chunk_size_
- ((int)(offset_
% chunk_size_
)));}
111 public long getOffset() { return (offset_
);}
114 public static enum TailPolicy
{
117 WAIT_FOREVER(500, -1);
120 * Time in milliseconds to sleep before next read
123 public final int timeout_
;
126 * Number of retries before giving up
128 * if -1, retry forever
130 public final int retries_
;
135 * @param timeout sleep time for this particular policy
136 * @param retries number of retries
139 TailPolicy(int timeout
, int retries
) {
146 * Current tailing policy
148 TailPolicy currentPolicy_
= TailPolicy
.NOWAIT
;
152 * Underlying file being read
154 protected TSeekableFile inputFile_
= null;
157 * Underlying outputStream
159 protected OutputStream outputStream_
= null;
163 * Event currently read in
165 Event currentEvent_
= null;
168 * InputStream currently being used for reading
170 InputStream inputStream_
= null;
173 * current Chunk state
175 ChunkState cs
= null;
180 private boolean readOnly_
= false;
183 * Get File Tailing Policy
185 * @return current read policy
187 public TailPolicy
getTailPolicy() {
188 return (currentPolicy_
);
192 * Set file Tailing Policy
194 * @param policy New policy to set
197 public TailPolicy
setTailPolicy(TailPolicy policy
) {
198 TailPolicy old
= currentPolicy_
;
199 currentPolicy_
= policy
;
205 * Initialize read input stream
207 * @return input stream to read from file
209 private InputStream
createInputStream() throws TTransportException
{
212 if(inputStream_
!= null) {
213 ((TruncableBufferedInputStream
)inputStream_
).trunc();
216 is
= new TruncableBufferedInputStream(inputFile_
.getInputStream());
218 } catch (IOException iox
) {
219 throw new TTransportException(iox
.getMessage(), iox
);
225 * Read (potentially tailing) an input stream
227 * @param is InputStream to read from
228 * @param buf Buffer to read into
229 * @param off Offset in buffer to read into
230 * @param len Number of bytes to read
231 * @param tp policy to use if we hit EOF
233 * @return number of bytes read
235 private int tailRead(InputStream is
, byte[] buf
,
236 int off
, int len
, TailPolicy tp
) throws TTransportException
{
241 int cnt
= is
.read(buf
, off
, len
);
246 cs
.skip(cnt
); // remember that we read so many bytes
247 } else if (cnt
== -1) {
251 if((tp
.retries_
!= -1) && tp
.retries_
< retries
)
252 return (orig_len
- len
);
254 if(tp
.timeout_
> 0) {
255 try {Thread
.sleep(tp
.timeout_
);} catch(InterruptedException e
) {}
258 // either non-zero or -1 is what the contract says!
260 TTransportException("Unexpected return from InputStream.read = "
264 } catch (IOException iox
) {
265 throw new TTransportException(iox
.getMessage(), iox
);
268 return(orig_len
- len
);
272 * Event is corrupted. Do recovery
274 * @return true if recovery could be performed and we can read more data
275 * false is returned only when nothing more can be read
277 private boolean performRecovery() throws TTransportException
{
278 int numChunks
= getNumChunks();
279 int curChunk
= cs
.getChunkNum();
281 if(curChunk
>= (numChunks
-1)) {
284 seekToChunk(curChunk
+1);
289 * Read event from underlying file
291 * @return true if event could be read, false otherwise (on EOF)
293 private boolean readEvent() throws TTransportException
{
294 byte[] ebytes
= new byte[4];
301 // corner case. read to end of chunk
302 nrequested
= cs
.getRemaining();
304 nread
= tailRead(inputStream_
, ebytes
, 0, nrequested
, currentPolicy_
);
305 if(nread
!= nrequested
) {
310 // assuming serialized on little endian machine
311 nread
= tailRead(inputStream_
, ebytes
, 0, 4, currentPolicy_
);
317 for(int i
=3; i
>=0; i
--) {
318 int val
= (0x000000ff & (int)ebytes
[i
]);
319 esize
|= (val
<< (i
*8));
322 // check if event is corrupted and do recovery as required
323 if(esize
> cs
.getRemaining()) {
324 throw new TTransportException("FileTransport error: bad event size");
326 if(performRecovery()) {
333 } while (esize
== 0);
335 // reset existing event or get a larger one
336 if(currentEvent_
.getSize() < esize
)
337 currentEvent_
= new Event(new byte [esize
]);
339 // populate the event
340 byte[] buf
= currentEvent_
.getBuf();
341 nread
= tailRead(inputStream_
, buf
, 0, esize
, currentPolicy_
);
345 currentEvent_
.setAvailable(esize
);
350 * open if both input/output open unless readonly
354 public boolean isOpen() {
355 return ((inputStream_
!= null) && (readOnly_
|| (outputStream_
!= null)));
360 * Diverging from the cpp model and sticking to the TSocket model
361 * Files are not opened in ctor - but in explicit open call
363 public void open() throws TTransportException
{
365 throw new TTransportException(TTransportException
.ALREADY_OPEN
);
368 inputStream_
= createInputStream();
369 cs
= new ChunkState();
370 currentEvent_
= new Event(new byte [256]);
373 outputStream_
= new BufferedOutputStream(inputFile_
.getOutputStream());
374 } catch (IOException iox
) {
375 throw new TTransportException(TTransportException
.NOT_OPEN
, iox
);
380 * Closes the transport.
382 public void close() {
383 if (inputFile_
!= null) {
386 } catch (IOException iox
) {
387 LOGGER
.warn("WARNING: Error closing input file: " +
392 if (outputStream_
!= null) {
394 outputStream_
.close();
395 } catch (IOException iox
) {
396 LOGGER
.warn("WARNING: Error closing output stream: " +
399 outputStream_
= null;
405 * File Transport ctor
407 * @param path File path to read and write from
408 * @param readOnly Whether this is a read-only transport
410 public TFileTransport(final String path
, boolean readOnly
) throws IOException
{
411 inputFile_
= new TStandardFile(path
);
412 readOnly_
= readOnly
;
416 * File Transport ctor
418 * @param inputFile open TSeekableFile to read/write from
419 * @param readOnly Whether this is a read-only transport
421 public TFileTransport(TSeekableFile inputFile
, boolean readOnly
) {
422 inputFile_
= inputFile
;
423 readOnly_
= readOnly
;
428 * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception
429 * where one is detected.
431 public int readAll(byte[] buf
, int off
, int len
)
432 throws TTransportException
{
436 ret
= read(buf
, off
+got
, len
-got
);
438 throw new TTransportException("Error in reading from file");
441 throw new TTransportException(TTransportException
.END_OF_FILE
,
442 "End of File reached");
451 * Reads up to len bytes into buffer buf, starting at offset off.
453 * @param buf Array to read into
454 * @param off Index to start reading at
455 * @param len Maximum number of bytes to read
456 * @return The number of bytes actually read
457 * @throws TTransportException if there was an error reading data
459 public int read(byte[] buf
, int off
, int len
) throws TTransportException
{
461 throw new TTransportException(TTransportException
.NOT_OPEN
,
462 "Must open before reading");
464 if(currentEvent_
.getRemaining() == 0) {
469 int nread
= currentEvent_
.emit(buf
, off
, len
);
473 public int getNumChunks() throws TTransportException
{
475 throw new TTransportException(TTransportException
.NOT_OPEN
,
476 "Must open before getNumChunks");
478 long len
= inputFile_
.length();
482 return (((int)(len
/cs
.getChunkSize())) + 1);
484 } catch (IOException iox
) {
485 throw new TTransportException(iox
.getMessage(), iox
);
489 public int getCurChunk() throws TTransportException
{
491 throw new TTransportException(TTransportException
.NOT_OPEN
,
492 "Must open before getCurChunk");
493 return (cs
.getChunkNum());
498 public void seekToChunk(int chunk
) throws TTransportException
{
500 throw new TTransportException(TTransportException
.NOT_OPEN
,
501 "Must open before seeking");
503 int numChunks
= getNumChunks();
505 // file is empty, seeking to chunk is pointless
506 if (numChunks
== 0) {
510 // negative indicates reverse seek (from the end)
515 // too large a value for reverse seek, just seek to beginning
521 boolean seekToEnd
= (chunk
>= numChunks
);
524 try { eofOffset
= inputFile_
.length(); }
525 catch (IOException iox
) {throw new TTransportException(iox
.getMessage(),
529 if(chunk
*cs
.getChunkSize() != cs
.getOffset()) {
530 try { inputFile_
.seek((long)chunk
*cs
.getChunkSize()); }
531 catch (IOException iox
) {
532 throw new TTransportException("Seek to chunk " +
533 chunk
+ " " +iox
.getMessage(), iox
);
536 cs
.seek((long)chunk
*cs
.getChunkSize());
537 currentEvent_
.setAvailable(0);
538 inputStream_
= createInputStream();
542 // waiting forever here - otherwise we can hit EOF and end up
543 // having consumed partial data from the data stream.
544 TailPolicy old
= setTailPolicy(TailPolicy
.WAIT_FOREVER
);
545 while(cs
.getOffset() < eofOffset
) { readEvent(); }
546 currentEvent_
.setAvailable(0);
551 public void seekToEnd() throws TTransportException
{
553 throw new TTransportException(TTransportException
.NOT_OPEN
,
554 "Must open before seeking");
555 seekToChunk(getNumChunks());
560 * Writes up to len bytes from the buffer.
562 * @param buf The output data buffer
563 * @param off The offset to start writing from
564 * @param len The number of bytes to write
565 * @throws TTransportException if there was an error writing data
567 public void write(byte[] buf
, int off
, int len
) throws TTransportException
{
568 throw new TTransportException("Not Supported");
572 * Flush any pending data out of a transport buffer.
574 * @throws TTransportException if there was an error writing out data.
576 public void flush() throws TTransportException
{
577 throw new TTransportException("Not Supported");
584 public static void main(String
[] args
) throws Exception
{
588 if((args
.length
< 1) || args
[0].equals("--help")
589 || args
[0].equals("-h") || args
[0].equals("-?")) {
593 if(args
.length
> 1) {
595 num_chunks
= Integer
.parseInt(args
[1]);
596 } catch (Exception e
) {
597 LOGGER
.error("Cannot parse " + args
[1]);
602 TFileTransport t
= new TFileTransport(args
[0], true);
604 LOGGER
.info("NumChunks="+t
.getNumChunks());
606 Random r
= new Random();
607 for(int j
=0; j
<num_chunks
; j
++) {
608 byte[] buf
= new byte[4096];
609 int cnum
= r
.nextInt(t
.getNumChunks()-1);
610 LOGGER
.info("Reading chunk "+cnum
);
612 for(int i
=0; i
<4096; i
++) {
613 t
.read(buf
, 0, 4096);
618 private static void printUsage() {
619 LOGGER
.error("Usage: TFileTransport <filename> [num_chunks]");
620 LOGGER
.error(" (Opens and reads num_chunks chunks from file randomly)");