00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <errno.h>
00019 #include <unistd.h>
00020 #include <sys/poll.h>
00021 #include "Notifier.h"
00022 #include "SpinLock.h"
00023 #include "io/IO.h"
00024
00025 namespace oasys {
00026
00027 Notifier::Notifier(const char* logpath, bool quiet)
00028 : Logger("Notifier", logpath),
00029 count_(0),
00030 quiet_(quiet),
00031 busy_notifiers_(0)
00032 {
00033 logpath_appendf("/notifier");
00034
00035 if (pipe(pipe_) != 0) {
00036 PANIC("can't create pipe for notifier");
00037 }
00038
00039 if (!quiet_) {
00040 log_debug("created pipe, fds: %d %d", pipe_[0], pipe_[1]);
00041 }
00042
00043 for (int n = 0; n < 2; ++n) {
00044 if (IO::set_nonblocking(pipe_[n], true, quiet ? 0 : logpath_) != 0)
00045 {
00046 PANIC("error setting fd %d to nonblocking: %s",
00047 pipe_[n], strerror(errno));
00048 }
00049 }
00050
00051 waiter_ = false;
00052 }
00053
00054 Notifier::~Notifier()
00055 {
00056 int err;
00057 if (!quiet_) {
00058 log_debug("Notifier shutting down (closing fds %d %d)",
00059 pipe_[0], pipe_[1]);
00060 }
00061
00062 err = close(pipe_[0]);
00063 if (err != 0) {
00064 log_err("error closing pipe %d: %s", pipe_[0], strerror(errno));
00065 }
00066
00067 err = close(pipe_[1]);
00068 if (err != 0) {
00069 log_err("error closing pipe %d: %s", pipe_[1], strerror(errno));
00070 }
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084 while(atomic_cmpxchg32(&busy_notifiers_, 0, 1) != 0)
00085 {
00086 usleep(100000);
00087 }
00088 }
00089
00090 void
00091 Notifier::drain_pipe(size_t bytes)
00092 {
00093 int ret;
00094 char buf[256];
00095 size_t bytes_drained = 0;
00096
00097 while (true)
00098 {
00099 if (!quiet_) {
00100 log_debug("drain_pipe: attempting to drain %zu bytes", bytes);
00101 }
00102
00103 ret = IO::read(read_fd(), buf,
00104 (bytes == 0) ? sizeof(buf) :
00105 std::min(sizeof(buf), bytes - bytes_drained));
00106 if (ret <= 0) {
00107 if (ret == IOAGAIN) {
00108 PANIC("drain_pipe: trying to drain with not enough notify "
00109 "calls, count = %u and trying to drain %zu bytes",
00110 count_, bytes);
00111 break;
00112 } else {
00113 log_crit("drain_pipe: unexpected error return from read: %s",
00114 strerror(errno));
00115 exit(1);
00116 }
00117 }
00118
00119 bytes_drained += ret;
00120 if (!quiet_) {
00121 log_debug("drain_pipe: drained %zu/%zu byte(s) from pipe",
00122 bytes_drained, bytes);
00123 }
00124 count_ -= ret;
00125
00126 if (bytes == 0 || bytes_drained == bytes) {
00127 break;
00128 }
00129
00130
00131
00132
00133 if (ret < static_cast<int>(sizeof(buf))) {
00134 log_warn("drain_pipe: only possible to drain %zu bytes out of %zu! "
00135 "race condition?", bytes_drained, bytes);
00136 break;
00137 }
00138 }
00139
00140 if (!quiet_) {
00141 log_debug("drain pipe count = %d", count_);
00142 }
00143 }
00144
00145 bool
00146 Notifier::wait(SpinLock* lock, int timeout, bool drain_the_pipe)
00147 {
00148 if (waiter_) {
00149 PANIC("Notifier doesn't support multiple waiting threads");
00150 }
00151 waiter_ = true;
00152
00153 if (!quiet_) {
00154 log_debug("attempting to wait on %p, count = %d",
00155 this, count_);
00156 }
00157
00158 if (lock)
00159 lock->unlock();
00160
00161 int ret = IO::poll_single(read_fd(), POLLIN, 0, timeout, 0, logpath_);
00162 if (ret < 0 && ret != IOTIMEOUT) {
00163 PANIC("fatal: error return from notifier poll: %s",
00164 strerror(errno));
00165 }
00166
00167 if (lock) {
00168 lock->lock("Notifier::wait");
00169 }
00170
00171 waiter_ = false;
00172
00173 if (ret == IOTIMEOUT) {
00174 if (!quiet_) {
00175 log_debug("notifier wait timeout");
00176 }
00177 return false;
00178 } else {
00179 if (drain_the_pipe)
00180 {
00181 drain_pipe(1);
00182 }
00183 if (!quiet_) {
00184 log_debug("notifier wait successfully notified");
00185 }
00186 return true;
00187 }
00188 }
00189
00190 void
00191 Notifier::notify(SpinLock* lock)
00192 {
00193 atomic_incr(&busy_notifiers_);
00194 char b = 0;
00195 int num_retries = 0;
00196
00197 bool need_to_relock = false;
00198
00199 retry:
00200 if (!quiet_) {
00201 log_debug("notifier notify");
00202 }
00203
00204
00205 if (need_to_relock && (lock != NULL)) {
00206 lock->lock("Notifier::notify");
00207 }
00208
00209 int ret = ::write(write_fd(), &b, 1);
00210
00211 if (ret == -1) {
00212 if (errno == EAGAIN) {
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222 if (num_retries == 0) {
00223 log_warn("pipe appears to be full -- retrying write until success");
00224 }
00225
00226 if (++num_retries == 600) {
00227
00228 PANIC("slow reader on pipe: can't notify within 1 minute!");
00229 }
00230
00231
00232 if (lock) {
00233 lock->unlock();
00234 need_to_relock = true;
00235 }
00236
00237 usleep(100000);
00238 goto retry;
00239 } else {
00240 log_err("unexpected error writing to pipe fd %d: %s",
00241 write_fd(), strerror(errno));
00242 }
00243 } else if (ret == 0) {
00244 log_err("unexpected eof writing to pipe");
00245 } else {
00246 ASSERT(ret == 1);
00247
00248
00249
00250
00251
00252
00253
00254
00255 ++count_;
00256 if (!quiet_) {
00257 log_debug("notify count = %d", count_);
00258 }
00259 }
00260 atomic_decr(&busy_notifiers_);
00261 }
00262
00263 }