]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
20 | package org.apache.thrift.transport; | |
21 | ||
22 | import java.io.BufferedInputStream; | |
23 | import java.io.BufferedOutputStream; | |
24 | import java.io.InputStream; | |
25 | import java.io.OutputStream; | |
26 | import java.io.IOException; | |
27 | import java.util.Random; | |
28 | ||
29 | import org.slf4j.Logger; | |
30 | import org.slf4j.LoggerFactory; | |
31 | ||
32 | /** | |
33 | * FileTransport implementation of the TTransport interface. | |
34 | * Currently this is a straightforward port of the cpp implementation | |
35 | * | |
36 | * It may make better sense to provide a basic stream access on top of the framed file format | |
37 | * The FileTransport can then be a user of this framed file format with some additional logic | |
38 | * for chunking. | |
39 | */ | |
40 | public class TFileTransport extends TTransport { | |
41 | ||
42 | private static final Logger LOGGER = LoggerFactory.getLogger(TFileTransport.class.getName()); | |
43 | ||
44 | public static class TruncableBufferedInputStream extends BufferedInputStream { | |
45 | public void trunc() { | |
46 | pos = count = 0; | |
47 | } | |
48 | public TruncableBufferedInputStream(InputStream in) { | |
49 | super(in); | |
50 | } | |
51 | public TruncableBufferedInputStream(InputStream in, int size) { | |
52 | super(in, size); | |
53 | } | |
54 | } | |
55 | ||
56 | ||
57 | public static class Event { | |
58 | private byte[] buf_; | |
59 | private int nread_; | |
60 | private int navailable_; | |
61 | ||
62 | /** | |
63 | * Initialize an event. Initially, it has no valid contents | |
64 | * | |
65 | * @param buf byte array buffer to store event | |
66 | */ | |
67 | public Event(byte[] buf) { | |
68 | buf_ = buf; | |
69 | nread_ = navailable_ = 0; | |
70 | } | |
71 | ||
72 | public byte[] getBuf() { return buf_;} | |
73 | public int getSize() { return buf_.length; } | |
74 | ||
75 | ||
76 | public void setAvailable(int sz) { nread_ = 0; navailable_=sz;} | |
77 | public int getRemaining() { return (navailable_ - nread_); } | |
78 | ||
79 | public int emit(byte[] buf, int offset, int ndesired) { | |
80 | if((ndesired == 0) || (ndesired > getRemaining())) | |
81 | ndesired = getRemaining(); | |
82 | ||
83 | if(ndesired <= 0) | |
84 | return (ndesired); | |
85 | ||
86 | System.arraycopy(buf_, nread_, buf, offset, ndesired); | |
87 | nread_ += ndesired; | |
88 | ||
89 | return(ndesired); | |
90 | } | |
91 | }; | |
92 | ||
93 | public static class ChunkState { | |
94 | /** | |
95 | * Chunk Size. Must be same across all implementations | |
96 | */ | |
97 | public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; | |
98 | ||
99 | private int chunk_size_ = DEFAULT_CHUNK_SIZE; | |
100 | private long offset_ = 0; | |
101 | ||
102 | public ChunkState() {} | |
103 | public ChunkState(int chunk_size) { chunk_size_ = chunk_size; } | |
104 | ||
105 | public void skip(int size) {offset_ += size; } | |
106 | public void seek(long offset) {offset_ = offset;} | |
107 | ||
108 | public int getChunkSize() { return chunk_size_;} | |
109 | public int getChunkNum() { return ((int)(offset_/chunk_size_));} | |
110 | public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));} | |
111 | public long getOffset() { return (offset_);} | |
112 | } | |
113 | ||
114 | public static enum TailPolicy { | |
115 | ||
116 | NOWAIT(0, 0), | |
117 | WAIT_FOREVER(500, -1); | |
118 | ||
119 | /** | |
120 | * Time in milliseconds to sleep before next read | |
121 | * If 0, no sleep | |
122 | */ | |
123 | public final int timeout_; | |
124 | ||
125 | /** | |
126 | * Number of retries before giving up | |
127 | * if 0, no retries | |
128 | * if -1, retry forever | |
129 | */ | |
130 | public final int retries_; | |
131 | ||
132 | /** | |
133 | * ctor for policy | |
134 | * | |
135 | * @param timeout sleep time for this particular policy | |
136 | * @param retries number of retries | |
137 | */ | |
138 | ||
139 | TailPolicy(int timeout, int retries) { | |
140 | timeout_ = timeout; | |
141 | retries_ = retries; | |
142 | } | |
143 | } | |
144 | ||
145 | /** | |
146 | * Current tailing policy | |
147 | */ | |
148 | TailPolicy currentPolicy_ = TailPolicy.NOWAIT; | |
149 | ||
150 | ||
151 | /** | |
152 | * Underlying file being read | |
153 | */ | |
154 | protected TSeekableFile inputFile_ = null; | |
155 | ||
156 | /** | |
157 | * Underlying outputStream | |
158 | */ | |
159 | protected OutputStream outputStream_ = null; | |
160 | ||
161 | ||
162 | /** | |
163 | * Event currently read in | |
164 | */ | |
165 | Event currentEvent_ = null; | |
166 | ||
167 | /** | |
168 | * InputStream currently being used for reading | |
169 | */ | |
170 | InputStream inputStream_ = null; | |
171 | ||
172 | /** | |
173 | * current Chunk state | |
174 | */ | |
175 | ChunkState cs = null; | |
176 | ||
177 | /** | |
178 | * is read only? | |
179 | */ | |
180 | private boolean readOnly_ = false; | |
181 | ||
182 | /** | |
183 | * Get File Tailing Policy | |
184 | * | |
185 | * @return current read policy | |
186 | */ | |
187 | public TailPolicy getTailPolicy() { | |
188 | return (currentPolicy_); | |
189 | } | |
190 | ||
191 | /** | |
192 | * Set file Tailing Policy | |
193 | * | |
194 | * @param policy New policy to set | |
195 | * @return Old policy | |
196 | */ | |
197 | public TailPolicy setTailPolicy(TailPolicy policy) { | |
198 | TailPolicy old = currentPolicy_; | |
199 | currentPolicy_ = policy; | |
200 | return (old); | |
201 | } | |
202 | ||
203 | ||
204 | /** | |
205 | * Initialize read input stream | |
206 | * | |
207 | * @return input stream to read from file | |
208 | */ | |
209 | private InputStream createInputStream() throws TTransportException { | |
210 | InputStream is; | |
211 | try { | |
212 | if(inputStream_ != null) { | |
213 | ((TruncableBufferedInputStream)inputStream_).trunc(); | |
214 | is = inputStream_; | |
215 | } else { | |
216 | is = new TruncableBufferedInputStream(inputFile_.getInputStream()); | |
217 | } | |
218 | } catch (IOException iox) { | |
219 | throw new TTransportException(iox.getMessage(), iox); | |
220 | } | |
221 | return(is); | |
222 | } | |
223 | ||
224 | /** | |
225 | * Read (potentially tailing) an input stream | |
226 | * | |
227 | * @param is InputStream to read from | |
228 | * @param buf Buffer to read into | |
229 | * @param off Offset in buffer to read into | |
230 | * @param len Number of bytes to read | |
231 | * @param tp policy to use if we hit EOF | |
232 | * | |
233 | * @return number of bytes read | |
234 | */ | |
235 | private int tailRead(InputStream is, byte[] buf, | |
236 | int off, int len, TailPolicy tp) throws TTransportException { | |
237 | int orig_len = len; | |
238 | try { | |
239 | int retries = 0; | |
240 | while(len > 0) { | |
241 | int cnt = is.read(buf, off, len); | |
242 | if(cnt > 0) { | |
243 | off += cnt; | |
244 | len -= cnt; | |
245 | retries = 0; | |
246 | cs.skip(cnt); // remember that we read so many bytes | |
247 | } else if (cnt == -1) { | |
248 | // EOF | |
249 | retries++; | |
250 | ||
251 | if((tp.retries_ != -1) && tp.retries_ < retries) | |
252 | return (orig_len - len); | |
253 | ||
254 | if(tp.timeout_ > 0) { | |
255 | try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {} | |
256 | } | |
257 | } else { | |
258 | // either non-zero or -1 is what the contract says! | |
259 | throw new | |
260 | TTransportException("Unexpected return from InputStream.read = " | |
261 | + cnt); | |
262 | } | |
263 | } | |
264 | } catch (IOException iox) { | |
265 | throw new TTransportException(iox.getMessage(), iox); | |
266 | } | |
267 | ||
268 | return(orig_len - len); | |
269 | } | |
270 | ||
271 | /** | |
272 | * Event is corrupted. Do recovery | |
273 | * | |
274 | * @return true if recovery could be performed and we can read more data | |
275 | * false is returned only when nothing more can be read | |
276 | */ | |
277 | private boolean performRecovery() throws TTransportException { | |
278 | int numChunks = getNumChunks(); | |
279 | int curChunk = cs.getChunkNum(); | |
280 | ||
281 | if(curChunk >= (numChunks-1)) { | |
282 | return false; | |
283 | } | |
284 | seekToChunk(curChunk+1); | |
285 | return true; | |
286 | } | |
287 | ||
288 | /** | |
289 | * Read event from underlying file | |
290 | * | |
291 | * @return true if event could be read, false otherwise (on EOF) | |
292 | */ | |
293 | private boolean readEvent() throws TTransportException { | |
294 | byte[] ebytes = new byte[4]; | |
295 | int esize; | |
296 | int nread; | |
297 | int nrequested; | |
298 | ||
299 | retry: | |
300 | do { | |
301 | // corner case. read to end of chunk | |
302 | nrequested = cs.getRemaining(); | |
303 | if(nrequested < 4) { | |
304 | nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_); | |
305 | if(nread != nrequested) { | |
306 | return(false); | |
307 | } | |
308 | } | |
309 | ||
310 | // assuming serialized on little endian machine | |
311 | nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_); | |
312 | if(nread != 4) { | |
313 | return(false); | |
314 | } | |
315 | ||
316 | esize=0; | |
317 | for(int i=3; i>=0; i--) { | |
318 | int val = (0x000000ff & (int)ebytes[i]); | |
319 | esize |= (val << (i*8)); | |
320 | } | |
321 | ||
322 | // check if event is corrupted and do recovery as required | |
323 | if(esize > cs.getRemaining()) { | |
324 | throw new TTransportException("FileTransport error: bad event size"); | |
325 | /* | |
326 | if(performRecovery()) { | |
327 | esize=0; | |
328 | } else { | |
329 | return false; | |
330 | } | |
331 | */ | |
332 | } | |
333 | } while (esize == 0); | |
334 | ||
335 | // reset existing event or get a larger one | |
336 | if(currentEvent_.getSize() < esize) | |
337 | currentEvent_ = new Event(new byte [esize]); | |
338 | ||
339 | // populate the event | |
340 | byte[] buf = currentEvent_.getBuf(); | |
341 | nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_); | |
342 | if(nread != esize) { | |
343 | return(false); | |
344 | } | |
345 | currentEvent_.setAvailable(esize); | |
346 | return(true); | |
347 | } | |
348 | ||
349 | /** | |
350 | * open if both input/output open unless readonly | |
351 | * | |
352 | * @return true | |
353 | */ | |
354 | public boolean isOpen() { | |
355 | return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null))); | |
356 | } | |
357 | ||
358 | ||
359 | /** | |
360 | * Diverging from the cpp model and sticking to the TSocket model | |
361 | * Files are not opened in ctor - but in explicit open call | |
362 | */ | |
363 | public void open() throws TTransportException { | |
364 | if (isOpen()) | |
365 | throw new TTransportException(TTransportException.ALREADY_OPEN); | |
366 | ||
367 | try { | |
368 | inputStream_ = createInputStream(); | |
369 | cs = new ChunkState(); | |
370 | currentEvent_ = new Event(new byte [256]); | |
371 | ||
372 | if(!readOnly_) | |
373 | outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream()); | |
374 | } catch (IOException iox) { | |
375 | throw new TTransportException(TTransportException.NOT_OPEN, iox); | |
376 | } | |
377 | } | |
378 | ||
379 | /** | |
380 | * Closes the transport. | |
381 | */ | |
382 | public void close() { | |
383 | if (inputFile_ != null) { | |
384 | try { | |
385 | inputFile_.close(); | |
386 | } catch (IOException iox) { | |
387 | LOGGER.warn("WARNING: Error closing input file: " + | |
388 | iox.getMessage()); | |
389 | } | |
390 | inputFile_ = null; | |
391 | } | |
392 | if (outputStream_ != null) { | |
393 | try { | |
394 | outputStream_.close(); | |
395 | } catch (IOException iox) { | |
396 | LOGGER.warn("WARNING: Error closing output stream: " + | |
397 | iox.getMessage()); | |
398 | } | |
399 | outputStream_ = null; | |
400 | } | |
401 | } | |
402 | ||
403 | ||
404 | /** | |
405 | * File Transport ctor | |
406 | * | |
407 | * @param path File path to read and write from | |
408 | * @param readOnly Whether this is a read-only transport | |
409 | */ | |
410 | public TFileTransport(final String path, boolean readOnly) throws IOException { | |
411 | inputFile_ = new TStandardFile(path); | |
412 | readOnly_ = readOnly; | |
413 | } | |
414 | ||
415 | /** | |
416 | * File Transport ctor | |
417 | * | |
418 | * @param inputFile open TSeekableFile to read/write from | |
419 | * @param readOnly Whether this is a read-only transport | |
420 | */ | |
421 | public TFileTransport(TSeekableFile inputFile, boolean readOnly) { | |
422 | inputFile_ = inputFile; | |
423 | readOnly_ = readOnly; | |
424 | } | |
425 | ||
426 | ||
427 | /** | |
428 | * Cloned from TTransport.java:readAll(). Only difference is throwing an EOF exception | |
429 | * where one is detected. | |
430 | */ | |
431 | public int readAll(byte[] buf, int off, int len) | |
432 | throws TTransportException { | |
433 | int got = 0; | |
434 | int ret = 0; | |
435 | while (got < len) { | |
436 | ret = read(buf, off+got, len-got); | |
437 | if (ret < 0) { | |
438 | throw new TTransportException("Error in reading from file"); | |
439 | } | |
440 | if(ret == 0) { | |
441 | throw new TTransportException(TTransportException.END_OF_FILE, | |
442 | "End of File reached"); | |
443 | } | |
444 | got += ret; | |
445 | } | |
446 | return got; | |
447 | } | |
448 | ||
449 | ||
450 | /** | |
451 | * Reads up to len bytes into buffer buf, starting at offset off. | |
452 | * | |
453 | * @param buf Array to read into | |
454 | * @param off Index to start reading at | |
455 | * @param len Maximum number of bytes to read | |
456 | * @return The number of bytes actually read | |
457 | * @throws TTransportException if there was an error reading data | |
458 | */ | |
459 | public int read(byte[] buf, int off, int len) throws TTransportException { | |
460 | if(!isOpen()) | |
461 | throw new TTransportException(TTransportException.NOT_OPEN, | |
462 | "Must open before reading"); | |
463 | ||
464 | if(currentEvent_.getRemaining() == 0) { | |
465 | if(!readEvent()) | |
466 | return(0); | |
467 | } | |
468 | ||
469 | int nread = currentEvent_.emit(buf, off, len); | |
470 | return nread; | |
471 | } | |
472 | ||
473 | public int getNumChunks() throws TTransportException { | |
474 | if(!isOpen()) | |
475 | throw new TTransportException(TTransportException.NOT_OPEN, | |
476 | "Must open before getNumChunks"); | |
477 | try { | |
478 | long len = inputFile_.length(); | |
479 | if(len == 0) | |
480 | return 0; | |
481 | else | |
482 | return (((int)(len/cs.getChunkSize())) + 1); | |
483 | ||
484 | } catch (IOException iox) { | |
485 | throw new TTransportException(iox.getMessage(), iox); | |
486 | } | |
487 | } | |
488 | ||
489 | public int getCurChunk() throws TTransportException { | |
490 | if(!isOpen()) | |
491 | throw new TTransportException(TTransportException.NOT_OPEN, | |
492 | "Must open before getCurChunk"); | |
493 | return (cs.getChunkNum()); | |
494 | ||
495 | } | |
496 | ||
497 | ||
498 | public void seekToChunk(int chunk) throws TTransportException { | |
499 | if(!isOpen()) | |
500 | throw new TTransportException(TTransportException.NOT_OPEN, | |
501 | "Must open before seeking"); | |
502 | ||
503 | int numChunks = getNumChunks(); | |
504 | ||
505 | // file is empty, seeking to chunk is pointless | |
506 | if (numChunks == 0) { | |
507 | return; | |
508 | } | |
509 | ||
510 | // negative indicates reverse seek (from the end) | |
511 | if (chunk < 0) { | |
512 | chunk += numChunks; | |
513 | } | |
514 | ||
515 | // too large a value for reverse seek, just seek to beginning | |
516 | if (chunk < 0) { | |
517 | chunk = 0; | |
518 | } | |
519 | ||
520 | long eofOffset=0; | |
521 | boolean seekToEnd = (chunk >= numChunks); | |
522 | if(seekToEnd) { | |
523 | chunk = chunk - 1; | |
524 | try { eofOffset = inputFile_.length(); } | |
525 | catch (IOException iox) {throw new TTransportException(iox.getMessage(), | |
526 | iox);} | |
527 | } | |
528 | ||
529 | if(chunk*cs.getChunkSize() != cs.getOffset()) { | |
530 | try { inputFile_.seek((long)chunk*cs.getChunkSize()); } | |
531 | catch (IOException iox) { | |
532 | throw new TTransportException("Seek to chunk " + | |
533 | chunk + " " +iox.getMessage(), iox); | |
534 | } | |
535 | ||
536 | cs.seek((long)chunk*cs.getChunkSize()); | |
537 | currentEvent_.setAvailable(0); | |
538 | inputStream_ = createInputStream(); | |
539 | } | |
540 | ||
541 | if(seekToEnd) { | |
542 | // waiting forever here - otherwise we can hit EOF and end up | |
543 | // having consumed partial data from the data stream. | |
544 | TailPolicy old = setTailPolicy(TailPolicy.WAIT_FOREVER); | |
545 | while(cs.getOffset() < eofOffset) { readEvent(); } | |
546 | currentEvent_.setAvailable(0); | |
547 | setTailPolicy(old); | |
548 | } | |
549 | } | |
550 | ||
551 | public void seekToEnd() throws TTransportException { | |
552 | if(!isOpen()) | |
553 | throw new TTransportException(TTransportException.NOT_OPEN, | |
554 | "Must open before seeking"); | |
555 | seekToChunk(getNumChunks()); | |
556 | } | |
557 | ||
558 | ||
559 | /** | |
560 | * Writes up to len bytes from the buffer. | |
561 | * | |
562 | * @param buf The output data buffer | |
563 | * @param off The offset to start writing from | |
564 | * @param len The number of bytes to write | |
565 | * @throws TTransportException if there was an error writing data | |
566 | */ | |
567 | public void write(byte[] buf, int off, int len) throws TTransportException { | |
568 | throw new TTransportException("Not Supported"); | |
569 | } | |
570 | ||
571 | /** | |
572 | * Flush any pending data out of a transport buffer. | |
573 | * | |
574 | * @throws TTransportException if there was an error writing out data. | |
575 | */ | |
576 | public void flush() throws TTransportException { | |
577 | throw new TTransportException("Not Supported"); | |
578 | } | |
579 | ||
580 | /** | |
581 | * test program | |
582 | * | |
583 | */ | |
584 | public static void main(String[] args) throws Exception { | |
585 | ||
586 | int num_chunks = 10; | |
587 | ||
588 | if((args.length < 1) || args[0].equals("--help") | |
589 | || args[0].equals("-h") || args[0].equals("-?")) { | |
590 | printUsage(); | |
591 | } | |
592 | ||
593 | if(args.length > 1) { | |
594 | try { | |
595 | num_chunks = Integer.parseInt(args[1]); | |
596 | } catch (Exception e) { | |
597 | LOGGER.error("Cannot parse " + args[1]); | |
598 | printUsage(); | |
599 | } | |
600 | } | |
601 | ||
602 | TFileTransport t = new TFileTransport(args[0], true); | |
603 | t.open(); | |
604 | LOGGER.info("NumChunks="+t.getNumChunks()); | |
605 | ||
606 | Random r = new Random(); | |
607 | for(int j=0; j<num_chunks; j++) { | |
608 | byte[] buf = new byte[4096]; | |
609 | int cnum = r.nextInt(t.getNumChunks()-1); | |
610 | LOGGER.info("Reading chunk "+cnum); | |
611 | t.seekToChunk(cnum); | |
612 | for(int i=0; i<4096; i++) { | |
613 | t.read(buf, 0, 4096); | |
614 | } | |
615 | } | |
616 | } | |
617 | ||
618 | private static void printUsage() { | |
619 | LOGGER.error("Usage: TFileTransport <filename> [num_chunks]"); | |
620 | LOGGER.error(" (Opens and reads num_chunks chunks from file randomly)"); | |
621 | System.exit(1); | |
622 | } | |
623 | ||
624 | } |