Notifier.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <errno.h>
00019 #include <unistd.h>
00020 #include <sys/poll.h>
00021 #include "Notifier.h"
00022 #include "SpinLock.h"
00023 #include "io/IO.h"
00024 
00025 namespace oasys {
00026 
00027 Notifier::Notifier(const char* logpath, bool quiet)
00028     : Logger("Notifier", logpath), 
00029       count_(0),
00030       quiet_(quiet),
00031       busy_notifiers_(0)
00032 {
00033     logpath_appendf("/notifier");
00034     
00035     if (pipe(pipe_) != 0) {
00036         PANIC("can't create pipe for notifier");
00037     }
00038 
00039     if (!quiet_) {
00040         log_debug("created pipe, fds: %d %d", pipe_[0], pipe_[1]);
00041     }
00042     
00043     for (int n = 0; n < 2; ++n) {
00044         if (IO::set_nonblocking(pipe_[n], true, quiet ? 0 : logpath_) != 0) 
00045         {
00046             PANIC("error setting fd %d to nonblocking: %s",
00047                      pipe_[n], strerror(errno));
00048         }
00049     }
00050 
00051     waiter_ = false;
00052 }
00053 
00054 Notifier::~Notifier()
00055 {
00056     int err;
00057     if (!quiet_) {
00058         log_debug("Notifier shutting down (closing fds %d %d)",
00059                   pipe_[0], pipe_[1]);
00060     }
00061 
00062     err = close(pipe_[0]);
00063     if (err != 0) {
00064         log_err("error closing pipe %d: %s", pipe_[0], strerror(errno));
00065     }
00066 
00067     err = close(pipe_[1]);
00068     if (err != 0) {
00069         log_err("error closing pipe %d: %s", pipe_[1], strerror(errno));
00070     }
00071     
00072     // Allow graceful deletion by a wait thread in a wait/notify scenario.
00073     //
00074     // Upon notification by some "finished" signal, a wait thread
00075     // may decide to delete this object.
00076     //
00077     // We want to avoid having that happen while
00078     // notification of the "finished" message is still in progress.
00079     //
00080     // This deletion is only safe if no threads, other than the waiter,
00081     // receiving the "finished" signal will use the notify object
00082     // after the "finished" signal is sent.
00083     
00084     while(atomic_cmpxchg32(&busy_notifiers_, 0, 1) != 0)
00085     {
00086         usleep(100000);
00087     }
00088 }
00089 
00090 void
00091 Notifier::drain_pipe(size_t bytes)
00092 {
00093     int    ret;
00094     char   buf[256];
00095     size_t bytes_drained = 0;
00096 
00097     while (true)
00098     {
00099         if (!quiet_) {
00100             log_debug("drain_pipe: attempting to drain %zu bytes", bytes);
00101         }
00102 
00103         ret = IO::read(read_fd(), buf,
00104                        (bytes == 0) ? sizeof(buf) :
00105                        std::min(sizeof(buf), bytes - bytes_drained));
00106         if (ret <= 0) {
00107             if (ret == IOAGAIN) {
00108                 PANIC("drain_pipe: trying to drain with not enough notify "
00109                       "calls, count = %u and trying to drain %zu bytes", 
00110                       count_, bytes);
00111                 break;
00112             } else {
00113                 log_crit("drain_pipe: unexpected error return from read: %s",
00114                          strerror(errno));
00115                 exit(1);
00116             }
00117         }
00118         
00119         bytes_drained += ret;
00120         if (!quiet_) {
00121             log_debug("drain_pipe: drained %zu/%zu byte(s) from pipe", 
00122                       bytes_drained, bytes);
00123         }
00124         count_ -= ret;
00125         
00126         if (bytes == 0 || bytes_drained == bytes) {
00127             break;
00128         }
00129         
00130         // More bytes were requested from the pipe than there are
00131         // bytes in the pipe. This means that the bytes requested is
00132         // bogus. This probably is the result of a race condition.
00133         if (ret < static_cast<int>(sizeof(buf))) {
00134             log_warn("drain_pipe: only possible to drain %zu bytes out of %zu! "
00135                      "race condition?", bytes_drained, bytes);
00136             break;
00137         }
00138     }
00139 
00140     if (!quiet_) {
00141         log_debug("drain pipe count = %d", count_);
00142     }
00143 }
00144 
00145 bool
00146 Notifier::wait(SpinLock* lock, int timeout, bool drain_the_pipe)
00147 {
00148     if (waiter_) {
00149         PANIC("Notifier doesn't support multiple waiting threads");
00150     }
00151     waiter_ = true;
00152 
00153     if (!quiet_) {
00154         log_debug("attempting to wait on %p, count = %d", 
00155                   this, count_);
00156     }
00157     
00158     if (lock)
00159         lock->unlock();
00160 
00161     int ret = IO::poll_single(read_fd(), POLLIN, 0, timeout, 0, logpath_);
00162     if (ret < 0 && ret != IOTIMEOUT) {
00163         PANIC("fatal: error return from notifier poll: %s",
00164               strerror(errno));
00165     }
00166     
00167     if (lock) {
00168         lock->lock("Notifier::wait");
00169     }
00170 
00171     waiter_ = false;
00172     
00173     if (ret == IOTIMEOUT) {
00174         if (!quiet_) {
00175             log_debug("notifier wait timeout");
00176         }
00177         return false; // timeout
00178     } else {
00179         if (drain_the_pipe)
00180         {
00181             drain_pipe(1);
00182         }
00183         if (!quiet_) {
00184             log_debug("notifier wait successfully notified");
00185         }
00186         return true;
00187     }
00188 }
00189 
00190 void
00191 Notifier::notify(SpinLock* lock)
00192 {
00193         atomic_incr(&busy_notifiers_);
00194     char b = 0;
00195     int num_retries = 0;
00196 
00197     bool need_to_relock = false;
00198     
00199   retry:
00200     if (!quiet_) {
00201         log_debug("notifier notify");
00202     }
00203 
00204     // see the comment below
00205     if (need_to_relock && (lock != NULL)) {
00206         lock->lock("Notifier::notify");
00207     }
00208 
00209     int ret = ::write(write_fd(), &b, 1);
00210     
00211     if (ret == -1) {
00212         if (errno == EAGAIN) {
00213             // If the pipe is full, that probably means the consumer
00214             // is just slow, but keep trying for up to 30 seconds
00215             // because otherwise we will break the semantics of the
00216             // notifier.
00217             //
00218             // We need to release the lock before sleeping to give
00219             // another thread a chance to come in and drain, however
00220             // it is important that we re-take the lock before writing
00221             // to maintain the atomicity required by MsgQueue
00222             if (num_retries == 0) {
00223                 log_warn("pipe appears to be full -- retrying write until success"); 
00224             }
00225 
00226             if (++num_retries == 600) {
00227                 // bail after 1 minute of spinning
00228                 PANIC("slow reader on pipe: can't notify within 1 minute!");
00229             }
00230             
00231             // give it some time
00232             if (lock) {
00233                 lock->unlock();
00234                 need_to_relock = true;
00235             }
00236             
00237             usleep(100000);
00238             goto retry;
00239         } else {
00240             log_err("unexpected error writing to pipe fd %d: %s",
00241                     write_fd(), strerror(errno));
00242         }
00243     } else if (ret == 0) {
00244         log_err("unexpected eof writing to pipe");
00245     } else {
00246         ASSERT(ret == 1);
00247 
00248         // XXX/demmer potential bug here -- once the pipe has been
00249         // written to, this thread might context-switch out and the
00250         // other thread (that owns the notifier) could be woken up. at
00251         // which point the notifier object itself might be deleted.
00252         //
00253         // solutions: either be careful to only write at the end of
00254         // the fn, or (better) use a lock.
00255         ++count_;
00256         if (!quiet_) {
00257             log_debug("notify count = %d", count_);
00258         }
00259     }
00260     atomic_decr(&busy_notifiers_);
00261 }
00262 
00263 } // namespace oasys

Generated on Thu Jun 7 16:56:51 2007 for DTN Reference Implementation by  doxygen 1.5.1