Notifier.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2004 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <errno.h>
00040 #include <unistd.h>
00041 #include <sys/poll.h>
00042 #include "Notifier.h"
00043 #include "SpinLock.h"
00044 #include "io/IO.h"
00045 
00046 namespace oasys {
00047 
00048 Notifier::Notifier(const char* logpath, bool quiet)
00049     : Logger("Notifier", logpath), 
00050       count_(0),
00051       quiet_(quiet)
00052 {
00053     logpath_appendf("/notifier");
00054     
00055     if (pipe(pipe_) != 0) {
00056         PANIC("can't create pipe for notifier");
00057     }
00058 
00059     if (!quiet_) {
00060         log_debug("created pipe, fds: %d %d", pipe_[0], pipe_[1]);
00061     }
00062     
00063     for (int n = 0; n < 2; ++n) {
00064         if (IO::set_nonblocking(pipe_[n], true, quiet ? 0 : logpath_) != 0) 
00065         {
00066             PANIC("error setting fd %d to nonblocking: %s",
00067                      pipe_[n], strerror(errno));
00068         }
00069     }
00070 
00071     waiter_ = false;
00072 }
00073 
00074 Notifier::~Notifier()
00075 {
00076     int err;
00077     if (!quiet_) {
00078         log_debug("Notifier shutting down (closing fds %d %d)",
00079                   pipe_[0], pipe_[1]);
00080     }
00081 
00082     err = close(pipe_[0]);
00083     if (err != 0) {
00084         log_err("error closing pipe %d: %s", pipe_[0], strerror(errno));
00085     }
00086 
00087     err = close(pipe_[1]);
00088     if (err != 0) {
00089         log_err("error closing pipe %d: %s", pipe_[1], strerror(errno));
00090     }
00091 }
00092 
00093 void
00094 Notifier::drain_pipe(size_t bytes)
00095 {
00096     int    ret;
00097     char   buf[256];
00098     size_t bytes_drained = 0;
00099 
00100     while (true)
00101     {
00102         if (!quiet_) {
00103             log_debug("drain_pipe: attempting to drain %zu bytes", bytes);
00104         }
00105 
00106         ret = IO::read(read_fd(), buf,
00107                        (bytes == 0) ? sizeof(buf) :
00108                        std::min(sizeof(buf), bytes - bytes_drained));
00109         if (ret <= 0) {
00110             if (ret == IOAGAIN) {
00111                 PANIC("drain_pipe: trying to drain with not enough notify "
00112                       "calls, count = %u and trying to drain %zu bytes", 
00113                       count_, bytes);
00114                 break;
00115             } else {
00116                 log_crit("drain_pipe: unexpected error return from read: %s",
00117                          strerror(errno));
00118                 exit(1);
00119             }
00120         }
00121         
00122         bytes_drained += ret;
00123         if (!quiet_) {
00124             log_debug("drain_pipe: drained %zu/%zu byte(s) from pipe", 
00125                       bytes_drained, bytes);
00126         }
00127         count_ -= ret;
00128         
00129         if (bytes == 0 || bytes_drained == bytes) {
00130             break;
00131         }
00132         
00133         // More bytes were requested from the pipe than there are
00134         // bytes in the pipe. This means that the bytes requested is
00135         // bogus. This probably is the result of a race condition.
00136         if (ret < static_cast<int>(sizeof(buf))) {
00137             log_warn("drain_pipe: only possible to drain %zu bytes out of %zu! "
00138                      "race condition?", bytes_drained, bytes);
00139             break;
00140         }
00141     }
00142 
00143     if (!quiet_) {
00144         log_debug("drain pipe count = %d", count_);
00145     }
00146 }
00147 
00148 bool
00149 Notifier::wait(SpinLock* lock, int timeout)
00150 {
00151     if (waiter_) {
00152         PANIC("Notifier doesn't support multiple waiting threads");
00153     }
00154     waiter_ = true;
00155 
00156     if (!quiet_) {
00157         log_debug("attempting to wait on %p, count = %d", 
00158                   this, count_);
00159     }
00160     
00161     if (lock)
00162         lock->unlock();
00163 
00164     int ret = IO::poll_single(read_fd(), POLLIN, 0, timeout, 0, logpath_);
00165     if (ret < 0 && ret != IOTIMEOUT) {
00166         PANIC("fatal: error return from notifier poll: %s",
00167               strerror(errno));
00168     }
00169     
00170     if (lock) {
00171         lock->lock("Notifier::wait");
00172     }
00173 
00174     waiter_ = false;
00175     
00176     if (ret == IOTIMEOUT) {
00177         if (!quiet_) {
00178             log_debug("notifier wait timeout");
00179         }
00180         return false; // timeout
00181     } else {
00182         drain_pipe(1);
00183         if (!quiet_) {
00184             log_debug("notifier wait successfully notified");
00185         }
00186         return true;
00187     }
00188 }
00189 
00190 void
00191 Notifier::notify(SpinLock* lock)
00192 {
00193     char b = 0;
00194     int num_retries = 0;
00195 
00196     bool need_to_relock = false;
00197     
00198   retry:
00199     if (!quiet_) {
00200         log_debug("notifier notify");
00201     }
00202 
00203     // see the comment below
00204     if (need_to_relock && (lock != NULL)) {
00205         lock->lock("Notifier::notify");
00206     }
00207 
00208     int ret = ::write(write_fd(), &b, 1);
00209     
00210     if (ret == -1) {
00211         if (errno == EAGAIN) {
00212             // If the pipe is full, that probably means the consumer
00213             // is just slow, but keep trying for up to 30 seconds
00214             // because otherwise we will break the semantics of the
00215             // notifier.
00216             //
00217             // We need to release the lock before sleeping to give
00218             // another thread a chance to come in and drain, however
00219             // it is important that we re-take the lock before writing
00220             // to maintain the atomicity required by MsgQueue
00221             if (num_retries == 0) {
00222                 log_warn("pipe appears to be full -- retrying write until success");
00223             }
00224 
00225             if (++num_retries == 600) {
00226                 // bail after 1 minute of spinning
00227                 PANIC("slow reader on pipe: can't notify within 1 minute!");
00228             }
00229             
00230             // give it some time
00231             if (lock) {
00232                 lock->unlock();
00233                 need_to_relock = true;
00234             }
00235             
00236             usleep(100000);
00237             goto retry;
00238         } else {
00239             log_err("unexpected error writing to pipe fd %d: %s",
00240                     write_fd(), strerror(errno));
00241         }
00242     } else if (ret == 0) {
00243         log_err("unexpected eof writing to pipe");
00244     } else {
00245         ASSERT(ret == 1);
00246 
00247         // XXX/demmer potential bug here -- once the pipe has been
00248         // written to, this thread might context-switch out and the
00249         // other thread (that owns the notifier) could be woken up. at
00250         // which point the notifier object itself might be deleted.
00251         //
00252         // solutions: either be careful to only write at the end of
00253         // the fn, or (better) use a lock.
00254         ++count_;
00255         if (!quiet_) {
00256             log_debug("notify count = %d", count_);
00257         }
00258     }
00259 }
00260 
00261 } // namespace oasys

Generated on Fri Dec 22 14:48:00 2006 for DTN Reference Implementation by  doxygen 1.5.1