00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include <algorithm>
00018 #include <errno.h>
00019
00020 #include "BufferedIO.h"
00021 #include "IO.h"
00022 #include "PrettyPrintBuffer.h"
00023
00024 namespace oasys {
00025
00026
00027
00028
00029
00030
00031 #define DEFAULT_BUFSIZE 1024
00032
00033 BufferedInput::BufferedInput(IOClient* client, const char* logbase)
00034 : Logger("BufferedInput", logbase),
00035 client_(client),
00036 buf_(DEFAULT_BUFSIZE),
00037 seen_eof_(false)
00038 {}
00039
00040 BufferedInput::~BufferedInput()
00041 {}
00042
00043 int
00044 BufferedInput::read_line(const char* nl, char** buf, int timeout)
00045 {
00046 int endl;
00047 while((endl = find_nl(nl)) == -1)
00048 {
00049
00050 int cc = internal_read(buf_.fullbytes() + BufferedInput::READ_AHEAD,
00051 timeout);
00052
00053 log_debug("readline: cc = %d", cc);
00054 if(cc <= 0)
00055 {
00056 log_debug("%s: read %s",
00057 __func__, (cc == 0) ? "eof" : strerror(errno));
00058 return cc;
00059 }
00060 }
00061
00062 *buf = buf_.start();
00063
00064 log_debug("endl = %d", endl);
00065 buf_.consume(endl + strlen(nl));
00066
00067 return endl + strlen(nl);
00068 }
00069
00070 int
00071 BufferedInput::read_bytes(size_t len, char** buf, int timeout)
00072 {
00073 ASSERT(len > 0);
00074
00075 log_debug("read_bytes %zu (timeout %d)", len, timeout);
00076
00077 size_t total = buf_.fullbytes();
00078
00079 while (total < len)
00080 {
00081
00082 log_debug("read_bytes calling internal_read for %zu needed bytes",
00083 (len - total));
00084 int cc = internal_read(len, timeout);
00085 if (cc <= 0)
00086 {
00087 log_debug("%s: read %s",
00088 __func__, (cc == 0) ? "eof" : strerror(errno));
00089 return cc;
00090 }
00091
00092
00093
00094
00095 total = cc;
00096 }
00097
00098 *buf = buf_.start();
00099
00100
00101 buf_.consume(len);
00102
00103 return len;
00104 }
00105
00106 int
00107 BufferedInput::read_some_bytes(char** buf, int timeout)
00108 {
00109 int cc;
00110
00111
00112
00113 if (buf_.fullbytes() == 0) {
00114 ASSERT(buf_.start() == buf_.end());
00115
00116 cc = internal_read(buf_.tailbytes(), timeout);
00117
00118 if (cc == 0) {
00119 log_debug("%s: read eof", __func__);
00120 return cc;
00121 }
00122
00123 if (cc < 0) {
00124 logf(LOG_ERR, "%s: read error %s", __func__, strerror(errno));
00125 return cc;
00126 }
00127
00128 ASSERT(buf_.fullbytes() > 0);
00129 }
00130
00131 *buf = buf_.start();
00132
00133 cc = buf_.fullbytes();
00134 buf_.consume(cc);
00135
00136 log_debug("read_some_bytes ret %d (timeout %d)", cc, timeout);
00137
00138 return cc;
00139 }
00140
00141 char
00142 BufferedInput::get_char(int timeout)
00143 {
00144 if (buf_.fullbytes() == 0)
00145 {
00146 int cc = internal_read(buf_.tailbytes(), timeout);
00147
00148 if (cc <= 0) {
00149 logf(LOG_ERR, "%s: read %s",
00150 __func__, (cc == 0) ? "eof" : strerror(errno));
00151
00152 return 0;
00153 }
00154
00155 ASSERT(buf_.fullbytes() > 0);
00156 }
00157
00158 char ret = *buf_.start();
00159 buf_.consume(1);
00160
00161 return ret;
00162 }
00163
00164 bool
00165 BufferedInput::eof()
00166 {
00167 return buf_.fullbytes() == 0 && seen_eof_;
00168 }
00169
00170 int
00171 BufferedInput::internal_read(size_t len, int timeout_ms)
00172 {
00173 int cc;
00174 ASSERT(len > 0);
00175 ASSERT(len > buf_.fullbytes());
00176
00177
00178 buf_.reserve(len);
00179
00180
00181 if (timeout_ms > 0) {
00182 cc = client_->timeout_read(buf_.end(), buf_.tailbytes(), timeout_ms);
00183 } else {
00184 cc = client_->read(buf_.end(), buf_.tailbytes());
00185 }
00186
00187 if (cc == IOTIMEOUT)
00188 {
00189 log_debug("internal_read %zu (timeout %d) timed out",
00190 len, timeout_ms);
00191 return cc;
00192 }
00193 else if (cc == 0)
00194 {
00195 log_debug("internal_read %zu (timeout %d) eof",
00196 len, timeout_ms);
00197 seen_eof_ = true;
00198 return cc;
00199 }
00200 else if (cc < 0)
00201 {
00202 logf(LOG_ERR, "internal_read %zu (timeout %d) error %d in read: %s",
00203 len, timeout_ms, cc, strerror(errno));
00204
00205 return cc;
00206 }
00207
00208 buf_.fill(cc);
00209
00210 int ret = std::min(buf_.fullbytes(), len);
00211
00212 #ifndef NDEBUG
00213 PrettyPrintBuf pretty(buf_.start(), ret);
00214
00215 log_debug("internal_read %u bytes, data =", ret);
00216 std::string s;
00217 bool done;
00218 do {
00219 done = pretty.next_str(&s);
00220 log_debug(s.c_str());
00221 } while(!done);
00222 #else
00223 log_debug("internal_read %zu (timeout %d): cc=%d ret %d",
00224 len, timeout_ms, cc, ret);
00225 #endif
00226
00227 return ret;
00228 }
00229
00230 int
00231 BufferedInput::find_nl(const char* nl)
00232 {
00233 char* offset = buf_.start();
00234 int nl_len = strlen(nl);
00235 size_t bytes_left = buf_.fullbytes();
00236
00237 for(;;)
00238 {
00239 char* new_offset;
00240 new_offset = static_cast<char*>(memchr(offset, nl[0], bytes_left));
00241
00242 bytes_left -= new_offset - offset;
00243 offset = new_offset;
00244
00245 if (offset == 0 || static_cast<int>(bytes_left) < nl_len)
00246 return -1;
00247
00248 if (memcmp(offset, nl, nl_len) == 0)
00249 {
00250 return offset - buf_.start();
00251 }
00252
00253 offset++;
00254 bytes_left--;
00255 }
00256 }
00257
00258
00259
00260
00261
00262
00263 BufferedOutput::BufferedOutput(IOClient* client,
00264 const char* logbase)
00265 : Logger("BufferedOutput", logbase),
00266 client_(client),
00267 buf_(DEFAULT_BUFSIZE),
00268 flush_limit_(DEFAULT_FLUSH_LIMIT)
00269 {}
00270
00271 int
00272 BufferedOutput::write(const char* bp, size_t len)
00273 {
00274 if (len == 0)
00275 len = strlen(bp);
00276
00277 buf_.reserve(len);
00278 memcpy(buf_.end(), bp, len);
00279 buf_.fill(len);
00280
00281 if ((flush_limit_) > 0 && (buf_.fullbytes() > flush_limit_))
00282 {
00283 flush();
00284 }
00285
00286 return len;
00287 }
00288
00289 void
00290 BufferedOutput::clear_buf()
00291 {
00292 buf_.clear();
00293 }
00294
00295 int
00296 BufferedOutput::vformat_buf(const char* fmt, va_list ap)
00297 {
00298 int nfree = buf_.tailbytes();
00299 int len = vsnprintf(buf_.end(), nfree, fmt, ap);
00300
00301 ASSERT(len != -1);
00302 if (len >= nfree) {
00303 buf_.reserve(len);
00304 nfree = len;
00305 len = vsnprintf(buf_.end(), nfree, fmt, ap);
00306 ASSERT(len <= nfree);
00307 }
00308
00309 buf_.fill(len);
00310 if ((flush_limit_) > 0 && (buf_.fullbytes() > flush_limit_))
00311 {
00312 flush();
00313 }
00314
00315 return len;
00316 }
00317
00318 int
00319 BufferedOutput::format_buf(const char* format, ...)
00320 {
00321 va_list ap;
00322 va_start(ap, format);
00323 int ret = vformat_buf(format, ap);
00324 va_end(ap);
00325
00326 return ret;
00327 }
00328
00329 int
00330 BufferedOutput::printf(const char* format, ...)
00331 {
00332 va_list ap;
00333 va_start(ap, format);
00334 int ret = vformat_buf(format, ap);
00335 va_end(ap);
00336
00337 flush();
00338
00339 return ret;
00340 }
00341
00342 int
00343 BufferedOutput::flush()
00344 {
00345 int total = 0;
00346
00347 while(buf_.fullbytes() > 0)
00348 {
00349 int cc = client_->write(buf_.start(), buf_.fullbytes());
00350
00351 if (cc < 0)
00352 {
00353 log_err("write error %s", strerror(errno));
00354
00355 return cc;
00356 }
00357
00358 #ifndef NDEBUG
00359 PrettyPrintBuf pretty(buf_.start(), cc);
00360
00361 log_debug("flush %d bytes, data =", cc);
00362 std::string s;
00363 bool done;
00364 do {
00365 done = pretty.next_str(&s);
00366 log_debug(s.c_str());
00367 } while(!done);
00368
00369 #else
00370 log_debug("flush wrote \"%s\", %d bytes",
00371 buf_.start(), cc);
00372 #endif
00373
00374 buf_.consume(cc);
00375 total += cc;
00376 }
00377
00378 return total;
00379 }
00380
00381 void
00382 BufferedOutput::set_flush_limit(size_t limit)
00383 {
00384 flush_limit_ = limit;
00385 }
00386
00387 }