]> git.proxmox.com Git - ceph.git/blob - ceph/src/jaegertracing/thrift/lib/java/src/org/apache/thrift/transport/TFileTransport.java
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / java / src / org / apache / thrift / transport / TFileTransport.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.thrift.transport;
21
22 import java.io.BufferedInputStream;
23 import java.io.BufferedOutputStream;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.io.IOException;
27 import java.util.Random;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 /**
33 * FileTransport implementation of the TTransport interface.
34 * Currently this is a straightforward port of the cpp implementation
35 *
36 * It may make better sense to provide a basic stream access on top of the framed file format
37 * The FileTransport can then be a user of this framed file format with some additional logic
38 * for chunking.
39 */
40 public class TFileTransport extends TTransport {
41
42 private static final Logger LOGGER = LoggerFactory.getLogger(TFileTransport.class.getName());
43
44 public static class TruncableBufferedInputStream extends BufferedInputStream {
45 public void trunc() {
46 pos = count = 0;
47 }
48 public TruncableBufferedInputStream(InputStream in) {
49 super(in);
50 }
51 public TruncableBufferedInputStream(InputStream in, int size) {
52 super(in, size);
53 }
54 }
55
56
57 public static class Event {
58 private byte[] buf_;
59 private int nread_;
60 private int navailable_;
61
62 /**
63 * Initialize an event. Initially, it has no valid contents
64 *
65 * @param buf byte array buffer to store event
66 */
67 public Event(byte[] buf) {
68 buf_ = buf;
69 nread_ = navailable_ = 0;
70 }
71
72 public byte[] getBuf() { return buf_;}
73 public int getSize() { return buf_.length; }
74
75
76 public void setAvailable(int sz) { nread_ = 0; navailable_=sz;}
77 public int getRemaining() { return (navailable_ - nread_); }
78
79 public int emit(byte[] buf, int offset, int ndesired) {
80 if((ndesired == 0) || (ndesired > getRemaining()))
81 ndesired = getRemaining();
82
83 if(ndesired <= 0)
84 return (ndesired);
85
86 System.arraycopy(buf_, nread_, buf, offset, ndesired);
87 nread_ += ndesired;
88
89 return(ndesired);
90 }
91 };
92
93 public static class ChunkState {
94 /**
95 * Chunk Size. Must be same across all implementations
96 */
97 public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
98
99 private int chunk_size_ = DEFAULT_CHUNK_SIZE;
100 private long offset_ = 0;
101
102 public ChunkState() {}
103 public ChunkState(int chunk_size) { chunk_size_ = chunk_size; }
104
105 public void skip(int size) {offset_ += size; }
106 public void seek(long offset) {offset_ = offset;}
107
108 public int getChunkSize() { return chunk_size_;}
109 public int getChunkNum() { return ((int)(offset_/chunk_size_));}
110 public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));}
111 public long getOffset() { return (offset_);}
112 }
113
114 public static enum TailPolicy {
115
116 NOWAIT(0, 0),
117 WAIT_FOREVER(500, -1);
118
119 /**
120 * Time in milliseconds to sleep before next read
121 * If 0, no sleep
122 */
123 public final int timeout_;
124
125 /**
126 * Number of retries before giving up
127 * if 0, no retries
128 * if -1, retry forever
129 */
130 public final int retries_;
131
132 /**
133 * ctor for policy
134 *
135 * @param timeout sleep time for this particular policy
136 * @param retries number of retries
137 */
138
139 TailPolicy(int timeout, int retries) {
140 timeout_ = timeout;
141 retries_ = retries;
142 }
143 }
144
145 /**
146 * Current tailing policy
147 */
148 TailPolicy currentPolicy_ = TailPolicy.NOWAIT;
149
150
151 /**
152 * Underlying file being read
153 */
154 protected TSeekableFile inputFile_ = null;
155
156 /**
157 * Underlying outputStream
158 */
159 protected OutputStream outputStream_ = null;
160
161
162 /**
163 * Event currently read in
164 */
165 Event currentEvent_ = null;
166
167 /**
168 * InputStream currently being used for reading
169 */
170 InputStream inputStream_ = null;
171
172 /**
173 * current Chunk state
174 */
175 ChunkState cs = null;
176
177 /**
178 * is read only?
179 */
180 private boolean readOnly_ = false;
181
182 /**
183 * Get File Tailing Policy
184 *
185 * @return current read policy
186 */
187 public TailPolicy getTailPolicy() {
188 return (currentPolicy_);
189 }
190
191 /**
192 * Set file Tailing Policy
193 *
194 * @param policy New policy to set
195 * @return Old policy
196 */
197 public TailPolicy setTailPolicy(TailPolicy policy) {
198 TailPolicy old = currentPolicy_;
199 currentPolicy_ = policy;
200 return (old);
201 }
202
203
204 /**
205 * Initialize read input stream
206 *
207 * @return input stream to read from file
208 */
209 private InputStream createInputStream() throws TTransportException {
210 InputStream is;
211 try {
212 if(inputStream_ != null) {
213 ((TruncableBufferedInputStream)inputStream_).trunc();
214 is = inputStream_;
215 } else {
216 is = new TruncableBufferedInputStream(inputFile_.getInputStream());
217 }
218 } catch (IOException iox) {
219 throw new TTransportException(iox.getMessage(), iox);
220 }
221 return(is);
222 }
223
224 /**
225 * Read (potentially tailing) an input stream
226 *
227 * @param is InputStream to read from
228 * @param buf Buffer to read into
229 * @param off Offset in buffer to read into
230 * @param len Number of bytes to read
231 * @param tp policy to use if we hit EOF
232 *
233 * @return number of bytes read
234 */
235 private int tailRead(InputStream is, byte[] buf,
236 int off, int len, TailPolicy tp) throws TTransportException {
237 int orig_len = len;
238 try {
239 int retries = 0;
240 while(len > 0) {
241 int cnt = is.read(buf, off, len);
242 if(cnt > 0) {
243 off += cnt;
244 len -= cnt;
245 retries = 0;
246 cs.skip(cnt); // remember that we read so many bytes
247 } else if (cnt == -1) {
248 // EOF
249 retries++;
250
251 if((tp.retries_ != -1) && tp.retries_ < retries)
252 return (orig_len - len);
253
254 if(tp.timeout_ > 0) {
255 try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {}
256 }
257 } else {
258 // either non-zero or -1 is what the contract says!
259 throw new
260 TTransportException("Unexpected return from InputStream.read = "
261 + cnt);
262 }
263 }
264 } catch (IOException iox) {
265 throw new TTransportException(iox.getMessage(), iox);
266 }
267
268 return(orig_len - len);
269 }
270
271 /**
272 * Event is corrupted. Do recovery
273 *
274 * @return true if recovery could be performed and we can read more data
275 * false is returned only when nothing more can be read
276 */
277 private boolean performRecovery() throws TTransportException {
278 int numChunks = getNumChunks();
279 int curChunk = cs.getChunkNum();
280
281 if(curChunk >= (numChunks-1)) {
282 return false;
283 }
284 seekToChunk(curChunk+1);
285 return true;
286 }
287
288 /**
289 * Read event from underlying file
290 *
291 * @return true if event could be read, false otherwise (on EOF)
292 */
293 private boolean readEvent() throws TTransportException {
294 byte[] ebytes = new byte[4];
295 int esize;
296 int nread;
297 int nrequested;
298
299 retry:
300 do {
301 // corner case. read to end of chunk
302 nrequested = cs.getRemaining();
303 if(nrequested < 4) {
304 nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_);
305 if(nread != nrequested) {
306 return(false);
307 }
308 }
309
310 // assuming serialized on little endian machine
311 nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_);
312 if(nread != 4) {
313 return(false);
314 }
315
316 esize=0;
317 for(int i=3; i>=0; i--) {
318 int val = (0x000000ff & (int)ebytes[i]);
319 esize |= (val << (i*8));
320 }
321
322 // check if event is corrupted and do recovery as required
323 if(esize > cs.getRemaining()) {
324 throw new TTransportException("FileTransport error: bad event size");
325 /*
326 if(performRecovery()) {
327 esize=0;
328 } else {
329 return false;
330 }
331 */
332 }
333 } while (esize == 0);
334
335 // reset existing event or get a larger one
336 if(currentEvent_.getSize() < esize)
337 currentEvent_ = new Event(new byte [esize]);
338
339 // populate the event
340 byte[] buf = currentEvent_.getBuf();
341 nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_);
342 if(nread != esize) {
343 return(false);
344 }
345 currentEvent_.setAvailable(esize);
346 return(true);
347 }
348
349 /**
350 * open if both input/output open unless readonly
351 *
352 * @return true
353 */
354 public boolean isOpen() {
355 return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null)));
356 }
357
358
359 /**
360 * Diverging from the cpp model and sticking to the TSocket model
361 * Files are not opened in ctor - but in explicit open call
362 */
363 public void open() throws TTransportException {
364 if (isOpen())
365 throw new TTransportException(TTransportException.ALREADY_OPEN);
366
367 try {
368 inputStream_ = createInputStream();
369 cs = new ChunkState();
370 currentEvent_ = new Event(new byte [256]);
371
372 if(!readOnly_)
373 outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream());
374 } catch (IOException iox) {
375 throw new TTransportException(TTransportException.NOT_OPEN, iox);
376 }
377 }
378
379 /**
380 * Closes the transport.
381 */
382 public void close() {
383 if (inputFile_ != null) {
384 try {
385 inputFile_.close();
386 } catch (IOException iox) {
387 LOGGER.warn("WARNING: Error closing input file: " +
388 iox.getMessage());
389 }
390 inputFile_ = null;
391 }
392 if (outputStream_ != null) {
393 try {
394 outputStream_.close();
395 } catch (IOException iox) {
396 LOGGER.warn("WARNING: Error closing output stream: " +
397 iox.getMessage());
398 }
399 outputStream_ = null;
400 }
401 }
402
403
/**
 * Creates a transport over the file at the given path. The file is wrapped
 * in a {@code TStandardFile}; streams are not set up until {@link #open()}.
 *
 * @param path File path to read and write from
 * @param readOnly Whether this is a read-only transport
 * @throws IOException if the file wrapper cannot be created
 */
public TFileTransport(final String path, boolean readOnly) throws IOException {
  this.readOnly_ = readOnly;
  this.inputFile_ = new TStandardFile(path);
}
414
/**
 * Creates a transport over a caller-supplied seekable file.
 *
 * @param inputFile open TSeekableFile to read/write from
 * @param readOnly Whether this is a read-only transport
 */
public TFileTransport(TSeekableFile inputFile, boolean readOnly) {
  this.readOnly_ = readOnly;
  this.inputFile_ = inputFile;
}
425
426
427 /**
428 * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception
429 * where one is detected.
430 */
431 public int readAll(byte[] buf, int off, int len)
432 throws TTransportException {
433 int got = 0;
434 int ret = 0;
435 while (got < len) {
436 ret = read(buf, off+got, len-got);
437 if (ret < 0) {
438 throw new TTransportException("Error in reading from file");
439 }
440 if(ret == 0) {
441 throw new TTransportException(TTransportException.END_OF_FILE,
442 "End of File reached");
443 }
444 got += ret;
445 }
446 return got;
447 }
448
449
450 /**
451 * Reads up to len bytes into buffer buf, starting at offset off.
452 *
453 * @param buf Array to read into
454 * @param off Index to start reading at
455 * @param len Maximum number of bytes to read
456 * @return The number of bytes actually read
457 * @throws TTransportException if there was an error reading data
458 */
459 public int read(byte[] buf, int off, int len) throws TTransportException {
460 if(!isOpen())
461 throw new TTransportException(TTransportException.NOT_OPEN,
462 "Must open before reading");
463
464 if(currentEvent_.getRemaining() == 0) {
465 if(!readEvent())
466 return(0);
467 }
468
469 int nread = currentEvent_.emit(buf, off, len);
470 return nread;
471 }
472
473 public int getNumChunks() throws TTransportException {
474 if(!isOpen())
475 throw new TTransportException(TTransportException.NOT_OPEN,
476 "Must open before getNumChunks");
477 try {
478 long len = inputFile_.length();
479 if(len == 0)
480 return 0;
481 else
482 return (((int)(len/cs.getChunkSize())) + 1);
483
484 } catch (IOException iox) {
485 throw new TTransportException(iox.getMessage(), iox);
486 }
487 }
488
489 public int getCurChunk() throws TTransportException {
490 if(!isOpen())
491 throw new TTransportException(TTransportException.NOT_OPEN,
492 "Must open before getCurChunk");
493 return (cs.getChunkNum());
494
495 }
496
497
498 public void seekToChunk(int chunk) throws TTransportException {
499 if(!isOpen())
500 throw new TTransportException(TTransportException.NOT_OPEN,
501 "Must open before seeking");
502
503 int numChunks = getNumChunks();
504
505 // file is empty, seeking to chunk is pointless
506 if (numChunks == 0) {
507 return;
508 }
509
510 // negative indicates reverse seek (from the end)
511 if (chunk < 0) {
512 chunk += numChunks;
513 }
514
515 // too large a value for reverse seek, just seek to beginning
516 if (chunk < 0) {
517 chunk = 0;
518 }
519
520 long eofOffset=0;
521 boolean seekToEnd = (chunk >= numChunks);
522 if(seekToEnd) {
523 chunk = chunk - 1;
524 try { eofOffset = inputFile_.length(); }
525 catch (IOException iox) {throw new TTransportException(iox.getMessage(),
526 iox);}
527 }
528
529 if(chunk*cs.getChunkSize() != cs.getOffset()) {
530 try { inputFile_.seek((long)chunk*cs.getChunkSize()); }
531 catch (IOException iox) {
532 throw new TTransportException("Seek to chunk " +
533 chunk + " " +iox.getMessage(), iox);
534 }
535
536 cs.seek((long)chunk*cs.getChunkSize());
537 currentEvent_.setAvailable(0);
538 inputStream_ = createInputStream();
539 }
540
541 if(seekToEnd) {
542 // waiting forever here - otherwise we can hit EOF and end up
543 // having consumed partial data from the data stream.
544 TailPolicy old = setTailPolicy(TailPolicy.WAIT_FOREVER);
545 while(cs.getOffset() < eofOffset) { readEvent(); }
546 currentEvent_.setAvailable(0);
547 setTailPolicy(old);
548 }
549 }
550
551 public void seekToEnd() throws TTransportException {
552 if(!isOpen())
553 throw new TTransportException(TTransportException.NOT_OPEN,
554 "Must open before seeking");
555 seekToChunk(getNumChunks());
556 }
557
558
559 /**
560 * Writes up to len bytes from the buffer.
561 *
562 * @param buf The output data buffer
563 * @param off The offset to start writing from
564 * @param len The number of bytes to write
565 * @throws TTransportException if there was an error writing data
566 */
567 public void write(byte[] buf, int off, int len) throws TTransportException {
568 throw new TTransportException("Not Supported");
569 }
570
571 /**
572 * Flush any pending data out of a transport buffer.
573 *
574 * @throws TTransportException if there was an error writing out data.
575 */
576 public void flush() throws TTransportException {
577 throw new TTransportException("Not Supported");
578 }
579
580 /**
581 * test program
582 *
583 */
584 public static void main(String[] args) throws Exception {
585
586 int num_chunks = 10;
587
588 if((args.length < 1) || args[0].equals("--help")
589 || args[0].equals("-h") || args[0].equals("-?")) {
590 printUsage();
591 }
592
593 if(args.length > 1) {
594 try {
595 num_chunks = Integer.parseInt(args[1]);
596 } catch (Exception e) {
597 LOGGER.error("Cannot parse " + args[1]);
598 printUsage();
599 }
600 }
601
602 TFileTransport t = new TFileTransport(args[0], true);
603 t.open();
604 LOGGER.info("NumChunks="+t.getNumChunks());
605
606 Random r = new Random();
607 for(int j=0; j<num_chunks; j++) {
608 byte[] buf = new byte[4096];
609 int cnum = r.nextInt(t.getNumChunks()-1);
610 LOGGER.info("Reading chunk "+cnum);
611 t.seekToChunk(cnum);
612 for(int i=0; i<4096; i++) {
613 t.read(buf, 0, 4096);
614 }
615 }
616 }
617
618 private static void printUsage() {
619 LOGGER.error("Usage: TFileTransport <filename> [num_chunks]");
620 LOGGER.error(" (Opens and reads num_chunks chunks from file randomly)");
621 System.exit(1);
622 }
623
624 }