00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <errno.h>
00040 #include <unistd.h>
00041 #include <sys/poll.h>
00042 #include "Notifier.h"
00043 #include "SpinLock.h"
00044 #include "io/IO.h"
00045
00046 namespace oasys {
00047
00048 Notifier::Notifier(const char* logpath, bool quiet)
00049 : Logger("Notifier", logpath),
00050 count_(0),
00051 quiet_(quiet)
00052 {
00053 logpath_appendf("/notifier");
00054
00055 if (pipe(pipe_) != 0) {
00056 PANIC("can't create pipe for notifier");
00057 }
00058
00059 if (!quiet_) {
00060 log_debug("created pipe, fds: %d %d", pipe_[0], pipe_[1]);
00061 }
00062
00063 for (int n = 0; n < 2; ++n) {
00064 if (IO::set_nonblocking(pipe_[n], true, quiet ? 0 : logpath_) != 0)
00065 {
00066 PANIC("error setting fd %d to nonblocking: %s",
00067 pipe_[n], strerror(errno));
00068 }
00069 }
00070
00071 waiter_ = false;
00072 }
00073
00074 Notifier::~Notifier()
00075 {
00076 int err;
00077 if (!quiet_) {
00078 log_debug("Notifier shutting down (closing fds %d %d)",
00079 pipe_[0], pipe_[1]);
00080 }
00081
00082 err = close(pipe_[0]);
00083 if (err != 0) {
00084 log_err("error closing pipe %d: %s", pipe_[0], strerror(errno));
00085 }
00086
00087 err = close(pipe_[1]);
00088 if (err != 0) {
00089 log_err("error closing pipe %d: %s", pipe_[1], strerror(errno));
00090 }
00091 }
00092
00093 void
00094 Notifier::drain_pipe(size_t bytes)
00095 {
00096 int ret;
00097 char buf[256];
00098 size_t bytes_drained = 0;
00099
00100 while (true)
00101 {
00102 if (!quiet_) {
00103 log_debug("drain_pipe: attempting to drain %zu bytes", bytes);
00104 }
00105
00106 ret = IO::read(read_fd(), buf,
00107 (bytes == 0) ? sizeof(buf) :
00108 std::min(sizeof(buf), bytes - bytes_drained));
00109 if (ret <= 0) {
00110 if (ret == IOAGAIN) {
00111 PANIC("drain_pipe: trying to drain with not enough notify "
00112 "calls, count = %u and trying to drain %zu bytes",
00113 count_, bytes);
00114 break;
00115 } else {
00116 log_crit("drain_pipe: unexpected error return from read: %s",
00117 strerror(errno));
00118 exit(1);
00119 }
00120 }
00121
00122 bytes_drained += ret;
00123 if (!quiet_) {
00124 log_debug("drain_pipe: drained %zu/%zu byte(s) from pipe",
00125 bytes_drained, bytes);
00126 }
00127 count_ -= ret;
00128
00129 if (bytes == 0 || bytes_drained == bytes) {
00130 break;
00131 }
00132
00133
00134
00135
00136 if (ret < static_cast<int>(sizeof(buf))) {
00137 log_warn("drain_pipe: only possible to drain %zu bytes out of %zu! "
00138 "race condition?", bytes_drained, bytes);
00139 break;
00140 }
00141 }
00142
00143 if (!quiet_) {
00144 log_debug("drain pipe count = %d", count_);
00145 }
00146 }
00147
00148 bool
00149 Notifier::wait(SpinLock* lock, int timeout)
00150 {
00151 if (waiter_) {
00152 PANIC("Notifier doesn't support multiple waiting threads");
00153 }
00154 waiter_ = true;
00155
00156 if (!quiet_) {
00157 log_debug("attempting to wait on %p, count = %d",
00158 this, count_);
00159 }
00160
00161 if (lock)
00162 lock->unlock();
00163
00164 int ret = IO::poll_single(read_fd(), POLLIN, 0, timeout, 0, logpath_);
00165 if (ret < 0 && ret != IOTIMEOUT) {
00166 PANIC("fatal: error return from notifier poll: %s",
00167 strerror(errno));
00168 }
00169
00170 if (lock) {
00171 lock->lock("Notifier::wait");
00172 }
00173
00174 waiter_ = false;
00175
00176 if (ret == IOTIMEOUT) {
00177 if (!quiet_) {
00178 log_debug("notifier wait timeout");
00179 }
00180 return false;
00181 } else {
00182 drain_pipe(1);
00183 if (!quiet_) {
00184 log_debug("notifier wait successfully notified");
00185 }
00186 return true;
00187 }
00188 }
00189
00190 void
00191 Notifier::notify(SpinLock* lock)
00192 {
00193 char b = 0;
00194 int num_retries = 0;
00195
00196 bool need_to_relock = false;
00197
00198 retry:
00199 if (!quiet_) {
00200 log_debug("notifier notify");
00201 }
00202
00203
00204 if (need_to_relock && (lock != NULL)) {
00205 lock->lock("Notifier::notify");
00206 }
00207
00208 int ret = ::write(write_fd(), &b, 1);
00209
00210 if (ret == -1) {
00211 if (errno == EAGAIN) {
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221 if (num_retries == 0) {
00222 log_warn("pipe appears to be full -- retrying write until success");
00223 }
00224
00225 if (++num_retries == 600) {
00226
00227 PANIC("slow reader on pipe: can't notify within 1 minute!");
00228 }
00229
00230
00231 if (lock) {
00232 lock->unlock();
00233 need_to_relock = true;
00234 }
00235
00236 usleep(100000);
00237 goto retry;
00238 } else {
00239 log_err("unexpected error writing to pipe fd %d: %s",
00240 write_fd(), strerror(errno));
00241 }
00242 } else if (ret == 0) {
00243 log_err("unexpected eof writing to pipe");
00244 } else {
00245 ASSERT(ret == 1);
00246
00247
00248
00249
00250
00251
00252
00253
00254 ++count_;
00255 if (!quiet_) {
00256 log_debug("notify count = %d", count_);
00257 }
00258 }
00259 }
00260
00261 }