00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
#include <algorithm>
#include <errno.h>
#include <stdarg.h>

#include "BufferedIO.h"
#include "IO.h"
#include "PrettyPrintBuffer.h"
00044
00045 namespace oasys {
00046
00047
00048
00049
00050
00051
// Initial size handed to the buf_ member of both BufferedInput and
// BufferedOutput when they are constructed.
#define DEFAULT_BUFSIZE 1024
00053
00054 BufferedInput::BufferedInput(IOClient* client, const char* logbase)
00055 : Logger("BufferedInput", logbase),
00056 client_(client),
00057 buf_(DEFAULT_BUFSIZE),
00058 seen_eof_(false)
00059 {}
00060
00061 BufferedInput::~BufferedInput()
00062 {}
00063
00064 int
00065 BufferedInput::read_line(const char* nl, char** buf, int timeout)
00066 {
00067 int endl;
00068 while((endl = find_nl(nl)) == -1)
00069 {
00070
00071 int cc = internal_read(buf_.fullbytes() + BufferedInput::READ_AHEAD,
00072 timeout);
00073
00074 log_debug("readline: cc = %d", cc);
00075 if(cc <= 0)
00076 {
00077 log_debug("%s: read %s",
00078 __func__, (cc == 0) ? "eof" : strerror(errno));
00079 return cc;
00080 }
00081 }
00082
00083 *buf = buf_.start();
00084
00085 log_debug("endl = %d", endl);
00086 buf_.consume(endl + strlen(nl));
00087
00088 return endl + strlen(nl);
00089 }
00090
00091 int
00092 BufferedInput::read_bytes(size_t len, char** buf, int timeout)
00093 {
00094 ASSERT(len > 0);
00095
00096 log_debug("read_bytes %zu (timeout %d)", len, timeout);
00097
00098 size_t total = buf_.fullbytes();
00099
00100 while (total < len)
00101 {
00102
00103 log_debug("read_bytes calling internal_read for %zu needed bytes",
00104 (len - total));
00105 int cc = internal_read(len, timeout);
00106 if (cc <= 0)
00107 {
00108 log_debug("%s: read %s",
00109 __func__, (cc == 0) ? "eof" : strerror(errno));
00110 return cc;
00111 }
00112
00113
00114
00115
00116 total = cc;
00117 }
00118
00119 *buf = buf_.start();
00120
00121
00122 buf_.consume(len);
00123
00124 return len;
00125 }
00126
00127 int
00128 BufferedInput::read_some_bytes(char** buf, int timeout)
00129 {
00130 int cc;
00131
00132
00133
00134 if (buf_.fullbytes() == 0) {
00135 ASSERT(buf_.start() == buf_.end());
00136
00137 cc = internal_read(buf_.tailbytes(), timeout);
00138
00139 if (cc == 0) {
00140 log_debug("%s: read eof", __func__);
00141 return cc;
00142 }
00143
00144 if (cc < 0) {
00145 logf(LOG_ERR, "%s: read error %s", __func__, strerror(errno));
00146 return cc;
00147 }
00148
00149 ASSERT(buf_.fullbytes() > 0);
00150 }
00151
00152 *buf = buf_.start();
00153
00154 cc = buf_.fullbytes();
00155 buf_.consume(cc);
00156
00157 log_debug("read_some_bytes ret %d (timeout %d)", cc, timeout);
00158
00159 return cc;
00160 }
00161
00162 char
00163 BufferedInput::get_char(int timeout)
00164 {
00165 if (buf_.fullbytes() == 0)
00166 {
00167 int cc = internal_read(buf_.tailbytes(), timeout);
00168
00169 if (cc <= 0) {
00170 logf(LOG_ERR, "%s: read %s",
00171 __func__, (cc == 0) ? "eof" : strerror(errno));
00172
00173 return 0;
00174 }
00175
00176 ASSERT(buf_.fullbytes() > 0);
00177 }
00178
00179 char ret = *buf_.start();
00180 buf_.consume(1);
00181
00182 return ret;
00183 }
00184
00185 bool
00186 BufferedInput::eof()
00187 {
00188 return buf_.fullbytes() == 0 && seen_eof_;
00189 }
00190
00191 int
00192 BufferedInput::internal_read(size_t len, int timeout_ms)
00193 {
00194 int cc;
00195 ASSERT(len > 0);
00196 ASSERT(len > buf_.fullbytes());
00197
00198
00199 buf_.reserve(len);
00200
00201
00202 if (timeout_ms > 0) {
00203 cc = client_->timeout_read(buf_.end(), buf_.tailbytes(), timeout_ms);
00204 } else {
00205 cc = client_->read(buf_.end(), buf_.tailbytes());
00206 }
00207
00208 if (cc == IOTIMEOUT)
00209 {
00210 log_debug("internal_read %zu (timeout %d) timed out",
00211 len, timeout_ms);
00212 return cc;
00213 }
00214 else if (cc == 0)
00215 {
00216 log_debug("internal_read %zu (timeout %d) eof",
00217 len, timeout_ms);
00218 seen_eof_ = true;
00219 return cc;
00220 }
00221 else if (cc < 0)
00222 {
00223 logf(LOG_ERR, "internal_read %zu (timeout %d) error %d in read: %s",
00224 len, timeout_ms, cc, strerror(errno));
00225
00226 return cc;
00227 }
00228
00229 buf_.fill(cc);
00230
00231 int ret = std::min(buf_.fullbytes(), len);
00232
00233 #ifndef NDEBUG
00234 PrettyPrintBuf pretty(buf_.start(), ret);
00235
00236 log_debug("internal_read %u bytes, data =", ret);
00237 std::string s;
00238 bool done;
00239 do {
00240 done = pretty.next_str(&s);
00241 log_debug(s.c_str());
00242 } while(!done);
00243 #else
00244 log_debug("internal_read %zu (timeout %d): cc=%d ret %d",
00245 len, timeout_ms, cc, ret);
00246 #endif
00247
00248 return ret;
00249 }
00250
00251 int
00252 BufferedInput::find_nl(const char* nl)
00253 {
00254 char* offset = buf_.start();
00255 int nl_len = strlen(nl);
00256 size_t bytes_left = buf_.fullbytes();
00257
00258 for(;;)
00259 {
00260 char* new_offset;
00261 new_offset = static_cast<char*>(memchr(offset, nl[0], bytes_left));
00262
00263 bytes_left -= new_offset - offset;
00264 offset = new_offset;
00265
00266 if (offset == 0 || static_cast<int>(bytes_left) < nl_len)
00267 return -1;
00268
00269 if (memcmp(offset, nl, nl_len) == 0)
00270 {
00271 return offset - buf_.start();
00272 }
00273
00274 offset++;
00275 bytes_left--;
00276 }
00277 }
00278
00279
00280
00281
00282
00283
00284 BufferedOutput::BufferedOutput(IOClient* client,
00285 const char* logbase)
00286 : Logger("BufferedOutput", logbase),
00287 client_(client),
00288 buf_(DEFAULT_BUFSIZE),
00289 flush_limit_(DEFAULT_FLUSH_LIMIT)
00290 {}
00291
00292 int
00293 BufferedOutput::write(const char* bp, size_t len)
00294 {
00295 if (len == 0)
00296 len = strlen(bp);
00297
00298 buf_.reserve(len);
00299 memcpy(buf_.end(), bp, len);
00300 buf_.fill(len);
00301
00302 if ((flush_limit_) > 0 && (buf_.fullbytes() > flush_limit_))
00303 {
00304 flush();
00305 }
00306
00307 return len;
00308 }
00309
00310 void
00311 BufferedOutput::clear_buf()
00312 {
00313 buf_.clear();
00314 }
00315
00316 int
00317 BufferedOutput::vformat_buf(const char* fmt, va_list ap)
00318 {
00319 int nfree = buf_.tailbytes();
00320 int len = vsnprintf(buf_.end(), nfree, fmt, ap);
00321
00322 ASSERT(len != -1);
00323 if (len >= nfree) {
00324 buf_.reserve(len);
00325 nfree = len;
00326 len = vsnprintf(buf_.end(), nfree, fmt, ap);
00327 ASSERT(len <= nfree);
00328 }
00329
00330 buf_.fill(len);
00331 if ((flush_limit_) > 0 && (buf_.fullbytes() > flush_limit_))
00332 {
00333 flush();
00334 }
00335
00336 return len;
00337 }
00338
00339 int
00340 BufferedOutput::format_buf(const char* format, ...)
00341 {
00342 va_list ap;
00343 va_start(ap, format);
00344 int ret = vformat_buf(format, ap);
00345 va_end(ap);
00346
00347 return ret;
00348 }
00349
00350 int
00351 BufferedOutput::printf(const char* format, ...)
00352 {
00353 va_list ap;
00354 va_start(ap, format);
00355 int ret = vformat_buf(format, ap);
00356 va_end(ap);
00357
00358 flush();
00359
00360 return ret;
00361 }
00362
00363 int
00364 BufferedOutput::flush()
00365 {
00366 int total = 0;
00367
00368 while(buf_.fullbytes() > 0)
00369 {
00370 int cc = client_->write(buf_.start(), buf_.fullbytes());
00371
00372 if (cc < 0)
00373 {
00374 log_err("write error %s", strerror(errno));
00375
00376 return cc;
00377 }
00378
00379 #ifndef NDEBUG
00380 PrettyPrintBuf pretty(buf_.start(), cc);
00381
00382 log_debug("flush %d bytes, data =", cc);
00383 std::string s;
00384 bool done;
00385 do {
00386 done = pretty.next_str(&s);
00387 log_debug(s.c_str());
00388 } while(!done);
00389
00390 #else
00391 log_debug("flush wrote \"%s\", %d bytes",
00392 buf_.start(), cc);
00393 #endif
00394
00395 buf_.consume(cc);
00396 total += cc;
00397 }
00398
00399 return total;
00400 }
00401
00402 void
00403 BufferedOutput::set_flush_limit(size_t limit)
00404 {
00405 flush_limit_ = limit;
00406 }
00407
00408 }