/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.commons.io.input;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;


/**
 * An input stream that lets additional receivers observe the data while it is
 * being read by the primary consumer of the stream.
 * The additional receivers are implemented as instances of {@link Observer}. A
 * typical application is the generation of a {@link java.security.MessageDigest}
 * on the fly.
 * <p>
 * <em>Note</em>: The {@link ObservableInputStream} is <em>not</em> thread safe,
 * as instances of InputStream usually aren't.
 * If you must access the stream from multiple threads, then synchronization, locking,
 * or a similar means must be used.
 * </p>
 * @see MessageDigestCalculatingInputStream
 */
public class ObservableInputStream extends ProxyInputStream {

    /**
     * Receiver of the events raised by an {@link ObservableInputStream}.
     * All callbacks have a default implementation; only {@link #error(IOException)}
     * does anything by default (it rethrows the exception it is given).
     * <p>
     * NOTE(review): the callbacks are declared without an access modifier, so they can
     * only be overridden by classes in this package. Widening them would affect existing
     * same-package subclasses - confirm before changing.
     * </p>
     */
    public static abstract class Observer {

        /** Called to indicate, that {@link InputStream#read()} has been invoked
         * on the {@link ObservableInputStream}, and will return a value.
         * @param pByte The value, which is being returned. This will never be -1 (EOF),
         *    because, in that case, {@link #finished()} will be invoked instead.
         * @throws IOException if an i/o-error occurs
         */
        void data(final int pByte) throws IOException {}

        /** Called to indicate, that {@link InputStream#read(byte[])}, or
         * {@link InputStream#read(byte[], int, int)} have been called, and are about to
         * return data.
         * @param pBuffer The byte array, which has been passed to the read call, and where
         *   data has been stored.
         * @param pOffset The offset within the byte array, where data has been stored.
         * @param pLength The number of bytes, which have been stored in the byte array.
         * @throws IOException if an i/o-error occurs
         */
        void data(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {}

        /** Called to indicate, that EOF has been seen on the underlying stream.
         * This method may be called multiple times, if the reader keeps invoking
         * either of the read methods, and they will consequently keep returning
         * EOF.
         * @throws IOException if an i/o-error occurs
         */
        void finished() throws IOException {}

        /** Called to indicate, that the {@link ObservableInputStream} has been closed.
         * @throws IOException if an i/o-error occurs
         */
        void closed() throws IOException {}

        /**
         * Called to indicate, that an error occurred on the underlying stream.
         * The default implementation rethrows the given exception.
         * @param pException the exception that was thrown by the underlying stream
         * @throws IOException if an i/o-error occurs
         */
        void error(final IOException pException) throws IOException { throw pException; }
    }

    private final List<Observer> observers = new ArrayList<>();

    /**
     * Creates a new ObservableInputStream for the given InputStream.
     * @param pProxy the input stream to proxy
     */
    public ObservableInputStream(final InputStream pProxy) {
        super(pProxy);
    }

    /**
     * Adds an Observer.
     * @param pObserver the observer to add
     */
    public void add(final Observer pObserver) {
        observers.add(pObserver);
    }

    /**
     * Removes an Observer.
     * @param pObserver the observer to remove
     */
    public void remove(final Observer pObserver) {
        observers.remove(pObserver);
    }

    /**
     * Removes all Observers.
     */
    public void removeAllObservers() {
        observers.clear();
    }

    /**
     * Reads a single byte and reports the outcome to the observers: the byte via
     * {@link Observer#data(int)}, end of stream via {@link Observer#finished()}, or a
     * failure via {@link Observer#error(IOException)}.
     * @return the byte read, or -1 at end of stream
     * @throws IOException if the read fails, or an observer throws
     */
    @Override
    public int read() throws IOException {
        int result = 0;
        IOException ioe = null;
        try {
            result = super.read();
        } catch (final IOException pException) {
            ioe = pException;
        }
        if (ioe != null) {
            noteError(ioe);
            // Always propagate the failure: without this, an empty observer list (or an
            // observer that does not rethrow) would make the caller see a bogus 0 byte.
            throw ioe;
        }
        if (result == -1) {
            noteFinished();
        } else {
            noteDataByte(result);
        }
        return result;
    }

    /**
     * Reads bytes into the given buffer and reports the outcome to the observers.
     * @param pBuffer the buffer to read into
     * @return the number of bytes read, or -1 at end of stream
     * @throws IOException if the read fails, or an observer throws
     */
    @Override
    public int read(final byte[] pBuffer) throws IOException {
        int result = 0;
        IOException ioe = null;
        try {
            result = super.read(pBuffer);
        } catch (final IOException pException) {
            ioe = pException;
        }
        notifyBulkRead(pBuffer, 0, result, ioe);
        return result;
    }

    /**
     * Reads bytes into a region of the given buffer and reports the outcome to the
     * observers.
     * @param pBuffer the buffer to read into
     * @param pOffset the offset in the buffer at which data is stored
     * @param pLength the maximum number of bytes to read
     * @return the number of bytes read, or -1 at end of stream
     * @throws IOException if the read fails, or an observer throws
     */
    @Override
    public int read(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
        int result = 0;
        IOException ioe = null;
        try {
            result = super.read(pBuffer, pOffset, pLength);
        } catch (final IOException pException) {
            ioe = pException;
        }
        notifyBulkRead(pBuffer, pOffset, result, ioe);
        return result;
    }

    /**
     * Reports the outcome of a bulk read to the observers.
     * A failure is passed to {@link #noteError(IOException)} and then rethrown; end of
     * stream triggers {@link #noteFinished()}; a positive count triggers
     * {@link #noteDataBytes(byte[], int, int)}. A count of zero triggers nothing.
     * @param pBuffer the buffer that was passed to the read call
     * @param pOffset the offset at which data was stored
     * @param pResult the value returned by the read call (ignored if pException is set)
     * @param pException the exception thrown by the read call, or null if it succeeded
     * @throws IOException pException itself, or an exception thrown by an observer
     */
    private void notifyBulkRead(final byte[] pBuffer, final int pOffset, final int pResult,
            final IOException pException) throws IOException {
        if (pException != null) {
            noteError(pException);
            // Observers may choose not to rethrow; the caller must still see the failure.
            throw pException;
        }
        if (pResult == -1) {
            noteFinished();
        } else if (pResult > 0) {
            noteDataBytes(pBuffer, pOffset, pResult);
        }
    }

    /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
     * with the given arguments.
     * @param pBuffer Passed to the observers.
     * @param pOffset Passed to the observers.
     * @param pLength Passed to the observers.
     * @throws IOException Some observer has thrown an exception, which is being
     *   passed down.
     */
    protected void noteDataBytes(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
        for (final Observer observer : getObservers()) {
            observer.data(pBuffer, pOffset, pLength);
        }
    }

    /** Notifies the observers by invoking {@link Observer#finished()}.
     * @throws IOException Some observer has thrown an exception, which is being
     *   passed down.
     */
    protected void noteFinished() throws IOException {
        for (final Observer observer : getObservers()) {
            observer.finished();
        }
    }

    /** Notifies the observers by invoking {@link Observer#data(int)}
     * with the given arguments.
     * @param pDataByte Passed to the observers.
     * @throws IOException Some observer has thrown an exception, which is being
     *   passed down.
     */
    protected void noteDataByte(final int pDataByte) throws IOException {
        for (final Observer observer : getObservers()) {
            observer.data(pDataByte);
        }
    }

    /** Notifies the observers by invoking {@link Observer#error(IOException)}
     * with the given argument.
     * @param pException Passed to the observers.
     * @throws IOException Some observer has thrown an exception, which is being
     *   passed down. This may be the same exception, which has been passed as an
     *   argument.
     */
    protected void noteError(final IOException pException) throws IOException {
        for (final Observer observer : getObservers()) {
            observer.error(pException);
        }
    }

    /** Notifies the observers by invoking {@link Observer#closed()}.
     * @throws IOException Some observer has thrown an exception, which is being
     *   passed down.
     */
    protected void noteClosed() throws IOException {
        for (final Observer observer : getObservers()) {
            observer.closed();
        }
    }

    /** Gets all currently registered observers.
     * @return the list of the currently registered observers (the internal list itself,
     *   not a copy)
     */
    protected List<Observer> getObservers() {
        return observers;
    }

    /**
     * Closes the stream and notifies the observers: {@link Observer#closed()} on
     * success, {@link Observer#error(IOException)} on failure.
     * @throws IOException if closing fails, or an observer throws
     */
    @Override
    public void close() throws IOException {
        IOException ioe = null;
        try {
            super.close();
        } catch (final IOException e) {
            ioe = e;
        }
        if (ioe != null) {
            noteError(ioe);
            // A failed close must not be hidden when no observer rethrows it.
            throw ioe;
        }
        noteClosed();
    }

    /** Reads all data from the underlying {@link InputStream}, while notifying the
     * observers.
     * @throws IOException The underlying {@link InputStream}, or either of the
     *   observers has thrown an exception.
     */
    public void consume() throws IOException {
        final byte[] buffer = new byte[8192];
        for (;;) {
            final int res = read(buffer);
            if (res == -1) {
                return;
            }
        }
    }

}