BundleDaemon.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2004 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <oasys/util/Time.h>
00040 
00041 #include "Bundle.h"
00042 #include "BundleActions.h"
00043 #include "BundleEvent.h"
00044 #include "BundleDaemon.h"
00045 #include "BundleStatusReport.h"
00046 #include "BundleTimestamp.h"
00047 #include "CustodySignal.h"
00048 #include "ExpirationTimer.h"
00049 #include "FragmentManager.h"
00050 #include "contacts/Contact.h"
00051 #include "contacts/ContactManager.h"
00052 #include "reg/AdminRegistration.h"
00053 #include "reg/APIRegistration.h"
00054 #include "reg/Registration.h"
00055 #include "reg/RegistrationTable.h"
00056 #include "routing/BundleRouter.h"
00057 #include "routing/RouteTable.h"
00058 #include "storage/BundleStore.h"
00059 #include "storage/RegistrationStore.h"
00060 
00061 namespace dtn {
00062 
00063 BundleDaemon* BundleDaemon::instance_ = NULL;
00064 BundleDaemon::Params BundleDaemon::params_;
00065 
00066 //----------------------------------------------------------------------
00067 BundleDaemon::BundleDaemon()
00068     : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
00069       Thread("BundleDaemon", CREATE_JOINABLE)
00070 {
00071     // default local eid
00072     // XXX/demmer fixme
00073     local_eid_.assign("dtn://localhost.dtn");
00074 
00075     memset(&stats_, 0, sizeof(stats_));
00076 
00077     pending_bundles_ = new BundleList("pending_bundles");
00078     custody_bundles_ = new BundleList("custody_bundles");
00079 
00080     contactmgr_ = new ContactManager();
00081     fragmentmgr_ = new FragmentManager();
00082     reg_table_ = new RegistrationTable();
00083 
00084     router_ = 0;
00085 
00086     app_shutdown_proc_ = NULL;
00087     app_shutdown_data_ = NULL;
00088 }
00089 
00090 //----------------------------------------------------------------------
00091 void
00092 BundleDaemon::do_init()
00093 {
00094     actions_ = new BundleActions();
00095     eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
00096     eventq_->notify_when_empty();
00097 }
00098 
00099 //----------------------------------------------------------------------
00100 void
00101 BundleDaemon::post(BundleEvent* event)
00102 {
00103     instance_->post_event(event);
00104 }
00105 
00106 //----------------------------------------------------------------------
00107 void
00108 BundleDaemon::post_at_head(BundleEvent* event)
00109 {
00110     instance_->post_event(event, false);
00111 }
00112 
00113 //----------------------------------------------------------------------
00114 bool
00115 BundleDaemon::post_and_wait(BundleEvent* event,
00116                             oasys::Notifier* notifier,
00117                             int timeout, bool at_back)
00118 {
00119     /*
00120      * Make sure that we're either already started up or are about to
00121      * start. Otherwise the wait call below would block indefinitely.
00122      */
00123     ASSERT(! oasys::Thread::start_barrier_enabled());
00124     
00125     ASSERT(event->processed_notifier_ == NULL);
00126     event->processed_notifier_ = notifier;
00127     if (at_back) {
00128         post(event);
00129     } else {
00130         post_at_head(event);
00131     }
00132     return notifier->wait(NULL, timeout);
00133 }
00134 
00135 //----------------------------------------------------------------------
00136 void
00137 BundleDaemon::post_event(BundleEvent* event, bool at_back)
00138 {
00139     log_debug("posting event (%p) with type %s (at %s)",
00140               event, event->type_str(), at_back ? "back" : "head");
00141     eventq_->push(event, at_back);
00142 }
00143 
00144 //----------------------------------------------------------------------
00145 void
00146 BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
00147 {
00148     router_->get_routing_state(buf);
00149     contactmgr_->dump(buf);
00150 }
00151 
00152 //----------------------------------------------------------------------
00153 void
00154 BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
00155 {
00156     buf->appendf("%zu pending -- "
00157                  "%zu custody -- "
00158                  "%u received -- "
00159                  "%u delivered -- "
00160                  "%u generated -- "
00161                  "%u transmitted -- "
00162                  "%u expired -- "
00163                  "%u duplicate",
00164                  pending_bundles()->size(),
00165                  custody_bundles()->size(),
00166                  stats_.bundles_received_,
00167                  stats_.bundles_delivered_,
00168                  stats_.bundles_generated_,
00169                  stats_.bundles_transmitted_,
00170                  stats_.bundles_expired_,
00171                  stats_.duplicate_bundles_);
00172 }
00173 
00174 //----------------------------------------------------------------------
00175 void
00176 BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
00177 {
00178     buf->appendf("%zu pending_events -- "
00179                  "%u processed_events",
00180                  eventq_->size(),
00181                  stats_.events_processed_);
00182 }
00183 
00184 //----------------------------------------------------------------------
00185 void
00186 BundleDaemon::reset_stats()
00187 {
00188     memset(&stats_, 0, sizeof(stats_));
00189 
00190     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
00191     
00192     const LinkSet* links = contactmgr_->links();
00193     LinkSet::const_iterator iter;
00194     for (iter = links->begin(); iter != links->end(); ++iter) {
00195         (*iter)->reset_stats();
00196     }
00197 }
00198 
00199 //----------------------------------------------------------------------
00200 void
00201 BundleDaemon::generate_status_report(Bundle* orig_bundle,
00202                                      status_report_flag_t flag,
00203                                      status_report_reason_t reason)
00204 {
00205     log_debug("generating return receipt status report, "
00206               "flag = 0x%x, reason = 0x%x", flag, reason);
00207         
00208     Bundle* report = new Bundle();
00209     BundleStatusReport::create_status_report(report, orig_bundle,
00210                                              local_eid_, flag, reason);
00211     
00212     BundleReceivedEvent e(report, EVENTSRC_ADMIN, report->payload_.length());
00213     handle_event(&e);
00214 }
00215 
00216 //----------------------------------------------------------------------
00217 void
00218 BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
00219                                       custody_signal_reason_t reason)
00220 {
00221     if (bundle->local_custody_) {
00222         log_err("send_custody_signal(*%p): already have local custody",
00223                 bundle);
00224         return;
00225     }
00226 
00227     if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00228         log_err("send_custody_signal(*%p): current custodian is NULL_EID",
00229                 bundle);
00230         return;
00231     }
00232     
00233     Bundle* signal = new Bundle();
00234     CustodySignal::create_custody_signal(signal, bundle, local_eid_,
00235                                          succeeded, reason);
00236     
00237     BundleReceivedEvent e(signal, EVENTSRC_ADMIN, signal->payload_.length());
00238     handle_event(&e);
00239 }
00240 
00241 //----------------------------------------------------------------------
00242 void
00243 BundleDaemon::cancel_custody_timers(Bundle* bundle)
00244 {
00245     oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::cancel_custody_timers");
00246     
00247     CustodyTimerVec::iterator iter;
00248     for (iter =  bundle->custody_timers_.begin();
00249          iter != bundle->custody_timers_.end();
00250          ++iter)
00251     {
00252         bool ok = (*iter)->cancel();
00253         if (!ok) {
00254             log_crit("unexpected error cancelling custody timer for bundle *%p",
00255                      bundle);
00256         }
00257         
00258         // the timer will be deleted when it bubbles to the top of the
00259         // timer queue
00260     }
00261     
00262     bundle->custody_timers_.clear();
00263 }
00264 
00265 //----------------------------------------------------------------------
00266 void
00267 BundleDaemon::accept_custody(Bundle* bundle)
00268 {
00269     log_info("accept_custody *%p", bundle);
00270     
00271     if (bundle->local_custody_) {
00272         log_err("accept_custody(*%p): already have local custody",
00273                 bundle);
00274         return;
00275     }
00276 
00277     if (bundle->custodian_.equals(local_eid_)) {
00278         log_err("send_custody_signal(*%p): "
00279                 "current custodian is already local_eid",
00280                 bundle);
00281         return;
00282     }
00283     
00284     // send a custody acceptance signal to the current custodian (if
00285     // it is someone, and not the null eid)
00286     if (! bundle->custodian_.equals(EndpointID::NULL_EID())) {
00287         generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00288     }
00289 
00290     // now we mark the bundle to indicate that we have custody and add
00291     // it to the custody bundles list
00292     bundle->custodian_.assign(local_eid_);
00293     bundle->local_custody_ = true;
00294     actions_->store_update(bundle);
00295     
00296     custody_bundles_->push_back(bundle);
00297 
00298     // finally, if the bundle requested custody acknowledgements,
00299     // deliver them now
00300     if (bundle->custody_rcpt_) {
00301         generate_status_report(bundle, BundleProtocol::STATUS_CUSTODY_ACCEPTED);
00302     }
00303 }
00304 
00305 //----------------------------------------------------------------------
00306 void
00307 BundleDaemon::release_custody(Bundle* bundle)
00308 {
00309     log_info("release_custody *%p", bundle);
00310     
00311     if (!bundle->local_custody_) {
00312         log_err("release_custody(*%p): don't have local custody",
00313                 bundle);
00314         return;
00315     }
00316 
00317     cancel_custody_timers(bundle);
00318 
00319     bundle->custodian_.assign(EndpointID::NULL_EID());
00320     bundle->local_custody_ = false;
00321     actions_->store_update(bundle);
00322     
00323     custody_bundles_->erase(bundle);
00324 }
00325 
00326 //----------------------------------------------------------------------
00327 void
00328 BundleDaemon::deliver_to_registration(Bundle* bundle,
00329                                       Registration* registration)
00330 {
00331     // tells routers that this Bundle has been taken care of
00332     // by the daemon already
00333     bundle->owner_ = "daemon";
00334 
00335     if (bundle->is_fragment_) {
00336         log_debug("deferring delivery of bundle *%p to registration %d (%s) "
00337                   "since bundle is a fragment",
00338                   bundle, registration->regid(),
00339                   registration->endpoint().c_str());
00340         
00341         fragmentmgr_->process_for_reassembly(bundle);
00342         return;
00343     }
00344 
00345     log_debug("delivering bundle *%p to registration %d (%s)",
00346               bundle, registration->regid(),
00347               registration->endpoint().c_str());
00348     
00349     registration->deliver_bundle(bundle);
00350 }
00351 
00352 //----------------------------------------------------------------------
00353 void
00354 BundleDaemon::check_registrations(Bundle* bundle)
00355 {
00356     int num;
00357     log_debug("checking for matching registrations for bundle *%p", bundle);
00358 
00359     RegistrationList matches;
00360     RegistrationList::iterator iter;
00361 
00362     num = reg_table_->get_matching(bundle->dest_, &matches);
00363 
00364     for (iter = matches.begin(); iter != matches.end(); ++iter)
00365     {
00366         Registration* registration = *iter;
00367         deliver_to_registration(bundle, registration);
00368     }
00369 }
00370 
00371 //----------------------------------------------------------------------
00372 void
00373 BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
00374 {
00375     Bundle* bundle = event->bundleref_.object();
00376 
00377     // update statistics and store an appropriate event descriptor
00378     const char* source_str = "";
00379     switch (event->source_) {
00380     case EVENTSRC_PEER:
00381         stats_.bundles_received_++;
00382         break;
00383         
00384     case EVENTSRC_APP:
00385         stats_.bundles_received_++;
00386         source_str = " (from app)";
00387         break;
00388         
00389     case EVENTSRC_STORE:
00390         source_str = " (from data store)";
00391         break;
00392         
00393     case EVENTSRC_ADMIN:
00394         stats_.bundles_generated_++;
00395         source_str = " (generated)";
00396         break;
00397         
00398     case EVENTSRC_FRAGMENTATION:
00399         stats_.bundles_generated_++;
00400         source_str = " (from fragmentation)";
00401         break;
00402 
00403     default:
00404         NOTREACHED;
00405     }
00406 
00407     // if debug logging is enabled, dump out a verbose printing of the
00408     // bundle, including all options, otherwise, a more terse log
00409     if (log_enabled(oasys::LOG_DEBUG)) {
00410         oasys::StaticStringBuffer<1024> buf;
00411         buf.appendf("BUNDLE_RECEIVED%s: (%zu bytes recvd)\n",
00412                     source_str, event->bytes_received_);
00413         bundle->format_verbose(&buf);
00414         log_multiline(oasys::LOG_DEBUG, buf.c_str());
00415     } else {
00416         log_info("BUNDLE_RECEIVED%s *%p (%zu bytes recvd)",
00417                  source_str, bundle, event->bytes_received_);
00418     }
00419     
00420     // log a warning if the bundle doesn't have any expiration time or
00421     // has a creation time that's in the future. in either case, we
00422     // proceed as normal
00423     if (bundle->expiration_ == 0) {
00424         log_warn("bundle id %d arrived with zero expiration time",
00425                  bundle->bundleid_);
00426     }
00427 
00428     u_int32_t now = BundleTimestamp::get_current_time();
00429     if ((bundle->creation_ts_.seconds_ > now) &&
00430         (bundle->creation_ts_.seconds_ - now > 30000))
00431     {
00432         log_warn("bundle id %d arrived with creation time in the future "
00433                  "(%u > %u)",
00434                  bundle->bundleid_, bundle->creation_ts_.seconds_, now);
00435     }
00436 
00437     /*
00438      * Log a warning if the convergence layer didn't close the bundle
00439      * payload file.
00440      */
00441     if (bundle->payload_.is_file_open()) {
00442         log_warn("bundle id %d arrived with payload file still open",
00443                  bundle->bundleid_);
00444         bundle->payload_.close_file();
00445     }
00446     
00447     /*
00448      * Send the reception receipt 
00449      */
00450     if (bundle->receive_rcpt_ && (event->source_ != EVENTSRC_STORE)) {
00451         generate_status_report(bundle, BundleProtocol::STATUS_RECEIVED);
00452     }
00453 
00454     /*
00455      * Check if the bundle isn't complete. If so, do reactive
00456      * fragmentation.
00457      */
00458     if (event->bytes_received_ != bundle->payload_.length()) {
00459         log_debug("partial bundle, making reactive fragment of %zu bytes",
00460                   event->bytes_received_);
00461 
00462         fragmentmgr_->convert_to_fragment(bundle, event->bytes_received_);
00463     }
00464 
00465     /*
00466      * Check if the bundle is a duplicate, i.e. shares a source id,
00467      * timestamp, and fragmentation information with some other bundle
00468      * in the system.
00469      */
00470     Bundle* duplicate = find_duplicate(bundle);
00471     if (duplicate != NULL) {
00472         log_notice("got duplicate bundle: %s -> %s creation %u.%u",
00473                    bundle->source_.c_str(),
00474                    bundle->dest_.c_str(),
00475                    bundle->creation_ts_.seconds_,
00476                    bundle->creation_ts_.seqno_);
00477 
00478         stats_.duplicate_bundles_++;
00479         
00480         if (bundle->custody_requested_ && duplicate->local_custody_)
00481         {
00482             generate_custody_signal(bundle, false,
00483                                     BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
00484         }
00485 
00486         // since we don't want the bundle to be processed by the rest
00487         // of the system, we mark the event as daemon_only (meaning it
00488         // won't be forwarded to routers) and return, which should
00489         // eventually remove all references on the bundle and then it
00490         // will be deleted
00491         event->daemon_only_ = true;
00492         return;
00493     }
00494     
00495     /*
00496      * Add the bundle to the master pending queue and the data store
00497      * (unless the bundle was just reread from the data store on startup)
00498      *
00499      * Note that if add_to_pending returns false, the bundle has
00500      * already expired so we immediately return instead of trying to
00501      * deliver and/or forward the bundle. Otherwise there's a chance
00502      * that expired bundles will persist in the network.
00503      */
00504     bool ok_to_route =
00505         add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
00506 
00507     if (!ok_to_route) {
00508         event->daemon_only_ = true;
00509         return;
00510     }
00511     
00512     /*
00513      * If the bundle is a custody bundle and we're configured to take
00514      * custody, then do so. In case the event was delivered due to a
00515      * reload from the data store, then if we have local custody, make
00516      * sure it's added to the custody bundles list.
00517      */
00518     if (bundle->custody_requested_ && params_.accept_custody_)
00519     {
00520         if (event->source_ != EVENTSRC_STORE) {
00521             accept_custody(bundle);
00522         
00523         } else if (bundle->local_custody_) {
00524             custody_bundles_->push_back(bundle);
00525         }
00526     }
00527     
00528     /*
00529      * Deliver the bundle to any local registrations that it matches
00530      */
00531     check_registrations(bundle);
00532     
00533     /*
00534      * Finally, bounce out so the router(s) can do something further
00535      * with the bundle in response to the event.
00536      */
00537 }
00538 
00539 //----------------------------------------------------------------------
00540 void
00541 BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
00542 {
00543     Bundle* bundle = event->bundleref_.object();
00544     Link* link = event->contact_->link();
00545     
00546     /*
00547      * Update statistics. Note that the link's inflight length must
00548      * always be decremented by the full formatted size of the bundle,
00549      * yet the transmitted length is only the amount reported by the
00550      * event.
00551      */
00552     size_t total_len = BundleProtocol::formatted_length(bundle);
00553     
00554     stats_.bundles_transmitted_++;
00555     link->stats()->bundles_transmitted_++;
00556     link->stats()->bundles_inflight_--;
00557 
00558     link->stats()->bytes_transmitted_ += event->bytes_sent_;
00559     link->stats()->bytes_inflight_ -= total_len;
00560     
00561     log_info("BUNDLE_TRANSMITTED id:%d (%zu bytes_sent/%zu reliable) -> %s (%s)",
00562              bundle->bundleid_,
00563              event->bytes_sent_,
00564              event->reliably_sent_,
00565              link->name(),
00566              link->nexthop());
00567 
00568     /*
00569      * If we're configured to wait for reliable transmission, then
00570      * check the special case where we transmitted some or all a
00571      * bundle but nothing was acked. In this case, we create a
00572      * transmission failed event in the forwarding log and don't do
00573      * any of the rest of the processing below.
00574      *
00575      * Note also the special care taken to handle a zero-length
00576      * bundle. XXX/demmer this should all go away when the lengths
00577      * include both the header length and the payload length (in which
00578      * case it's never zero).
00579      *
00580      * XXX/demmer a better thing to do (maybe) would be to record the
00581      * lengths in the forwarding log as part of the transmitted entry.
00582      */
00583     if (params_.retry_reliable_unacked_ &&
00584         link->is_reliable() &&
00585         (event->bytes_sent_ != event->reliably_sent_) &&
00586         (event->reliably_sent_ == 0))
00587     {
00588         bundle->fwdlog_.update(link, ForwardingInfo::TRANSMIT_FAILED);
00589         return;
00590     }
00591 
00592     /*
00593      * Update the forwarding log
00594      */
00595     bundle->fwdlog_.update(link,
00596                            ForwardingInfo::TRANSMITTED);
00597                             
00598     /*
00599      * Grab the updated forwarding log information so we can find the
00600      * custody timer information (if any).
00601      */
00602     ForwardingInfo fwdinfo;
00603     bool ok = bundle->fwdlog_.get_latest_entry(link, &fwdinfo);
00604     ASSERTF(ok, "no forwarding log entry for transmission");
00605     ASSERT(fwdinfo.state_ == ForwardingInfo::TRANSMITTED);
00606     
00607     /*
00608      * Check for reactive fragmentation. If the bundle was only
00609      * partially sent, then a new bundle received event for the tail
00610      * part of the bundle will be processed immediately after this
00611      * event. For reliable convergence latyer
00612      */
00613     if (link->reliable_) {
00614         fragmentmgr_->try_to_reactively_fragment(bundle, event->reliably_sent_);
00615     } else {
00616         fragmentmgr_->try_to_reactively_fragment(bundle, event->bytes_sent_);
00617     }
00618 
00619     /*
00620      * Generate the forwarding status report if requested
00621      */
00622     if (bundle->forward_rcpt_) {
00623         generate_status_report(bundle, BundleProtocol::STATUS_FORWARDED);
00624     }
00625 
00626     /*
00627      * Schedule a custody timer if we have custody.
00628      */
00629     if (bundle->local_custody_) {
00630         bundle->custody_timers_.push_back(
00631             new CustodyTimer(fwdinfo.timestamp_,
00632                              fwdinfo.custody_timer_,
00633                              bundle, link));
00634         
00635         // XXX/TODO: generate failed custodial signal for "forwarded
00636         // over unidirectional link" if the bundle has the retention
00637         // constraint "custody accepted" and all of the nodes in the
00638         // minimum reception group of the endpoint selected for
00639         // forwarding are known to be unable to send bundles back to
00640         // this node
00641     }
00642 
00643     /*
00644      * Check if we should can delete the bundle from the pending list,
00645      * i.e. we don't have custody and it's not being transmitted
00646      * anywhere else.
00647      */
00648     try_delete_from_pending(bundle);
00649 }
00650 
00651 //----------------------------------------------------------------------
00652 void
00653 BundleDaemon::handle_bundle_transmit_failed(BundleTransmitFailedEvent* event)
00654 {
00655     /*
00656      * The bundle was delivered to a next-hop contact.
00657      */
00658     Bundle* bundle = event->bundleref_.object();
00659 
00660     log_info("BUNDLE_TRANSMIT_FAILED id:%d -> %s (%s)",
00661              bundle->bundleid_,
00662              event->contact_->link()->name(),
00663              event->contact_->link()->nexthop());
00664     
00665     /*
00666      * Update the forwarding log so routers know to try to retransmit
00667      * on the next contact.
00668      */
00669     bundle->fwdlog_.update(event->contact_->link(),
00670                            ForwardingInfo::TRANSMIT_FAILED);
00671 
00672     /*
00673      * Fall through to notify the routers
00674      */
00675 }
00676 
00677 //----------------------------------------------------------------------
00678 void
00679 BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
00680 {
00681     // update statistics
00682     stats_.bundles_delivered_++;
00683     
00684     /*
00685      * The bundle was delivered to a registration.
00686      */
00687     Bundle* bundle = event->bundleref_.object();
00688 
00689     log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
00690              bundle->bundleid_, bundle->payload_.length(),
00691              event->registration_->regid(),
00692              event->registration_->endpoint().c_str());
00693 
00694     /*
00695      * Generate the delivery status report if requested.
00696      */
00697     if (bundle->delivery_rcpt_)
00698     {
00699         generate_status_report(bundle, BundleProtocol::STATUS_DELIVERED);
00700     }
00701 
00702     /*
00703      * If this is a custodial bundle and it was delivered, we either
00704      * release custody (if we have it), or send a custody signal to
00705      * the current custodian indicating that the bundle was
00706      * successfully delivered, unless there is no current custodian
00707      * (the eid is still dtn:none).
00708      */
00709     if (bundle->custody_requested_)
00710     {
00711         if (bundle->local_custody_) {
00712             release_custody(bundle);
00713 
00714         } else if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00715             log_info("custodial bundle *%p delivered before custody accepted",
00716                      bundle);
00717 
00718         } else {
00719             generate_custody_signal(bundle, true,
00720                                     BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00721         }
00722     }
00723 
00724     /*
00725      * Finally, check if we can and should delete the bundle from the
00726      * pending list, i.e. we don't have custody and it's not being
00727      * transmitted anywhere else.
00728      */
00729     try_delete_from_pending(bundle);
00730 }
00731 
00732 //----------------------------------------------------------------------
00733 void
00734 BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
00735 {
00736     // update statistics
00737     stats_.bundles_expired_++;
00738     
00739     Bundle* bundle = event->bundleref_.object();
00740 
00741     log_info("BUNDLE_EXPIRED *%p", bundle);
00742 
00743     ASSERT(bundle->expiration_timer_ == NULL);
00744 
00745     // check if we have custody, if so, remove it
00746     if (bundle->local_custody_) {
00747         release_custody(bundle);
00748     }
00749 
00750     // delete the bundle from the pending list
00751     delete_from_pending(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
00752 
00753     // XXX/demmer check if the bundle is a fragment awaiting reassembly
00754     
00755     // XXX/demmer should try to cancel transmission on any links where
00756     // the bundle is active
00757     
00758     // fall through to notify the routers
00759 }
00760 
00761 //----------------------------------------------------------------------
00762 void
00763 BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
00764 {
00765     Registration* registration = event->registration_;
00766     log_info("REGISTRATION_ADDED %d %s",
00767              registration->regid(), registration->endpoint().c_str());
00768 
00769     if (!reg_table_->add(registration,
00770                          (event->source_ == EVENTSRC_APP) ? true : false))
00771     {
00772         log_err("error adding registration %d to table",
00773                 registration->regid());
00774     }
00775     
00776     oasys::ScopeLock l(pending_bundles_->lock(), 
00777                        "BundleDaemon::handle_registration_added");
00778     BundleList::iterator iter;
00779     for (iter = pending_bundles_->begin();
00780          iter != pending_bundles_->end();
00781          ++iter)
00782     {
00783         Bundle* bundle = *iter;
00784 
00785         if (registration->endpoint().match(bundle->dest_))
00786         {
00787             deliver_to_registration(bundle, registration);
00788         }
00789     }
00790 }
00791 
00792 //----------------------------------------------------------------------
00793 void
00794 BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
00795 {
00796     Registration* registration = event->registration_;
00797     log_info("REGISTRATION_REMOVED %d %s",
00798              registration->regid(), registration->endpoint().c_str());
00799     
00800     if (!reg_table_->del(registration->regid())) {
00801         log_err("error removing registration %d from table",
00802                 registration->regid());
00803         return;
00804     }
00805 
00806     delete registration;
00807 }
00808 
00809 //----------------------------------------------------------------------
00810 void
00811 BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
00812 {
00813     Registration* registration = reg_table_->get(event->regid_);
00814 
00815     if (registration == NULL) {
00816         log_err("REGISTRATION_EXPIRED -- dead regid %d", event->regid_);
00817         return;
00818     }
00819     
00820     registration->set_expired(true);
00821     
00822     if (registration->active()) {
00823         // if the registration is currently active (i.e. has a
00824         // binding), we wait for the binding to clear, which will then
00825         // clean up the registration
00826         log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
00827                  event->regid_);
00828     } else {
00829         // otherwise remove the registration from the table
00830         log_info("REGISTRATION_EXPIRED %d", event->regid_);
00831         reg_table_->del(registration->regid());
00832         delete registration;
00833     }
00834 }
00835 
00836 //----------------------------------------------------------------------
00837 void
00838 BundleDaemon::handle_link_available(LinkAvailableEvent* event)
00839 {
00840     Link* link = event->link_;
00841     ASSERT(link->isavailable());
00842 
00843     log_info("LINK_AVAILABLE *%p", link);
00844 }
00845 
00846 //----------------------------------------------------------------------
00847 void
00848 BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
00849 {
00850     Link* link = event->link_;
00851     ASSERT(!link->isavailable());
00852     
00853     log_info("LINK UNAVAILABLE *%p", link);
00854 }
00855 
00856 //----------------------------------------------------------------------
00857 void
00858 BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
00859 {
00860     // XXX/demmer should also store the contact in the request -- that
00861     // way we can tell if it's still relevant or if it refers to
00862     // something in the past
00863     
00864     Link* link = request->link_;
00865     Link::state_t new_state = request->state_;
00866     Link::state_t old_state = request->old_state_;
00867     ContactEvent::reason_t reason = request->reason_;
00868     
00869     if (link->contact() != request->contact_) {
00870         log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p: "
00871                  "contact %p != current contact %p", 
00872                  Link::state_to_str(old_state), Link::state_to_str(new_state),
00873                  ContactEvent::reason_to_str(reason), link,
00874                  request->contact_.object(), link->contact().object());
00875         return;
00876     }
00877 
00878     if (old_state != link->state()) {
00879         log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p: "
00880                  "old_state != current state", 
00881                  Link::state_to_str(old_state), Link::state_to_str(new_state),
00882                  ContactEvent::reason_to_str(reason), link);
00883         return;
00884     }
00885     
00886     log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
00887              Link::state_to_str(old_state), Link::state_to_str(new_state),
00888              ContactEvent::reason_to_str(reason), link);
00889     
00890     switch(new_state) {
00891     case Link::UNAVAILABLE:
00892         if (link->state() != Link::AVAILABLE) {
00893             log_err("LINK_STATE_CHANGE_REQUEST *%p: "
00894                     "tried to set state UNAVAILABLE in state %s",
00895                     link, Link::state_to_str(link->state()));
00896             return;
00897         }
00898         link->set_state(new_state);
00899         post_at_head(new LinkUnavailableEvent(link, reason));
00900         break;
00901 
00902     case Link::AVAILABLE:
00903         if (link->state() == Link::UNAVAILABLE) {
00904             link->set_state(Link::AVAILABLE);
00905             
00906         } else if (link->state() == Link::BUSY &&
00907                    reason        == ContactEvent::UNBLOCKED) {
00908             ASSERT(link->contact() != NULL);
00909             link->set_state(Link::OPEN);
00910             
00911         } else if (link->state() == Link::OPEN &&
00912                    reason        == ContactEvent::UNBLOCKED) {
00913             // a CL might send multiple requests to go from
00914             // BUSY->AVAILABLE, so we can safely ignore this
00915             
00916         } else {
00917             log_err("LINK_STATE_CHANGE_REQUEST *%p: "
00918                     "tried to set state AVAILABLE in state %s",
00919                     link, Link::state_to_str(link->state()));
00920             return;
00921         }
00922 
00923         post_at_head(new LinkAvailableEvent(link, reason));
00924         break;
00925         
00926     case Link::BUSY:
00927         log_err("LINK_STATE_CHANGE_REQUEST can't be used for state %s",
00928                 Link::state_to_str(new_state));
00929         break;
00930         
00931     case Link::OPENING:
00932     case Link::OPEN:
00933         // force the link to be available, since someone really wants it open
00934         if (link->state() == Link::UNAVAILABLE) {
00935             link->set_state(Link::AVAILABLE);
00936         }
00937         actions_->open_link(link);
00938         break;
00939 
00940     case Link::CLOSED:
00941         // The only case where we should get this event when the link
00942         // is not actually open is if it's in the process of being
00943         // opened but the CL can't actually open it.
00944         if (! link->isopen() && ! link->isopening()) {
00945             log_err("LINK_STATE_CHANGE_REQUEST *%p: "
00946                     "setting state CLOSED (%s) in unexpected state %s",
00947                     link, ContactEvent::reason_to_str(reason),
00948                     link->state_to_str(link->state()));
00949             break;
00950         }
00951 
00952         // If the link is open (not OPENING), we need a ContactDownEvent
00953         if (link->isopen()) {
00954             ASSERT(link->contact() != NULL);
00955             post_at_head(new ContactDownEvent(link->contact(), reason));
00956         }
00957 
00958         // close the link
00959         actions_->close_link(link);
00960         
00961         // now, based on the reason code, update the link availability
00962         // and set state accordingly
00963         if (reason == ContactEvent::IDLE) {
00964             link->set_state(Link::AVAILABLE);
00965         } else {
00966             link->set_state(Link::UNAVAILABLE);
00967             post_at_head(new LinkUnavailableEvent(link, reason));
00968         }
00969     
00970         break;
00971 
00972     default:
00973         PANIC("unhandled state %s", Link::state_to_str(new_state));
00974     }
00975 }
00976   
00977 //----------------------------------------------------------------------
00978 void
00979 BundleDaemon::handle_contact_up(ContactUpEvent* event)
00980 {
00981     const ContactRef& contact = event->contact_;
00982     log_info("CONTACT_UP *%p (contact %p)", contact->link(), contact.object());
00983     
00984     Link* link = contact->link();
00985     ASSERT(link->contact() == contact);
00986     link->set_state(Link::OPEN);
00987     link->stats_.contacts_++;
00988 }
00989 
00990 //----------------------------------------------------------------------
00991 void
00992 BundleDaemon::handle_contact_down(ContactDownEvent* event)
00993 {
00994     const ContactRef& contact = event->contact_;
00995     Link* link = contact->link();
00996     ContactEvent::reason_t reason = event->reason_;
00997     
00998     log_info("CONTACT_DOWN *%p (%s) (contact %p)",
00999              link, ContactEvent::reason_to_str(reason), contact.object());
01000 
01001     // we don't need to do anything here since we just generated this
01002     // event in response to a link state change request
01003 }
01004 
01005 //----------------------------------------------------------------------
01006 void
01007 BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
01008 {
01009     log_info("REASSEMBLY_COMPLETED bundle id %d",
01010              event->bundle_->bundleid_);
01011 
01012     // remove all the fragments from the pending list
01013     BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
01014     while ((ref = event->fragments_.pop_front()) != NULL) {
01015         try_delete_from_pending(ref.object());
01016     }
01017 
01018     // post a new event for the newly reassembled bundle
01019     post_at_head(new BundleReceivedEvent(event->bundle_.object(),
01020                                          EVENTSRC_FRAGMENTATION,
01021                                          event->bundle_->payload_.length()));
01022 }
01023 
01024 
01025 //----------------------------------------------------------------------
01026 void
01027 BundleDaemon::handle_route_add(RouteAddEvent* event)
01028 {
01029     log_info("ROUTE_ADD *%p", event->entry_);
01030 }
01031 
01032 //----------------------------------------------------------------------
01033 void
01034 BundleDaemon::handle_route_del(RouteDelEvent* event)
01035 {
01036     log_info("ROUTE_DEL %s", event->dest_.c_str());
01037 }
01038 
01039 //----------------------------------------------------------------------
01040 void
01041 BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
01042 {
01043     log_info("CUSTODY_SIGNAL: %s %u.%u %s (%s)",
01044              event->data_.orig_source_eid_.c_str(),
01045              event->data_.orig_creation_tv_.seconds_,
01046              event->data_.orig_creation_tv_.seqno_,
01047              event->data_.succeeded_ ? "succeeded" : "failed",
01048              CustodySignal::reason_to_str(event->data_.reason_));
01049 
01050     BundleRef orig_bundle =
01051         custody_bundles_->find(event->data_.orig_source_eid_,
01052                                event->data_.orig_creation_tv_);
01053     
01054     if (orig_bundle == NULL) {
01055         log_warn("received custody signal for bundle %s %u.%u "
01056                  "but don't have custody",
01057                  event->data_.orig_source_eid_.c_str(),
01058                  event->data_.orig_creation_tv_.seconds_,
01059                  event->data_.orig_creation_tv_.seqno_);
01060         return;
01061     }
01062 
01063     // release custody if either the signal succeded or if it
01064     // (paradoxically) failed due to duplicate transmission
01065     bool release = event->data_.succeeded_;
01066     if ((event->data_.succeeded_ == false) &&
01067         (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
01068     {
01069         log_notice("releasing custody for bundle %s %u.%u "
01070                    "due to redundant reception",
01071                    event->data_.orig_source_eid_.c_str(),
01072                    event->data_.orig_creation_tv_.seconds_,
01073                    event->data_.orig_creation_tv_.seqno_);
01074         
01075         release = true;
01076     }
01077     
01078     if (release) {
01079         release_custody(orig_bundle.object());
01080         try_delete_from_pending(orig_bundle.object());
01081     }
01082 }
01083 
01084 //----------------------------------------------------------------------
01085 void
01086 BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
01087 {
01088     Bundle* bundle = event->bundle_.object();
01089     Link*   link   = event->link_;
01090     
01091     log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link);
01092     
01093     // remove and delete the expired timer from the bundle
01094     oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::handle_custody_timeout");
01095 
01096     bool found = false;
01097     CustodyTimer* timer;
01098     CustodyTimerVec::iterator iter;
01099     for (iter = bundle->custody_timers_.begin();
01100          iter != bundle->custody_timers_.end();
01101          ++iter)
01102     {
01103         timer = *iter;
01104         if (timer->link_ == link)
01105         {
01106             if (timer->pending()) {
01107                 log_err("multiple pending custody timers for link %s",
01108                         link->nexthop());
01109                 continue;
01110             }
01111             
01112             found = true;
01113             bundle->custody_timers_.erase(iter);
01114             break;
01115         }
01116     }
01117 
01118     if (!found) {
01119         log_err("custody timeout for *%p *%p: timer not found in bundle list",
01120                 bundle, link);
01121         return;
01122     }
01123 
01124     ASSERT(!timer->cancelled());
01125     
01126     if (!pending_bundles_->contains(bundle)) {
01127         log_err("custody timeout for *%p *%p: bundle not in pending list",
01128                 bundle, link);
01129     }
01130 
01131     // add an entry to the forwarding log to indicate that we got the
01132     // custody failure signal. this simplifies the task of routers, as
01133     // the most recent entry in the log will not be SENT, so the
01134     // router will know to retransmit the bundle.
01135     bundle->fwdlog_.add_entry(link, ForwardingInfo::INVALID_ACTION,
01136                               ForwardingInfo::CUSTODY_TIMEOUT,
01137                               CustodyTimerSpec::defaults_);
01138 
01139     delete timer;
01140 
01141     // now fall through to let the router handle the event, typically
01142     // triggering a retransmission to the link in the event
01143 }
01144 
01145 //----------------------------------------------------------------------
01146 void
01147 BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
01148 {
01149     (void)request;
01150     
01151     log_notice("Received shutdown request");
01152 
01153     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
01154     
01155     const LinkSet* links = contactmgr_->links();
01156     LinkSet::const_iterator iter;
01157     Link* link = NULL;
01158 
01159     // close any open links
01160     for (iter = links->begin(); iter != links->end(); ++iter)
01161     {
01162         link = *iter;
01163         if (link->isopen()) {
01164             log_debug("Shutdown: closing link *%p\n", link);
01165             link->close();
01166         }
01167     }
01168 
01169     // call the app shutdown procedure
01170     if (app_shutdown_proc_) {
01171         (*app_shutdown_proc_)(app_shutdown_data_);
01172     }
01173 
01174     // signal to the main loop to bail
01175     set_should_stop();
01176 
01177     // fall through -- the DTNServer will close and flush all the data
01178     // stores
01179 }
01180 
01181 //----------------------------------------------------------------------
01182 void
01183 BundleDaemon::handle_status_request(StatusRequest* request)
01184 {
01185     (void)request;
01186     log_info("Received status request");
01187 }
01188 
01189 //----------------------------------------------------------------------
01190 bool
01191 BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
01192 {
01193     log_debug("adding bundle *%p to pending list", bundle);
01194     
01195     pending_bundles_->push_back(bundle);
01196     
01197     if (add_to_store) {
01198         bundle->in_datastore_ = true;
01199         actions_->store_add(bundle);
01200     }
01201 
01202     // schedule the bundle expiration timer
01203     struct timeval expiration_time;
01204     expiration_time.tv_sec =
01205         BundleTimestamp::TIMEVAL_CONVERSION +
01206         bundle->creation_ts_.seconds_ + 
01207         bundle->expiration_;
01208     
01209     expiration_time.tv_usec = 0;
01210     
01211     struct timeval now;
01212     gettimeofday(&now, 0);
01213     long int when = expiration_time.tv_sec - now.tv_sec;
01214 
01215     bool ok_to_route = true;
01216     
01217     if (when > 0) {
01218         log_debug("scheduling expiration for bundle id %d at %u.%u "
01219                   "(in %lu seconds)",
01220                   bundle->bundleid_,
01221                   (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
01222                   when);
01223     } else {
01224         log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
01225                  "[expiration %u, creation time %u.%u, offset %u, now %u.%u]",
01226                  bundle->bundleid_, bundle->expiration_,
01227                  (u_int)bundle->creation_ts_.seconds_,
01228                  (u_int)bundle->creation_ts_.seqno_,
01229                  BundleTimestamp::TIMEVAL_CONVERSION,
01230                  (u_int)now.tv_sec, (u_int)now.tv_usec);
01231         
01232         ok_to_route = false;
01233     }
01234 
01235     bundle->expiration_timer_ = new ExpirationTimer(bundle);
01236     bundle->expiration_timer_->schedule_at(&expiration_time);
01237 
01238     return ok_to_route;
01239 }
01240 
01241 //----------------------------------------------------------------------
01242 void
01243 BundleDaemon::delete_from_pending(Bundle* bundle,
01244                                   status_report_reason_t reason)
01245 {
01246     log_debug("removing bundle *%p from pending list", bundle);
01247 
01248     // first try to cancel the expiration timer if it's still
01249     // around
01250     if (bundle->expiration_timer_) {
01251         log_debug("cancelling expiration timer for bundle id %d",
01252                   bundle->bundleid_);
01253         
01254         bool cancelled = bundle->expiration_timer_->cancel();
01255         if (!cancelled) {
01256            log_crit("unexpected error cancelling expiration timer "
01257                      "for bundle *%p", bundle);
01258         }
01259         
01260         bundle->expiration_timer_->bundleref_.release();
01261         bundle->expiration_timer_ = NULL;
01262     }
01263 
01264     bool erased = pending_bundles_->erase(bundle);
01265 
01266     if (erased) {
01267         if (bundle->deletion_rcpt_ &&
01268             (reason != BundleProtocol::REASON_NO_ADDTL_INFO))
01269         {
01270             generate_status_report(bundle,
01271                                    BundleProtocol::STATUS_DELETED,
01272                                    reason);
01273         }
01274     } else {
01275         log_err("unexpected error removing bundle from pending list");
01276     }
01277 }
01278 
01279 //----------------------------------------------------------------------
01280 void
01281 BundleDaemon::try_delete_from_pending(Bundle* bundle)
01282 {
01283     /*
01284      * Check to see if we should remove the bundle from the pending
01285      * list, after which all references to the bundle should be
01286      * cleaned up and the bundle will be deleted from the system.
01287      *
01288      * We do this only if:
01289      *
01290      * 1) We're configured for early deletion
01291      * 2) The bundle isn't queued on any lists other than the pending
01292      *    list. This covers the case where we have custody, since the
01293      *    bundle will be on the custody_bundles list
01294      * 3) The bundle isn't currently in flight, as recorded
01295      *    in the forwarding log.
01296      *
01297      * This allows a router (or the custody system) to maintain a
01298      * retention constraint by putting the bundle on a list, and
01299      * thereby adding a mapping.
01300      */
01301 
01302     if (! bundle->is_queued_on(pending_bundles_))
01303     {
01304         if (bundle->expiration_timer_ == NULL) {
01305             log_debug("try_delete_from_pending(*%p): bundle already expired",
01306                       bundle);
01307             return;
01308         }
01309         
01310         log_err("try_delete_from_pending(*%p): bundle not in pending list!",
01311                 bundle);
01312         return;
01313     }
01314 
01315     if (!params_.early_deletion_) {
01316         log_debug("try_delete_from_pending(*%p): not deleting because "
01317                   "early deletion disabled",
01318                   bundle);
01319         return;
01320     }
01321 
01322     size_t num_mappings = bundle->num_mappings();
01323     if (num_mappings != 1) {
01324         log_debug("try_delete_from_pending(*%p): not deleting because "
01325                   "bundle has %zu mappings",
01326                   bundle, num_mappings);
01327         return;
01328     }
01329     
01330     size_t num_in_flight = bundle->fwdlog_.get_count(ForwardingInfo::IN_FLIGHT);
01331     if (num_in_flight > 0) {
01332         log_debug("try_delete_from_pending(*%p): not deleting because "
01333                   "bundle in flight on %zu links",
01334                   bundle, num_in_flight);
01335         return;
01336     }
01337 
01338     delete_from_pending(bundle, BundleProtocol::REASON_NO_ADDTL_INFO);
01339 }
01340 
01341 //----------------------------------------------------------------------
01342 Bundle*
01343 BundleDaemon::find_duplicate(Bundle* b)
01344 {
01345     oasys::ScopeLock l(pending_bundles_->lock(), 
01346                        "BundleDaemon::find_duplicate");
01347     BundleList::iterator iter;
01348     for (iter = pending_bundles_->begin();
01349          iter != pending_bundles_->end();
01350          ++iter)
01351     {
01352         Bundle* b2 = *iter;
01353         
01354         if ((b->source_.equals(b2->source_)) &&
01355             (b->creation_ts_.seconds_ == b2->creation_ts_.seconds_) &&
01356             (b->creation_ts_.seqno_   == b2->creation_ts_.seqno_) &&
01357             (b->is_fragment_          == b2->is_fragment_) &&
01358             (b->frag_offset_          == b2->frag_offset_) &&
01359             (b->orig_length_          == b2->orig_length_) &&
01360             (b->payload_.length()     == b2->payload_.length()))
01361         {
01362             return b2;
01363         }
01364     }
01365 
01366     return NULL;
01367 }
01368 
01369 //----------------------------------------------------------------------
01370 void
01371 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
01372 {
01373     Bundle* bundle = event->bundle_;
01374     event->bundle_ = NULL;
01375     ASSERT(bundle->refcount() == 0);
01376 
01377     bundle->lock_.lock("BundleDaemon::handle_bundle_free");
01378 
01379     if (bundle->in_datastore_) {
01380         actions_->store_del(bundle);
01381     }
01382     
01383     delete bundle;
01384 }
01385 
01386 //----------------------------------------------------------------------
01387 void
01388 BundleDaemon::handle_event(BundleEvent* event)
01389 {
01390     dispatch_event(event);
01391     
01392     if (! event->daemon_only_) {
01393         // dispatch the event to the router(s) and the contact manager
01394         router_->handle_event(event);
01395         contactmgr_->handle_event(event);
01396     }
01397 
01398     stats_.events_processed_++;
01399 }
01400 
01401 //----------------------------------------------------------------------
01402 void
01403 BundleDaemon::load_registrations()
01404 {
01405     admin_reg_ = new AdminRegistration();
01406     {
01407         RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
01408         handle_event(&e);
01409     }
01410 
01411     Registration* reg;
01412     RegistrationStore* reg_store = RegistrationStore::instance();
01413     RegistrationStore::iterator* iter = reg_store->new_iterator();
01414 
01415     while (iter->next() == 0) {
01416         reg = reg_store->get(iter->cur_val());
01417         if (reg == NULL) {
01418             log_err("error loading registration %d from data store",
01419                     iter->cur_val());
01420             continue;
01421         }
01422         
01423         RegistrationAddedEvent e(reg, EVENTSRC_STORE);
01424         handle_event(&e);
01425     }
01426 
01427     delete iter;
01428 }
01429 
01430 //----------------------------------------------------------------------
01431 void
01432 BundleDaemon::load_bundles()
01433 {
01434     Bundle* bundle;
01435     BundleStore* bundle_store = BundleStore::instance();
01436     BundleStore::iterator* iter = bundle_store->new_iterator();
01437 
01438     log_notice("loading bundles from data store");
01439     for (iter->begin(); iter->more(); iter->next()) {
01440         bundle = bundle_store->get(iter->cur_val());
01441         if (bundle == NULL) {
01442             log_err("error loading bundle %d from data store",
01443                     iter->cur_val());
01444             continue;
01445         }
01446         
01447         BundleReceivedEvent e(bundle, EVENTSRC_STORE);
01448         handle_event(&e);
01449 
01450         // in the constructor, we disabled notifiers on the event
01451         // queue, so in case loading triggers other events, we just
01452         // let them queue up and handle them later when we're done
01453         // loading all the bundles
01454     }
01455 
01456     delete iter;
01457 }
01458 
01459 //----------------------------------------------------------------------
01460 void
01461 BundleDaemon::run()
01462 {
01463     if (! BundleTimestamp::check_local_clock()) {
01464         exit(1);
01465     }
01466     
01467     router_ = BundleRouter::create_router(BundleRouter::Config.type_.c_str());
01468     router_->initialize();
01469     
01470     load_registrations();
01471     load_bundles();
01472 
01473     BundleEvent* event;
01474 
01475     oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
01476     
01477     struct pollfd pollfds[2];
01478     struct pollfd* event_poll = &pollfds[0];
01479     struct pollfd* timer_poll = &pollfds[1];
01480     
01481     event_poll->fd     = eventq_->read_fd();
01482     event_poll->events = POLLIN;
01483 
01484     timer_poll->fd     = timersys->notifier()->read_fd();
01485     timer_poll->events = POLLIN;
01486     
01487     while (1) {
01488         if (should_stop()) {
01489             break;
01490         }
01491 
01492         int timeout = timersys->run_expired_timers();
01493 
01494         if (eventq_->size() > 0) {
01495             bool ok = eventq_->try_pop(&event);
01496             ASSERT(ok);
01497             
01498             oasys::Time now;
01499             now.get_time();
01500         
01501             // handle the event
01502             handle_event(event);
01503 
01504             int elapsed = now.elapsed_ms();
01505             if (elapsed > 2000) {
01506                 log_warn("event %s took %d ms to process",
01507                          event->type_str(), elapsed);
01508             }
01509         
01510             // clean up the event
01511             delete event;
01512             
01513             continue; // no reason to poll
01514         }
01515         
01516         pollfds[0].revents = 0;
01517         pollfds[1].revents = 0;
01518 
01519         int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
01520         log_debug("poll returned %d", cc);
01521 
01522         if (cc == oasys::IOTIMEOUT) {
01523             log_debug("poll timeout");
01524             continue;
01525 
01526         } else if (cc <= 0) {
01527             log_err("unexpected return %d from poll_multiple!", cc);
01528             continue;
01529         }
01530 
01531         // if the event poll fired, we just go back to the top of the
01532         // loop to drain the queue
01533         if (event_poll->revents != 0) {
01534             log_debug("poll returned new event to handle");
01535         }
01536 
01537         // if the timer notifier fired, then someone just scheduled a
01538         // new timer, so we just continue, which will call
01539         // run_expired_timers and handle it
01540         if (timer_poll->revents != 0) {
01541             log_debug("poll returned new timers to handle");
01542             timersys->notifier()->clear();
01543         }
01544     }
01545 }
01546 
01547 } // namespace dtn

Generated on Fri Dec 22 14:47:58 2006 for DTN Reference Implementation by  doxygen 1.5.1