001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.commons.vfs2.util; 018 019import java.io.BufferedInputStream; 020import java.io.IOException; 021import java.io.InputStream; 022import java.util.concurrent.atomic.AtomicBoolean; 023import java.util.concurrent.atomic.AtomicLong; 024 025/** 026 * An InputStream that provides buffering and end-of-stream monitoring. 027 */ 028public class MonitorInputStream extends BufferedInputStream { 029 private static final int EOF_CHAR = -1; 030 private final AtomicBoolean finished = new AtomicBoolean(false); 031 private final AtomicLong atomicCount = new AtomicLong(0); 032 033 public MonitorInputStream(final InputStream in) { 034 super(in); 035 } 036 037 /** 038 * Returns 0 if the stream is at EOF, else the underlying inputStream will be queried. 039 * 040 * @return The number of bytes that are available. 041 * @throws IOException if an error occurs. 042 * @since 2.0 043 */ 044 @Override 045 public synchronized int available() throws IOException { 046 if (finished.get()) { 047 return 0; 048 } 049 050 return super.available(); 051 } 052 053 /** 054 * Reads a character. 055 * 056 * @return The character that was read as an integer. 057 * @throws IOException if an error occurs. 058 */ 059 @Override 060 public int read() throws IOException { 061 if (finished.get()) { 062 return EOF_CHAR; 063 } 064 065 final int ch = super.read(); 066 if (ch != EOF_CHAR) { 067 atomicCount.incrementAndGet(); 068 return ch; 069 } 070 071 // End-of-stream 072 close(); 073 return EOF_CHAR; 074 } 075 076 /** 077 * Reads bytes from this input stream. 078 * 079 * @param buffer A byte array in which to place the characters read. 080 * @param offset The offset at which to start reading. 081 * @param length The maximum number of bytes to read. 082 * @return The number of bytes read. 083 * @throws IOException if an error occurs. 084 */ 085 @Override 086 public int read(final byte[] buffer, final int offset, final int length) throws IOException { 087 if (finished.get()) { 088 return EOF_CHAR; 089 } 090 091 final int nread = super.read(buffer, offset, length); 092 if (nread != EOF_CHAR) { 093 atomicCount.addAndGet(nread); 094 return nread; 095 } 096 097 // End-of-stream 098 close(); 099 return EOF_CHAR; 100 } 101 102 /** 103 * Closes this input stream and releases any system resources associated with the stream. 104 * 105 * @throws IOException if an error occurs. 106 */ 107 @Override 108 public void close() throws IOException { 109 final boolean closed = finished.getAndSet(true); 110 if (closed) { 111 return; 112 } 113 114 // Close the stream 115 IOException exc = null; 116 try { 117 super.close(); 118 } catch (final IOException ioe) { 119 exc = ioe; 120 } 121 122 // Notify that the stream has been closed 123 try { 124 onClose(); 125 } catch (final IOException ioe) { 126 exc = ioe; 127 } 128 129 if (exc != null) { 130 throw exc; 131 } 132 } 133 134 /** 135 * Called after the stream has been closed. This implementation does nothing. 136 * 137 * @throws IOException if an error occurs. 138 */ 139 protected void onClose() throws IOException { 140 } 141 142 /** 143 * Get the number of bytes read by this input stream. 144 * 145 * @return The number of bytes read by this input stream. 146 */ 147 public long getCount() { 148 return atomicCount.get(); 149 } 150}