001/*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.commons.io.input;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.util.ArrayList;
022import java.util.List;
023
024
025/**
026 * The {@link ObservableInputStream} allows, that an InputStream may be consumed
027 * by other receivers, apart from the thread, which is reading it.
028 * The other consumers are implemented as instances of {@link Observer}. A
029 * typical application may be the generation of a {@link java.security.MessageDigest} on the
030 * fly.
031 * {@code Note}: The {@link ObservableInputStream} is <em>not</em> thread safe,
032 * as instances of InputStream usually aren't.
033 * If you must access the stream from multiple threads, then synchronization, locking,
034 * or a similar means must be used.
035 * @see MessageDigestCalculatingInputStream
036 */
037public class ObservableInputStream extends ProxyInputStream {
038
039    public static abstract class Observer {
040
041        /** Called to indicate, that {@link InputStream#read()} has been invoked
042         * on the {@link ObservableInputStream}, and will return a value.
043         * @param pByte The value, which is being returned. This will never be -1 (EOF),
044         *    because, in that case, {@link #finished()} will be invoked instead.
045         * @throws IOException if an i/o-error occurs
046         */
047        void data(final int pByte) throws IOException {}
048
049        /** Called to indicate, that {@link InputStream#read(byte[])}, or
050         * {@link InputStream#read(byte[], int, int)} have been called, and are about to
051         * invoke data.
052         * @param pBuffer The byte array, which has been passed to the read call, and where
053         *   data has been stored.
054         * @param pOffset The offset within the byte array, where data has been stored.
055         * @param pLength The number of bytes, which have been stored in the byte array.
056         * @throws IOException if an i/o-error occurs
057         */
058        void data(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {}
059
060        /** Called to indicate, that EOF has been seen on the underlying stream.
061         * This method may be called multiple times, if the reader keeps invoking
062         * either of the read methods, and they will consequently keep returning
063         * EOF.
064         * @throws IOException if an i/o-error occurs
065         */
066        void finished() throws IOException {}
067
068        /** Called to indicate, that the {@link ObservableInputStream} has been closed.
069         * @throws IOException if an i/o-error occurs
070         */
071        void closed() throws IOException {}
072
073        /**
074         * Called to indicate, that an error occurred on the underlying stream.
075         * @throws IOException if an i/o-error occurs
076         */
077        void error(final IOException pException) throws IOException { throw pException; }
078    }
079
080    private final List<Observer> observers = new ArrayList<>();
081
082    /**
083     * Creates a new ObservableInputStream for the given InputStream.
084     * @param pProxy the input stream to proxy
085     */
086    public ObservableInputStream(final InputStream pProxy) {
087        super(pProxy);
088    }
089
090    /**
091     * Adds an Observer.
092     * @param pObserver the observer to add
093     */
094    public void add(final Observer pObserver) {
095        observers.add(pObserver);
096    }
097
098    /**
099     * Removes an Observer.
100     * @param pObserver the observer to remove
101     */
102    public void remove(final Observer pObserver) {
103        observers.remove(pObserver);
104    }
105
106    /**
107     * Removes all Observers.
108     */
109    public void removeAllObservers() {
110        observers.clear();
111    }
112
113    @Override
114    public int read() throws IOException {
115        int result = 0;
116        IOException ioe = null;
117        try {
118            result = super.read();
119        } catch (final IOException pException) {
120            ioe = pException;
121        }
122        if (ioe != null) {
123            noteError(ioe);
124        } else if (result == -1) {
125            noteFinished();
126        } else {
127            noteDataByte(result);
128        }
129        return result;
130    }
131
132    @Override
133    public int read(final byte[] pBuffer) throws IOException {
134        int result = 0;
135        IOException ioe = null;
136        try {
137            result = super.read(pBuffer);
138        } catch (final IOException pException) {
139            ioe = pException;
140        }
141        if (ioe != null) {
142            noteError(ioe);
143        } else if (result == -1) {
144            noteFinished();
145        } else if (result > 0) {
146            noteDataBytes(pBuffer, 0, result);
147        }
148        return result;
149    }
150
151    @Override
152    public int read(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
153        int result = 0;
154        IOException ioe = null;
155        try {
156            result = super.read(pBuffer, pOffset, pLength);
157        } catch (final IOException pException) {
158            ioe = pException;
159        }
160        if (ioe != null) {
161            noteError(ioe);
162        } else if (result == -1) {
163            noteFinished();
164        } else if (result > 0) {
165            noteDataBytes(pBuffer, pOffset, result);
166        }
167        return result;
168    }
169
170    /** Notifies the observers by invoking {@link Observer#data(byte[],int,int)}
171     * with the given arguments.
172     * @param pBuffer Passed to the observers.
173     * @param pOffset Passed to the observers.
174     * @param pLength Passed to the observers.
175     * @throws IOException Some observer has thrown an exception, which is being
176     *   passed down.
177     */
178    protected void noteDataBytes(final byte[] pBuffer, final int pOffset, final int pLength) throws IOException {
179        for (final Observer observer : getObservers()) {
180            observer.data(pBuffer, pOffset, pLength);
181        }
182    }
183
184    /** Notifies the observers by invoking {@link Observer#finished()}.
185     * @throws IOException Some observer has thrown an exception, which is being
186     *   passed down.
187     */
188    protected void noteFinished() throws IOException {
189        for (final Observer observer : getObservers()) {
190            observer.finished();
191        }
192    }
193
194    /** Notifies the observers by invoking {@link Observer#data(int)}
195     * with the given arguments.
196     * @param pDataByte Passed to the observers.
197     * @throws IOException Some observer has thrown an exception, which is being
198     *   passed down.
199     */
200    protected void noteDataByte(final int pDataByte) throws IOException {
201        for (final Observer observer : getObservers()) {
202            observer.data(pDataByte);
203        }
204    }
205
206    /** Notifies the observers by invoking {@link Observer#error(IOException)}
207     * with the given argument.
208     * @param pException Passed to the observers.
209     * @throws IOException Some observer has thrown an exception, which is being
210     *   passed down. This may be the same exception, which has been passed as an
211     *   argument.
212     */
213    protected void noteError(final IOException pException) throws IOException {
214        for (final Observer observer : getObservers()) {
215            observer.error(pException);
216        }
217    }
218
219    /** Notifies the observers by invoking {@link Observer#finished()}.
220     * @throws IOException Some observer has thrown an exception, which is being
221     *   passed down.
222     */
223    protected void noteClosed() throws IOException {
224        for (final Observer observer : getObservers()) {
225            observer.closed();
226        }
227    }
228
229    /** Gets all currently registered observers.
230     * @return a list of the currently registered observers
231     */
232    protected List<Observer> getObservers() {
233        return observers;
234    }
235
236    @Override
237    public void close() throws IOException {
238        IOException ioe = null;
239        try {
240            super.close();
241        } catch (final IOException e) {
242            ioe = e;
243        }
244        if (ioe == null) {
245            noteClosed();
246        } else {
247            noteError(ioe);
248        }
249    }
250
251    /** Reads all data from the underlying {@link InputStream}, while notifying the
252     * observers.
253     * @throws IOException The underlying {@link InputStream}, or either of the
254     *   observers has thrown an exception.
255     */
256    public void consume() throws IOException {
257        final byte[] buffer = new byte[8192];
258        for (;;) {
259            final int res = read(buffer);
260            if (res == -1) {
261                return;
262            }
263        }
264    }
265
266}