BufferedIO.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 #include <algorithm>
00018 #include <errno.h>
00019 
00020 #include "BufferedIO.h"
00021 #include "IO.h"
00022 #include "PrettyPrintBuffer.h"
00023 
00024 namespace oasys {
00025 
00026 /******************************************************************
00027  *
00028  * BufferedInput
00029  *
00030  ******************************************************************/
00031 #define DEFAULT_BUFSIZE 1024
00032 
00033 BufferedInput::BufferedInput(IOClient* client, const char* logbase)
00034     : Logger("BufferedInput", logbase),
00035       client_(client),
00036       buf_(DEFAULT_BUFSIZE),
00037       seen_eof_(false)
00038  {}
00039 
00040 BufferedInput::~BufferedInput()
00041 {}
00042 
00043 int 
00044 BufferedInput::read_line(const char* nl, char** buf, int timeout)
00045 {
00046     int endl;
00047     while((endl = find_nl(nl)) == -1)
00048     {
00049         // can't find a newline, so read in another chunk of data
00050         int cc = internal_read(buf_.fullbytes() + BufferedInput::READ_AHEAD,
00051                                timeout);
00052         
00053         log_debug("readline: cc = %d", cc);
00054         if(cc <= 0)
00055         {
00056             log_debug("%s: read %s", 
00057                  __func__, (cc == 0) ? "eof" : strerror(errno));
00058             return cc;
00059         }
00060     }
00061 
00062     *buf = buf_.start();
00063 
00064     log_debug("endl = %d", endl);
00065     buf_.consume(endl + strlen(nl));
00066 
00067     return endl + strlen(nl);
00068 }
00069 
00070 int 
00071 BufferedInput::read_bytes(size_t len, char** buf, int timeout)
00072 {
00073     ASSERT(len > 0);
00074     
00075     log_debug("read_bytes %zu (timeout %d)", len, timeout);
00076     
00077     size_t total = buf_.fullbytes();
00078     
00079     while (total < len)
00080     {
00081         // fill up the buffer (if possible)
00082         log_debug("read_bytes calling internal_read for %zu needed bytes",
00083                   (len - total));
00084         int cc = internal_read(len, timeout);
00085         if (cc <= 0)
00086         {
00087             log_debug("%s: read %s", 
00088                  __func__, (cc == 0) ? "eof" : strerror(errno));
00089             return cc;
00090         }
00091 
00092         // the return from internal_read is the smaller of what we
00093         // asked for (i.e. len), and the buffer's fullbytes after the
00094         // read. therefore, it's suitable as the value for total
00095         total = cc;
00096     }
00097 
00098     *buf = buf_.start();
00099 
00100     // don't consume more than the user asked for
00101     buf_.consume(len);
00102     
00103     return len;
00104 }
00105 
00106 int 
00107 BufferedInput::read_some_bytes(char** buf, int timeout)
00108 {
00109     int cc;
00110 
00111     // if there's nothing in the buffer, then issue one call to read,
00112     // trying to fill up as much as possible
00113     if (buf_.fullbytes() == 0) {
00114         ASSERT(buf_.start() == buf_.end());
00115         
00116         cc = internal_read(buf_.tailbytes(), timeout);
00117 
00118         if (cc == 0) {
00119             log_debug("%s: read eof", __func__);
00120             return cc; // eof ok
00121         }
00122 
00123         if (cc < 0) {
00124             logf(LOG_ERR, "%s: read error %s", __func__, strerror(errno));
00125             return cc;
00126         }
00127 
00128         ASSERT(buf_.fullbytes() > 0);
00129     }
00130 
00131     *buf = buf_.start();
00132     
00133     cc = buf_.fullbytes();
00134     buf_.consume(cc);
00135     
00136     log_debug("read_some_bytes ret %d (timeout %d)", cc, timeout);
00137 
00138     return cc;
00139 }
00140 
00141 char
00142 BufferedInput::get_char(int timeout)
00143 {
00144     if (buf_.fullbytes() == 0) 
00145     {
00146         int cc = internal_read(buf_.tailbytes(), timeout);
00147         
00148         if (cc <= 0) {
00149             logf(LOG_ERR, "%s: read %s", 
00150                  __func__, (cc == 0) ? "eof" : strerror(errno));
00151             
00152             return 0;
00153         }
00154 
00155         ASSERT(buf_.fullbytes() > 0);
00156     }
00157 
00158     char ret = *buf_.start();
00159     buf_.consume(1);
00160 
00161     return ret;
00162 }
00163 
00164 bool
00165 BufferedInput::eof()
00166 {
00167     return buf_.fullbytes() == 0 && seen_eof_;
00168 }
00169 
00170 int
00171 BufferedInput::internal_read(size_t len, int timeout_ms)
00172 {
00173     int cc;
00174     ASSERT(len > 0);
00175     ASSERT(len > buf_.fullbytes());
00176 
00177     // make sure there's at least len space in buf's tailbytes
00178     buf_.reserve(len);
00179 
00180     // but always try to fill up as much as possible into tailbytes
00181     if (timeout_ms > 0) {
00182         cc = client_->timeout_read(buf_.end(), buf_.tailbytes(), timeout_ms);
00183     } else {
00184         cc = client_->read(buf_.end(), buf_.tailbytes());
00185     }
00186     
00187     if (cc == IOTIMEOUT)
00188     {
00189         log_debug("internal_read %zu (timeout %d) timed out",
00190                   len, timeout_ms);
00191         return cc;
00192     }
00193     else if (cc == 0) 
00194     {
00195         log_debug("internal_read %zu (timeout %d) eof",
00196                   len, timeout_ms);
00197         seen_eof_ = true;
00198         return cc;
00199     }
00200     else if (cc < 0)
00201     {
00202         logf(LOG_ERR, "internal_read %zu (timeout %d) error %d in read: %s",
00203              len, timeout_ms, cc, strerror(errno));
00204         
00205         return cc;
00206     }
00207     
00208     buf_.fill(cc);
00209 
00210     int ret = std::min(buf_.fullbytes(), len);
00211 
00212 #ifndef NDEBUG
00213     PrettyPrintBuf pretty(buf_.start(), ret);
00214         
00215     log_debug("internal_read %u bytes, data =", ret);
00216     std::string s;
00217     bool done;
00218     do {
00219         done = pretty.next_str(&s);
00220         log_debug(s.c_str());
00221     } while(!done);
00222 #else
00223     log_debug("internal_read %zu (timeout %d): cc=%d ret %d",
00224               len, timeout_ms, cc, ret);
00225 #endif
00226 
00227     return ret;
00228 }
00229 
00230 int
00231 BufferedInput::find_nl(const char* nl)
00232 {
00233     char* offset = buf_.start();
00234     int nl_len   = strlen(nl);
00235     size_t bytes_left = buf_.fullbytes();
00236 
00237     for(;;)
00238     {
00239         char* new_offset;
00240         new_offset = static_cast<char*>(memchr(offset, nl[0], bytes_left));
00241         
00242         bytes_left -= new_offset - offset;
00243         offset = new_offset;
00244 
00245         if (offset == 0 || static_cast<int>(bytes_left) < nl_len)
00246             return -1;
00247         
00248         if (memcmp(offset, nl, nl_len) == 0)
00249         {
00250             return offset - buf_.start();
00251         }
00252         
00253         offset++;
00254         bytes_left--;
00255     }
00256 }
00257 
00258 /***************************************************************************
00259  *
00260  * BufferedOuput
00261  *
00262  **************************************************************************/
00263 BufferedOutput::BufferedOutput(IOClient* client, 
00264                                const char* logbase)
00265     : Logger("BufferedOutput", logbase),
00266       client_(client),
00267       buf_(DEFAULT_BUFSIZE), 
00268       flush_limit_(DEFAULT_FLUSH_LIMIT)
00269 {}
00270 
00271 int
00272 BufferedOutput::write(const char* bp, size_t len)
00273 {
00274     if (len == 0)
00275         len = strlen(bp);
00276               
00277     buf_.reserve(len);
00278     memcpy(buf_.end(), bp, len);
00279     buf_.fill(len);
00280     
00281     if ((flush_limit_) > 0 && (buf_.fullbytes() > flush_limit_))
00282     {
00283         flush();
00284     }
00285 
00286     return len;
00287 }
00288 
00289 void
00290 BufferedOutput::clear_buf()
00291 {
00292     buf_.clear();
00293 }
00294 
00295 int
00296 BufferedOutput::vformat_buf(const char* fmt, va_list ap)
00297 {
00298     int nfree = buf_.tailbytes();
00299     int len   = vsnprintf(buf_.end(), nfree, fmt, ap);
00300 
00301     ASSERT(len != -1);
00302     if (len >= nfree) {
00303         buf_.reserve(len);
00304         nfree = len;
00305         len = vsnprintf(buf_.end(), nfree, fmt, ap);
00306         ASSERT(len <= nfree);
00307     }
00308     
00309     buf_.fill(len);
00310     if ((flush_limit_) > 0 && (buf_.fullbytes() > flush_limit_))
00311     {
00312         flush();
00313     }
00314 
00315     return len;
00316 }
00317 
00318 int
00319 BufferedOutput::format_buf(const char* format, ...)
00320 {
00321     va_list ap;
00322     va_start(ap, format);
00323     int ret = vformat_buf(format, ap);
00324     va_end(ap);
00325 
00326     return ret;
00327 }
00328 
00329 int
00330 BufferedOutput::printf(const char* format, ...)
00331 {
00332     va_list ap;
00333     va_start(ap, format);
00334     int ret = vformat_buf(format, ap);
00335     va_end(ap);
00336 
00337     flush();
00338     
00339     return ret;
00340 }
00341 
00342 int
00343 BufferedOutput::flush()
00344 {
00345     int total = 0;
00346 
00347     while(buf_.fullbytes() > 0)
00348     {
00349         int cc = client_->write(buf_.start(), buf_.fullbytes());
00350 
00351         if (cc < 0) 
00352         {
00353             log_err("write error %s", strerror(errno));
00354 
00355             return cc;
00356         }
00357 
00358 #ifndef NDEBUG
00359         PrettyPrintBuf pretty(buf_.start(), cc);
00360         
00361         log_debug("flush %d bytes, data =", cc);
00362         std::string s;
00363         bool done;
00364         do {
00365             done = pretty.next_str(&s);
00366             log_debug(s.c_str());
00367         } while(!done);
00368 
00369 #else
00370         log_debug("flush wrote \"%s\", %d bytes", 
00371                   buf_.start(), cc);
00372 #endif
00373 
00374         buf_.consume(cc);
00375         total += cc;
00376     }
00377 
00378     return total;
00379 }
00380 
00381 void
00382 BufferedOutput::set_flush_limit(size_t limit)
00383 {
00384     flush_limit_ = limit;
00385 }
00386 
00387 } // namespace oasys

Generated on Thu Jun 7 16:56:48 2007 for DTN Reference Implementation by  doxygen 1.5.1