BundleDaemon.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <oasys/util/Time.h>
00019 
00020 #include "Bundle.h"
00021 #include "BundleActions.h"
00022 #include "BundleEvent.h"
00023 #include "BundleDaemon.h"
00024 #include "BundleStatusReport.h"
00025 #include "BundleTimestamp.h"
00026 #include "CustodySignal.h"
00027 #include "ExpirationTimer.h"
00028 #include "FragmentManager.h"
00029 #include "contacts/Contact.h"
00030 #include "contacts/ContactManager.h"
00031 #include "reg/AdminRegistration.h"
00032 #include "reg/APIRegistration.h"
00033 #include "reg/PingRegistration.h"
00034 #include "reg/Registration.h"
00035 #include "reg/RegistrationTable.h"
00036 #include "routing/BundleRouter.h"
00037 #include "routing/RouteTable.h"
00038 #include "storage/BundleStore.h"
00039 #include "storage/RegistrationStore.h"
00040 
00041 namespace dtn {
00042 
00043 template <>
00044 BundleDaemon* oasys::Singleton<BundleDaemon, false>::instance_ = NULL;
00045 
00046 bool
00047 BundleDaemon::is_simulator_ = false;
00048 
00049 BundleDaemon::Params::Params()
00050     :  early_deletion_(true),
00051        accept_custody_(true),
00052        reactive_frag_enabled_(true),
00053        retry_reliable_unacked_(true),
00054        test_permuted_delivery_(false) {}
00055 
00056 BundleDaemon::Params BundleDaemon::params_;
00057 
00058 bool BundleDaemon::shutting_down_ = false;
00059 
00060 //----------------------------------------------------------------------
00061 BundleDaemon::BundleDaemon()
00062     : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
00063       Thread("BundleDaemon", CREATE_JOINABLE)
00064 {
00065     // default local eid
00066     // XXX/demmer fixme
00067     local_eid_.assign("dtn://localhost.dtn");
00068 
00069     memset(&stats_, 0, sizeof(stats_));
00070 
00071     pending_bundles_ = new BundleList("pending_bundles");
00072     custody_bundles_ = new BundleList("custody_bundles");
00073 
00074     contactmgr_ = new ContactManager();
00075     fragmentmgr_ = new FragmentManager();
00076     reg_table_ = new RegistrationTable();
00077 
00078     router_ = 0;
00079 
00080     app_shutdown_proc_ = NULL;
00081     app_shutdown_data_ = NULL;
00082 
00083     rtr_shutdown_proc_ = 0;
00084     rtr_shutdown_data_ = 0;
00085 }
00086 
00087 //----------------------------------------------------------------------
00088 BundleDaemon::~BundleDaemon()
00089 {
00090     delete pending_bundles_;
00091     delete custody_bundles_;
00092     
00093     delete contactmgr_;
00094     delete fragmentmgr_;
00095     delete reg_table_;
00096     delete router_;
00097 
00098     delete actions_;
00099     delete eventq_;
00100 }
00101 
00102 //----------------------------------------------------------------------
00103 void
00104 BundleDaemon::do_init()
00105 {
00106     actions_ = new BundleActions();
00107     eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
00108     eventq_->notify_when_empty();
00109     BundleProtocol::init_default_processors();
00110 }
00111 
00112 //----------------------------------------------------------------------
00113 void
00114 BundleDaemon::post(BundleEvent* event)
00115 {
00116     instance_->post_event(event);
00117 }
00118 
00119 //----------------------------------------------------------------------
00120 void
00121 BundleDaemon::post_at_head(BundleEvent* event)
00122 {
00123     instance_->post_event(event, false);
00124 }
00125 
00126 //----------------------------------------------------------------------
00127 bool
00128 BundleDaemon::post_and_wait(BundleEvent* event,
00129                             oasys::Notifier* notifier,
00130                             int timeout, bool at_back)
00131 {
00132     /*
00133      * Make sure that we're either already started up or are about to
00134      * start. Otherwise the wait call below would block indefinitely.
00135      */
00136     ASSERT(! oasys::Thread::start_barrier_enabled());
00137     
00138     ASSERT(event->processed_notifier_ == NULL);
00139     event->processed_notifier_ = notifier;
00140     if (at_back) {
00141         post(event);
00142     } else {
00143         post_at_head(event);
00144     }
00145     return notifier->wait(NULL, timeout);
00146 }
00147 
00148 //----------------------------------------------------------------------
00149 void
00150 BundleDaemon::post_event(BundleEvent* event, bool at_back)
00151 {
00152     log_debug("posting event (%p) with type %s (at %s)",
00153               event, event->type_str(), at_back ? "back" : "head");
00154     eventq_->push(event, at_back);
00155 }
00156 
00157 //----------------------------------------------------------------------
00158 void
00159 BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
00160 {
00161     router_->get_routing_state(buf);
00162     contactmgr_->dump(buf);
00163 }
00164 
00165 //----------------------------------------------------------------------
00166 void
00167 BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
00168 {
00169     buf->appendf("%zu pending -- "
00170                  "%zu custody -- "
00171                  "%u received -- "
00172                  "%u delivered -- "
00173                  "%u generated -- "
00174                  "%u transmitted -- "
00175                  "%u expired -- "
00176                  "%u duplicate",
00177                  pending_bundles()->size(),
00178                  custody_bundles()->size(),
00179                  stats_.bundles_received_,
00180                  stats_.bundles_delivered_,
00181                  stats_.bundles_generated_,
00182                  stats_.bundles_transmitted_,
00183                  stats_.bundles_expired_,
00184                  stats_.duplicate_bundles_);
00185 }
00186 
00187 //----------------------------------------------------------------------
00188 void
00189 BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
00190 {
00191     buf->appendf("%zu pending_events -- "
00192                  "%u processed_events",
00193                  eventq_->size(),
00194                  stats_.events_processed_);
00195 }
00196 
00197 //----------------------------------------------------------------------
00198 void
00199 BundleDaemon::reset_stats()
00200 {
00201     memset(&stats_, 0, sizeof(stats_));
00202 
00203     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
00204     
00205     const LinkSet* links = contactmgr_->links();
00206     LinkSet::const_iterator iter;
00207     for (iter = links->begin(); iter != links->end(); ++iter) {
00208         (*iter)->reset_stats();
00209     }
00210 }
00211 
00212 //----------------------------------------------------------------------
00213 void
00214 BundleDaemon::generate_status_report(Bundle* orig_bundle,
00215                                      status_report_flag_t flag,
00216                                      status_report_reason_t reason)
00217 {
00218     log_debug("generating return receipt status report, "
00219               "flag = 0x%x, reason = 0x%x", flag, reason);
00220         
00221     Bundle* report = new Bundle();
00222     BundleStatusReport::create_status_report(report, orig_bundle,
00223                                              local_eid_, flag, reason);
00224     
00225     BundleReceivedEvent e(report, EVENTSRC_ADMIN);
00226     handle_event(&e);
00227 }
00228 
00229 //----------------------------------------------------------------------
00230 void
00231 BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
00232                                       custody_signal_reason_t reason)
00233 {
00234     if (bundle->local_custody_) {
00235         log_err("send_custody_signal(*%p): already have local custody",
00236                 bundle);
00237         return;
00238     }
00239 
00240     if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00241         log_err("send_custody_signal(*%p): current custodian is NULL_EID",
00242                 bundle);
00243         return;
00244     }
00245     
00246     Bundle* signal = new Bundle();
00247     CustodySignal::create_custody_signal(signal, bundle, local_eid_,
00248                                          succeeded, reason);
00249     
00250     BundleReceivedEvent e(signal, EVENTSRC_ADMIN);
00251     handle_event(&e);
00252 }
00253 
00254 //----------------------------------------------------------------------
00255 void
00256 BundleDaemon::cancel_custody_timers(Bundle* bundle)
00257 {
00258     oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::cancel_custody_timers");
00259     
00260     CustodyTimerVec::iterator iter;
00261     for (iter =  bundle->custody_timers_.begin();
00262          iter != bundle->custody_timers_.end();
00263          ++iter)
00264     {
00265         bool ok = (*iter)->cancel();
00266         if (!ok) {
00267             log_crit("unexpected error cancelling custody timer for bundle *%p",
00268                      bundle);
00269         }
00270         
00271         // the timer will be deleted when it bubbles to the top of the
00272         // timer queue
00273     }
00274     
00275     bundle->custody_timers_.clear();
00276 }
00277 
00278 //----------------------------------------------------------------------
00279 void
00280 BundleDaemon::accept_custody(Bundle* bundle)
00281 {
00282     log_info("accept_custody *%p", bundle);
00283     
00284     if (bundle->local_custody_) {
00285         log_err("accept_custody(*%p): already have local custody",
00286                 bundle);
00287         return;
00288     }
00289 
00290     if (bundle->custodian_.equals(local_eid_)) {
00291         log_err("send_custody_signal(*%p): "
00292                 "current custodian is already local_eid",
00293                 bundle);
00294         return;
00295     }
00296     
00297     // send a custody acceptance signal to the current custodian (if
00298     // it is someone, and not the null eid)
00299     if (! bundle->custodian_.equals(EndpointID::NULL_EID())) {
00300         generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00301     }
00302 
00303     // now we mark the bundle to indicate that we have custody and add
00304     // it to the custody bundles list
00305     bundle->custodian_.assign(local_eid_);
00306     bundle->local_custody_ = true;
00307     actions_->store_update(bundle);
00308     
00309     custody_bundles_->push_back(bundle);
00310 
00311     // finally, if the bundle requested custody acknowledgements,
00312     // deliver them now
00313     if (bundle->custody_rcpt_) {
00314         generate_status_report(bundle, BundleProtocol::STATUS_CUSTODY_ACCEPTED);
00315     }
00316 }
00317 
00318 //----------------------------------------------------------------------
00319 void
00320 BundleDaemon::release_custody(Bundle* bundle)
00321 {
00322     log_info("release_custody *%p", bundle);
00323     
00324     if (!bundle->local_custody_) {
00325         log_err("release_custody(*%p): don't have local custody",
00326                 bundle);
00327         return;
00328     }
00329 
00330     cancel_custody_timers(bundle);
00331 
00332     bundle->custodian_.assign(EndpointID::NULL_EID());
00333     bundle->local_custody_ = false;
00334     actions_->store_update(bundle);
00335     
00336     custody_bundles_->erase(bundle);
00337 }
00338 
00339 //----------------------------------------------------------------------
00340 void
00341 BundleDaemon::deliver_to_registration(Bundle* bundle,
00342                                       Registration* registration)
00343 {
00344     // tells routers that this Bundle has been taken care of
00345     // by the daemon already
00346     bundle->owner_ = "daemon";
00347 
00348     if (bundle->is_fragment_) {
00349         log_debug("deferring delivery of bundle *%p to registration %d (%s) "
00350                   "since bundle is a fragment",
00351                   bundle, registration->regid(),
00352                   registration->endpoint().c_str());
00353         
00354         fragmentmgr_->process_for_reassembly(bundle);
00355         return;
00356     }
00357 
00358     log_debug("delivering bundle *%p to registration %d (%s)",
00359               bundle, registration->regid(),
00360               registration->endpoint().c_str());
00361     
00362     registration->deliver_bundle(bundle);
00363 }
00364 
00365 //----------------------------------------------------------------------
00366 void
00367 BundleDaemon::check_registrations(Bundle* bundle)
00368 {
00369     int num;
00370     log_debug("checking for matching registrations for bundle *%p", bundle);
00371 
00372     RegistrationList matches;
00373     RegistrationList::iterator iter;
00374 
00375     num = reg_table_->get_matching(bundle->dest_, &matches);
00376 
00377     for (iter = matches.begin(); iter != matches.end(); ++iter)
00378     {
00379         Registration* registration = *iter;
00380         deliver_to_registration(bundle, registration);
00381     }
00382 }
00383 
00384 //----------------------------------------------------------------------
00385 void
00386 BundleDaemon::handle_bundle_accept(BundleAcceptRequest* request)
00387 {
00388     *request->result_ =
00389         router_->accept_bundle(request->bundle_.object(), request->reason_);
00390 
00391     log_info("BUNDLE_ACCEPT_REQUEST: bundle *%p %s (reason %s)",
00392              request->bundle_.object(),
00393              *request->result_ ? "accepted" : "not accepted",
00394              BundleStatusReport::reason_to_str(*request->reason_));
00395 }
00396     
00397 //----------------------------------------------------------------------
00398 void
00399 BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
00400 {
00401     Bundle* bundle = event->bundleref_.object();
00402 
00403     // update statistics and store an appropriate event descriptor
00404     const char* source_str = "";
00405     switch (event->source_) {
00406     case EVENTSRC_PEER:
00407         stats_.bundles_received_++;
00408         break;
00409         
00410     case EVENTSRC_APP:
00411         stats_.bundles_received_++;
00412         source_str = " (from app)";
00413         break;
00414         
00415     case EVENTSRC_STORE:
00416         source_str = " (from data store)";
00417         break;
00418         
00419     case EVENTSRC_ADMIN:
00420         stats_.bundles_generated_++;
00421         source_str = " (generated)";
00422         break;
00423         
00424     case EVENTSRC_FRAGMENTATION:
00425         stats_.bundles_generated_++;
00426         source_str = " (from fragmentation)";
00427         break;
00428 
00429     default:
00430         NOTREACHED;
00431     }
00432 
00433     // if debug logging is enabled, dump out a verbose printing of the
00434     // bundle, including all options, otherwise, a more terse log
00435     if (log_enabled(oasys::LOG_DEBUG)) {
00436         oasys::StaticStringBuffer<1024> buf;
00437         buf.appendf("BUNDLE_RECEIVED%s: (%u bytes recvd)\n",
00438                     source_str, event->bytes_received_);
00439         bundle->format_verbose(&buf);
00440         log_multiline(oasys::LOG_DEBUG, buf.c_str());
00441     } else {
00442         log_info("BUNDLE_RECEIVED%s *%p (%u bytes recvd)",
00443                  source_str, bundle, event->bytes_received_);
00444     }
00445     
00446     // log a warning if the bundle doesn't have any expiration time or
00447     // has a creation time that's in the future. in either case, we
00448     // proceed as normal
00449     if (bundle->expiration_ == 0) {
00450         log_warn("bundle id %d arrived with zero expiration time",
00451                  bundle->bundleid_);
00452     }
00453 
00454     u_int32_t now = BundleTimestamp::get_current_time();
00455     if ((bundle->creation_ts_.seconds_ > now) &&
00456         (bundle->creation_ts_.seconds_ - now > 30000))
00457     {
00458         log_warn("bundle id %d arrived with creation time in the future "
00459                  "(%u > %u)",
00460                  bundle->bundleid_, bundle->creation_ts_.seconds_, now);
00461     }
00462 
00463     /*
00464      * Check if the bundle isn't complete. If so, do reactive
00465      * fragmentation.
00466      */
00467     if (event->source_ == EVENTSRC_PEER) {
00468         ASSERT(event->bytes_received_ != 0);
00469         size_t payload_offset = BundleProtocol::payload_offset(&bundle->recv_blocks_);
00470         fragmentmgr_->try_to_convert_to_fragment(bundle, payload_offset,
00471                                                  event->bytes_received_);
00472     }
00473 
00474     // validate a bundle, including all bundle blocks, received from a peer
00475     if (event->source_ == EVENTSRC_PEER) { 
00476         status_report_reason_t
00477             reception_reason = BundleProtocol::REASON_NO_ADDTL_INFO,
00478             deletion_reason = BundleProtocol::REASON_NO_ADDTL_INFO;
00479 
00480         bool accept_bundle = is_simulator_ ? true :
00481                              BundleProtocol::validate(bundle,
00482                                                       &reception_reason,
00483                                                       &deletion_reason);
00484         /*
00485          * Send the reception receipt if requested within the primary block
00486          * or a block validation error was encountered.
00487          */
00488         if (bundle->receive_rcpt_ ||
00489             reception_reason != BundleProtocol::REASON_NO_ADDTL_INFO) {
00490             generate_status_report(bundle, BundleProtocol::STATUS_RECEIVED,
00491                                    reception_reason);
00492         }
00493 
00494         /*
00495          * Delete a bundle if a validation error was encountered.
00496          */
00497         if (!accept_bundle) {
00498             delete_bundle(bundle, deletion_reason);
00499             event->daemon_only_ = true;
00500             return;
00501         }
00502     }
00503     
00504     /*
00505      * Check if the bundle is a duplicate, i.e. shares a source id,
00506      * timestamp, and fragmentation information with some other bundle
00507      * in the system.
00508      */
00509     Bundle* duplicate = find_duplicate(bundle);
00510     if (duplicate != NULL) {
00511         log_notice("got duplicate bundle: %s -> %s creation %u.%u",
00512                    bundle->source_.c_str(),
00513                    bundle->dest_.c_str(),
00514                    bundle->creation_ts_.seconds_,
00515                    bundle->creation_ts_.seqno_);
00516 
00517         stats_.duplicate_bundles_++;
00518         
00519         if (bundle->custody_requested_ && duplicate->local_custody_)
00520         {
00521             generate_custody_signal(bundle, false,
00522                                     BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
00523         }
00524 
00525         // since we don't want the bundle to be processed by the rest
00526         // of the system, we mark the event as daemon_only (meaning it
00527         // won't be forwarded to routers) and return, which should
00528         // eventually remove all references on the bundle and then it
00529         // will be deleted
00530         event->daemon_only_ = true;
00531         return;
00532     }
00533 
00534     /*
00535      * Check all BlockProcessors to validate the bundle.
00536      */
00537     
00538     /*
00539      * Add the bundle to the master pending queue and the data store
00540      * (unless the bundle was just reread from the data store on startup)
00541      *
00542      * Note that if add_to_pending returns false, the bundle has
00543      * already expired so we immediately return instead of trying to
00544      * deliver and/or forward the bundle. Otherwise there's a chance
00545      * that expired bundles will persist in the network.
00546      */
00547     bool ok_to_route =
00548         add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
00549 
00550     if (!ok_to_route) {
00551         event->daemon_only_ = true;
00552         return;
00553     }
00554     
00555     /*
00556      * If the bundle is a custody bundle and we're configured to take
00557      * custody, then do so. In case the event was delivered due to a
00558      * reload from the data store, then if we have local custody, make
00559      * sure it's added to the custody bundles list.
00560      */
00561     if (bundle->custody_requested_ && params_.accept_custody_)
00562     {
00563         if (event->source_ != EVENTSRC_STORE) {
00564             accept_custody(bundle);
00565         
00566         } else if (bundle->local_custody_) {
00567             custody_bundles_->push_back(bundle);
00568         }
00569     }
00570     
00571     /*
00572      * Deliver the bundle to any local registrations that it matches
00573      */
00574     check_registrations(bundle);
00575     
00576     /*
00577      * Finally, bounce out so the router(s) can do something further
00578      * with the bundle in response to the event.
00579      */
00580 }
00581 
00582 //----------------------------------------------------------------------
00583 void
00584 BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
00585 {
00586     Bundle* bundle = event->bundleref_.object();
00587 
00588     Link* link = event->link_;
00589     
00590     /*
00591      * Update statistics. Note that the link's queued length must
00592      * always be decremented by the full formatted size of the bundle,
00593      * yet the transmitted length is only the amount reported by the
00594      * event.
00595      */
00596     BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(link);
00597     ASSERT(blocks != NULL);
00598         
00599     size_t total_len = BundleProtocol::total_length(blocks);
00600     
00601     stats_.bundles_transmitted_++;
00602     link->stats()->bundles_transmitted_++;
00603     link->stats()->bundles_queued_--;
00604 
00605     link->stats()->bytes_transmitted_ += event->bytes_sent_;
00606     link->stats()->bytes_queued_ -= total_len;
00607     
00608     log_info("BUNDLE_TRANSMITTED id:%d (%u bytes_sent/%u reliable) -> %s (%s)",
00609              bundle->bundleid_,
00610              event->bytes_sent_,
00611              event->reliably_sent_,
00612              link->name(),
00613              link->nexthop());
00614 
00615     /*
00616      * If we're configured to wait for reliable transmission, then
00617      * check the special case where we transmitted some or all a
00618      * bundle but nothing was acked. In this case, we create a
00619      * transmission failed event in the forwarding log and don't do
00620      * any of the rest of the processing below.
00621      *
00622      * Note also the special care taken to handle a zero-length
00623      * bundle. XXX/demmer this should all go away when the lengths
00624      * include both the header length and the payload length (in which
00625      * case it's never zero).
00626      *
00627      * XXX/demmer a better thing to do (maybe) would be to record the
00628      * lengths in the forwarding log as part of the transmitted entry.
00629      */
00630     if (params_.retry_reliable_unacked_ &&
00631         link->is_reliable() &&
00632         (event->bytes_sent_ != event->reliably_sent_) &&
00633         (event->reliably_sent_ == 0))
00634     {
00635         bundle->fwdlog_.update(link, ForwardingInfo::TRANSMIT_FAILED);
00636         bundle->xmit_blocks_.delete_blocks(link);
00637         return;
00638     }
00639 
00640     /*
00641      * Update the forwarding log
00642      */
00643     bundle->fwdlog_.update(link, ForwardingInfo::TRANSMITTED);
00644                             
00645     /*
00646      * Grab the updated forwarding log information so we can find the
00647      * custody timer information (if any).
00648      */
00649     ForwardingInfo fwdinfo;
00650     bool ok = bundle->fwdlog_.get_latest_entry(link, &fwdinfo);
00651     ASSERTF(ok, "no forwarding log entry for transmission");
00652     ASSERT(fwdinfo.state_ == ForwardingInfo::TRANSMITTED);
00653     
00654     /*
00655      * Check for reactive fragmentation. If the bundle was only
00656      * partially sent, then a new bundle received event for the tail
00657      * part of the bundle will be processed immediately after this
00658      * event.
00659      */
00660     if (link->reliable_) {
00661         fragmentmgr_->try_to_reactively_fragment(bundle,
00662                                                  BundleProtocol::payload_offset(blocks),
00663                                                  event->reliably_sent_);
00664     } else {
00665         fragmentmgr_->try_to_reactively_fragment(bundle,
00666                                                  BundleProtocol::payload_offset(blocks),
00667                                                  event->bytes_sent_);
00668     }
00669 
00670     /*
00671      * Remove the formatted block info from the bundle since we don't
00672      * need it any more.
00673      */
00674     bundle->xmit_blocks_.delete_blocks(link);
00675     blocks = NULL;
00676 
00677     /*
00678      * Generate the forwarding status report if requested
00679      */
00680     if (bundle->forward_rcpt_) {
00681         generate_status_report(bundle, BundleProtocol::STATUS_FORWARDED);
00682     }
00683 
00684     /*
00685      * Schedule a custody timer if we have custody.
00686      */
00687     if (bundle->local_custody_) {
00688         bundle->custody_timers_.push_back(
00689             new CustodyTimer(fwdinfo.timestamp_,
00690                              fwdinfo.custody_timer_,
00691                              bundle, link));
00692         
00693         // XXX/TODO: generate failed custodial signal for "forwarded
00694         // over unidirectional link" if the bundle has the retention
00695         // constraint "custody accepted" and all of the nodes in the
00696         // minimum reception group of the endpoint selected for
00697         // forwarding are known to be unable to send bundles back to
00698         // this node
00699     }
00700 
00701     /*
00702      * Check if we should can delete the bundle from the pending list,
00703      * i.e. we don't have custody and it's not being transmitted
00704      * anywhere else.
00705      */
00706     try_delete_from_pending(bundle);
00707 }
00708 
00709 //----------------------------------------------------------------------
00710 void
00711 BundleDaemon::handle_bundle_transmit_failed(BundleTransmitFailedEvent* event)
00712 {
00713     /*
00714      * The bundle was delivered to a next-hop contact.
00715      */
00716     Bundle* bundle = event->bundleref_.object();
00717 
00718     Link* link = event->link_;
00719     bundle->xmit_blocks_.delete_blocks(link);
00720     
00721     log_info("BUNDLE_TRANSMIT_FAILED id:%d -> %s (%s)",
00722              bundle->bundleid_,
00723              event->contact_->link()->name(),
00724              event->contact_->link()->nexthop());
00725     
00726     /*
00727      * Update the forwarding log so routers know to try to retransmit
00728      * on the next contact.
00729      */
00730     bundle->fwdlog_.update(event->contact_->link(),
00731                            ForwardingInfo::TRANSMIT_FAILED);
00732 
00733     /*
00734      * Fall through to notify the routers
00735      */
00736 }
00737 
00738 //----------------------------------------------------------------------
00739 void
00740 BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
00741 {
00742     // update statistics
00743     stats_.bundles_delivered_++;
00744     
00745     /*
00746      * The bundle was delivered to a registration.
00747      */
00748     Bundle* bundle = event->bundleref_.object();
00749 
00750     log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
00751              bundle->bundleid_, bundle->payload_.length(),
00752              event->registration_->regid(),
00753              event->registration_->endpoint().c_str());
00754 
00755     /*
00756      * Generate the delivery status report if requested.
00757      */
00758     if (bundle->delivery_rcpt_)
00759     {
00760         generate_status_report(bundle, BundleProtocol::STATUS_DELIVERED);
00761     }
00762 
00763     /*
00764      * If this is a custodial bundle and it was delivered, we either
00765      * release custody (if we have it), or send a custody signal to
00766      * the current custodian indicating that the bundle was
00767      * successfully delivered, unless there is no current custodian
00768      * (the eid is still dtn:none).
00769      */
00770     if (bundle->custody_requested_)
00771     {
00772         if (bundle->local_custody_) {
00773             release_custody(bundle);
00774 
00775         } else if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00776             log_info("custodial bundle *%p delivered before custody accepted",
00777                      bundle);
00778 
00779         } else {
00780             generate_custody_signal(bundle, true,
00781                                     BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00782         }
00783     }
00784 
00785     /*
00786      * Finally, check if we can and should delete the bundle from the
00787      * pending list, i.e. we don't have custody and it's not being
00788      * transmitted anywhere else.
00789      */
00790     try_delete_from_pending(bundle);
00791 }
00792 
00793 //----------------------------------------------------------------------
00794 void
00795 BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
00796 {
00797     // update statistics
00798     stats_.bundles_expired_++;
00799     
00800     Bundle* bundle = event->bundleref_.object();
00801 
00802     log_info("BUNDLE_EXPIRED *%p", bundle);
00803 
00804     // note that there may or may not still be a pending expiration
00805     // timer, since this event may be coming from the console, in
00806     // which case we just fall through to delete_bundle which will
00807     // cancel the timer
00808 
00809     delete_bundle(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
00810     
00811     // fall through to notify the routers
00812 }
00813 
00814 //----------------------------------------------------------------------
00815 void
00816 BundleDaemon::handle_bundle_send(BundleSendRequest* event)
00817 {
00818     Link *link = contactmgr_->find_link(event->link_.c_str());
00819 
00820     if(!link) return;
00821 
00822     BundleRef br = pending_bundles_->find(event->bundleid_);
00823 
00824     if(!br.object()) {
00825         br = custody_bundles_->find(event->bundleid_);
00826 
00827         if(!br.object()) {
00828             return;
00829         }
00830     }
00831 
00832     ForwardingInfo::action_t fwd_action =
00833         ForwardingInfo::action_t(event->action_);
00834 
00835     if(fwd_action == ForwardingInfo::INVALID_ACTION) {
00836         return;
00837     }
00838 
00839     bool result = actions_->send_bundle(br.object(), link,
00840         fwd_action, CustodyTimerSpec::defaults_);
00841 
00842     if(result == false) {
00843         // log something
00844         // BundleTransmitFailed?
00845     }
00846 }
00847 
00848 //----------------------------------------------------------------------
00849 void
00850 BundleDaemon::handle_bundle_cancel(BundleCancelRequest* event)
00851 {
00852     Link *link = contactmgr_->find_link(event->link_.c_str());
00853 
00854     if(!link) {
00855         return;
00856     }
00857 
00858     BundleRef br = pending_bundles_->find(event->bundleid_);
00859 
00860     if(!br.object()) {
00861         br = custody_bundles_->find(event->bundleid_);
00862 
00863         if(!br.object()) {
00864             return;
00865         }
00866     }
00867 
00868     bool result = actions_->cancel_bundle(br.object(), link);
00869 
00870     if(result == false) {
00871         // log something
00872         // BundleTransmitFailed?
00873     }
00874 }
00875 
00876 //----------------------------------------------------------------------
00877 void
00878 BundleDaemon::handle_bundle_inject(BundleInjectRequest* event)
00879 {
00880     // the new bundle is *not* placed on the pending queue or
00881     // in durable storage (no call to BundleActions::inject_bundle)
00882 
00883     Link *link = contactmgr_->find_link(event->link_.c_str());
00884 
00885     if(!link) {
00886         return;
00887     }
00888 
00889     // make a bundle
00890     Bundle *bundle=new Bundle();
00891     if(bundle->source_.assign(event->src_) &&
00892         bundle->dest_.assign(event->dest_)) {
00893 
00894         if(! bundle->replyto_.assign(event->replyto_))
00895             bundle->replyto_.assign(EndpointID::NULL_EID());
00896 
00897         if(! bundle->custodian_.assign(event->custodian_))
00898             bundle->custodian_.assign(EndpointID::NULL_EID()); 
00899 
00900         // bundle COS defaults to COS_BULK
00901         bundle->priority_ = event->priority_;
00902 
00903         // bundle expiration (on remote dtn nodes)
00904         // defaults to 5 minutes
00905         if(event->expiration_ == 0)
00906             bundle->expiration_ = 300;
00907         else
00908             bundle->expiration_ = event->expiration_;
00909 
00910         // set the payload
00911         const u_char *payload = (const u_char*)event->payload_.c_str();
00912         bundle->payload_.set_data(payload,sizeof(payload));
00913 
00914         // send attempt
00915         bool success = false;
00916         success = actions_->send_bundle(bundle, link,
00917             ForwardingInfo::action_t(event->action_),
00918             CustodyTimerSpec::defaults_);
00919         if(!success)
00920             delete bundle;
00921 
00922     } else
00923         delete bundle;
00924 }
00925 
00926 //----------------------------------------------------------------------
00927 void
00928 BundleDaemon::handle_bundle_query(BundleQueryRequest*)
00929 {
00930     BundleDaemon::post_at_head(new BundleReportEvent());
00931 }
00932 
00933 //----------------------------------------------------------------------
00934 void
00935 BundleDaemon::handle_bundle_report(BundleReportEvent*)
00936 {
00937 }
00938 
00939 //----------------------------------------------------------------------
00940 void
00941 BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
00942 {
00943     Registration* registration = event->registration_;
00944     log_info("REGISTRATION_ADDED %d %s",
00945              registration->regid(), registration->endpoint().c_str());
00946 
00947     if (!reg_table_->add(registration,
00948                          (event->source_ == EVENTSRC_APP) ? true : false))
00949     {
00950         log_err("error adding registration %d to table",
00951                 registration->regid());
00952     }
00953     
00954     oasys::ScopeLock l(pending_bundles_->lock(), 
00955                        "BundleDaemon::handle_registration_added");
00956     BundleList::iterator iter;
00957     for (iter = pending_bundles_->begin();
00958          iter != pending_bundles_->end();
00959          ++iter)
00960     {
00961         Bundle* bundle = *iter;
00962 
00963         if (registration->endpoint().match(bundle->dest_))
00964         {
00965             deliver_to_registration(bundle, registration);
00966         }
00967     }
00968 }
00969 
00970 //----------------------------------------------------------------------
00971 void
00972 BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
00973 {
00974     Registration* registration = event->registration_;
00975     log_info("REGISTRATION_REMOVED %d %s",
00976              registration->regid(), registration->endpoint().c_str());
00977     
00978     if (!reg_table_->del(registration->regid())) {
00979         log_err("error removing registration %d from table",
00980                 registration->regid());
00981         return;
00982     }
00983 
00984     delete registration;
00985 }
00986 
00987 //----------------------------------------------------------------------
00988 void
00989 BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
00990 {
00991     Registration* registration = reg_table_->get(event->regid_);
00992 
00993     if (registration == NULL) {
00994         log_err("REGISTRATION_EXPIRED -- dead regid %d", event->regid_);
00995         return;
00996     }
00997     
00998     registration->set_expired(true);
00999     
01000     if (registration->active()) {
01001         // if the registration is currently active (i.e. has a
01002         // binding), we wait for the binding to clear, which will then
01003         // clean up the registration
01004         log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
01005                  event->regid_);
01006     } else {
01007         // otherwise remove the registration from the table
01008         log_info("REGISTRATION_EXPIRED %d", event->regid_);
01009         reg_table_->del(registration->regid());
01010         delete registration;
01011     }
01012 }
01013 
01014 //----------------------------------------------------------------------
01015 void
01016 BundleDaemon::handle_link_available(LinkAvailableEvent* event)
01017 {
01018     Link* link = event->link_;
01019     ASSERT(link->isavailable());
01020 
01021     log_info("LINK_AVAILABLE *%p", link);
01022 }
01023 
01024 //----------------------------------------------------------------------
01025 void
01026 BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
01027 {
01028     Link* link = event->link_;
01029     ASSERT(!link->isavailable());
01030     
01031     log_info("LINK UNAVAILABLE *%p", link);
01032 }
01033 
01034 //----------------------------------------------------------------------
01035 void
01036 BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
01037 {
01038     Link* link = request->link_;
01039     if (! link) {
01040         log_warn("LINK_STATE_CHANGE_REQUEST received invalid link");
01041         return;
01042     }
01043 
01044     Link::state_t new_state = Link::state_t(request->state_);
01045     Link::state_t old_state = Link::state_t(request->old_state_);
01046     int reason = request->reason_;
01047     
01048     if (link->contact() != request->contact_) {
01049         log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p: "
01050                  "contact %p != current contact %p", 
01051                  Link::state_to_str(old_state), Link::state_to_str(new_state),
01052                  ContactEvent::reason_to_str(reason), link,
01053                  request->contact_.object(), link->contact().object());
01054         return;
01055     }
01056 
01057     log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
01058              Link::state_to_str(old_state), Link::state_to_str(new_state),
01059              ContactEvent::reason_to_str(reason), link);
01060 
01061     //avoid a race condition caused by opening a partially closed link
01062     oasys::ScopeLock l;
01063     if (new_state == Link::OPEN)
01064     {
01065         l.set_lock(contactmgr_->lock(), "BundleDaemon::handle_link_state_change_request");
01066     }
01067     
01068     switch(new_state) {
01069     case Link::UNAVAILABLE:
01070         if (link->state() != Link::AVAILABLE) {
01071             log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01072                     "tried to set state UNAVAILABLE in state %s",
01073                     link, Link::state_to_str(link->state()));
01074             return;
01075         }
01076         link->set_state(new_state);
01077         post_at_head(new LinkUnavailableEvent(link,
01078                      ContactEvent::reason_t(reason)));
01079         break;
01080 
01081     case Link::AVAILABLE:
01082         if (link->state() == Link::UNAVAILABLE) {
01083             link->set_state(Link::AVAILABLE);
01084             
01085         } else if (link->state() == Link::BUSY &&
01086                    reason        == ContactEvent::UNBLOCKED) {
01087             ASSERT(link->contact() != NULL);
01088             link->set_state(Link::OPEN);
01089             
01090         } else if (link->state() == Link::OPEN &&
01091                    reason        == ContactEvent::UNBLOCKED) {
01092             // a CL might send multiple requests to go from
01093             // BUSY->AVAILABLE, so we can safely ignore this
01094             
01095         } else {
01096             log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01097                     "tried to set state AVAILABLE in state %s",
01098                     link, Link::state_to_str(link->state()));
01099             return;
01100         }
01101 
01102         post_at_head(new LinkAvailableEvent(link,
01103                      ContactEvent::reason_t(reason)));
01104         break;
01105         
01106     case Link::BUSY:
01107         log_err("LINK_STATE_CHANGE_REQUEST can't be used for state %s",
01108                 Link::state_to_str(new_state));
01109         break;
01110         
01111     case Link::OPENING:
01112     case Link::OPEN:
01113         // force the link to be available, since someone really wants it open
01114         if (link->state() == Link::UNAVAILABLE) {
01115             link->set_state(Link::AVAILABLE);
01116         }
01117         actions_->open_link(link);
01118         break;
01119 
01120     case Link::CLOSED:
01121         // The only case where we should get this event when the link
01122         // is not actually open is if it's in the process of being
01123         // opened but the CL can't actually open it.
01124         if (! link->isopen() && ! link->isopening()) {
01125             log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01126                     "setting state CLOSED (%s) in unexpected state %s",
01127                     link, ContactEvent::reason_to_str(reason),
01128                     link->state_to_str(link->state()));
01129             break;
01130         }
01131 
01132         // If the link is open (not OPENING), we need a ContactDownEvent
01133         if (link->isopen()) {
01134             ASSERT(link->contact() != NULL);
01135             post_at_head(new ContactDownEvent(link->contact(),
01136                          ContactEvent::reason_t(reason)));
01137         }
01138 
01139         // close the link
01140         actions_->close_link(link);
01141         
01142         // now, based on the reason code, update the link availability
01143         // and set state accordingly
01144         if (reason == ContactEvent::IDLE) {
01145             link->set_state(Link::AVAILABLE);
01146         } else {
01147             link->set_state(Link::UNAVAILABLE);
01148             post_at_head(new LinkUnavailableEvent(link,
01149                          ContactEvent::reason_t(reason)));
01150         }
01151     
01152         break;
01153 
01154     default:
01155         PANIC("unhandled state %s", Link::state_to_str(new_state));
01156     }
01157 }
01158 
01159 //----------------------------------------------------------------------
01160 void
01161 BundleDaemon::handle_link_create(LinkCreateRequest*)
01162 {
01163     NOTIMPLEMENTED;
01164 }
01165 
01166 //----------------------------------------------------------------------
01167 void
01168 BundleDaemon::handle_link_query(LinkQueryRequest*)
01169 {
01170     BundleDaemon::post_at_head(new LinkReportEvent());
01171 }
01172 
01173 //----------------------------------------------------------------------
01174 void
01175 BundleDaemon::handle_link_report(LinkReportEvent*)
01176 {
01177 }
01178   
01179 //----------------------------------------------------------------------
01180 void
01181 BundleDaemon::handle_contact_up(ContactUpEvent* event)
01182 {
01183     const ContactRef& contact = event->contact_;
01184     Link* link = contact->link();
01185     
01186     //ignore stale notifications that an old contact is up
01187     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_contact_up");
01188     if(link->contact() != contact)
01189     {
01190         log_info("CONTACT_UP *%p (contact %p) being ignored (old contact)", contact->link(), contact.object());
01191         return;
01192     }
01193     
01194     log_info("CONTACT_UP *%p (contact %p)", contact->link(), contact.object());
01195     link->set_state(Link::OPEN);
01196     link->stats_.contacts_++;
01197 }
01198 
01199 
01200 
01201 //----------------------------------------------------------------------
01202 void
01203 BundleDaemon::handle_contact_down(ContactDownEvent* event)
01204 {
01205     const ContactRef& contact = event->contact_;
01206     Link* link = contact->link();
01207     int reason = event->reason_;
01208     
01209     log_info("CONTACT_DOWN *%p (%s) (contact %p)",
01210              link, ContactEvent::reason_to_str(reason), contact.object());
01211 
01212     // we don't need to do anything here since we just generated this
01213     // event in response to a link state change request
01214 }
01215 
01216 //----------------------------------------------------------------------
01217 void
01218 BundleDaemon::handle_contact_query(ContactQueryRequest*)
01219 {
01220     BundleDaemon::post_at_head(new ContactReportEvent());
01221 }
01222 
01223 //----------------------------------------------------------------------
01224 void
01225 BundleDaemon::handle_contact_report(ContactReportEvent*)
01226 {
01227 }
01228 
01229 //----------------------------------------------------------------------
01230 void
01231 BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
01232 {
01233     log_info("REASSEMBLY_COMPLETED bundle id %d",
01234              event->bundle_->bundleid_);
01235 
01236     // remove all the fragments from the pending list
01237     BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
01238     while ((ref = event->fragments_.pop_front()) != NULL) {
01239         try_delete_from_pending(ref.object());
01240     }
01241 
01242     // post a new event for the newly reassembled bundle
01243     post_at_head(new BundleReceivedEvent(event->bundle_.object(),
01244                                          EVENTSRC_FRAGMENTATION));
01245 }
01246 
01247 
01248 //----------------------------------------------------------------------
01249 void
01250 BundleDaemon::handle_route_add(RouteAddEvent* event)
01251 {
01252     log_info("ROUTE_ADD *%p", event->entry_);
01253 }
01254 
01255 //----------------------------------------------------------------------
01256 void
01257 BundleDaemon::handle_route_del(RouteDelEvent* event)
01258 {
01259     log_info("ROUTE_DEL %s", event->dest_.c_str());
01260 }
01261 
01262 //----------------------------------------------------------------------
01263 void
01264 BundleDaemon::handle_route_query(RouteQueryRequest*)
01265 {
01266     BundleDaemon::post_at_head(new RouteReportEvent());
01267 }
01268 
01269 //----------------------------------------------------------------------
01270 void
01271 BundleDaemon::handle_route_report(RouteReportEvent*)
01272 {
01273 }
01274 
01275 //----------------------------------------------------------------------
01276 void
01277 BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
01278 {
01279     log_info("CUSTODY_SIGNAL: %s %u.%u %s (%s)",
01280              event->data_.orig_source_eid_.c_str(),
01281              event->data_.orig_creation_tv_.seconds_,
01282              event->data_.orig_creation_tv_.seqno_,
01283              event->data_.succeeded_ ? "succeeded" : "failed",
01284              CustodySignal::reason_to_str(event->data_.reason_));
01285 
01286     BundleRef orig_bundle =
01287         custody_bundles_->find(event->data_.orig_source_eid_,
01288                                event->data_.orig_creation_tv_);
01289     
01290     if (orig_bundle == NULL) {
01291         log_warn("received custody signal for bundle %s %u.%u "
01292                  "but don't have custody",
01293                  event->data_.orig_source_eid_.c_str(),
01294                  event->data_.orig_creation_tv_.seconds_,
01295                  event->data_.orig_creation_tv_.seqno_);
01296         return;
01297     }
01298 
01299     // release custody if either the signal succeded or if it
01300     // (paradoxically) failed due to duplicate transmission
01301     bool release = event->data_.succeeded_;
01302     if ((event->data_.succeeded_ == false) &&
01303         (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
01304     {
01305         log_notice("releasing custody for bundle %s %u.%u "
01306                    "due to redundant reception",
01307                    event->data_.orig_source_eid_.c_str(),
01308                    event->data_.orig_creation_tv_.seconds_,
01309                    event->data_.orig_creation_tv_.seqno_);
01310         
01311         release = true;
01312     }
01313     
01314     if (release) {
01315         release_custody(orig_bundle.object());
01316         try_delete_from_pending(orig_bundle.object());
01317     }
01318 }
01319 
01320 //----------------------------------------------------------------------
01321 void
01322 BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
01323 {
01324     Bundle* bundle = event->bundle_.object();
01325     Link*   link   = event->link_;
01326     
01327     log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link);
01328     
01329     // remove and delete the expired timer from the bundle
01330     oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::handle_custody_timeout");
01331 
01332     bool found = false;
01333     CustodyTimer* timer = NULL;
01334     CustodyTimerVec::iterator iter;
01335     for (iter = bundle->custody_timers_.begin();
01336          iter != bundle->custody_timers_.end();
01337          ++iter)
01338     {
01339         timer = *iter;
01340         if (timer->link_ == link)
01341         {
01342             if (timer->pending()) {
01343                 log_err("multiple pending custody timers for link %s",
01344                         link->nexthop());
01345                 continue;
01346             }
01347             
01348             found = true;
01349             bundle->custody_timers_.erase(iter);
01350             break;
01351         }
01352     }
01353 
01354     if (!found) {
01355         log_err("custody timeout for *%p *%p: timer not found in bundle list",
01356                 bundle, link);
01357         return;
01358     }
01359 
01360     ASSERT(!timer->cancelled());
01361     
01362     if (!pending_bundles_->contains(bundle)) {
01363         log_err("custody timeout for *%p *%p: bundle not in pending list",
01364                 bundle, link);
01365     }
01366 
01367     // modify the TRANSMITTED entry in the forwarding log to indicate
01368     // that we got a custody timeout. then when the routers go through
01369     // to figure out whether the bundle needs to be re-sent, the
01370     // TRANSMITTED entry is no longer in there
01371     bool ok = bundle->fwdlog_.update(link, ForwardingInfo::CUSTODY_TIMEOUT);
01372     if (!ok) {
01373         log_err("custody timeout can't find ForwardingLog entry for link *%p",
01374                 link);
01375     }
01376     
01377     delete timer;
01378 
01379     // now fall through to let the router handle the event, typically
01380     // triggering a retransmission to the link in the event
01381 }
01382 
01383 //----------------------------------------------------------------------
01384 void
01385 BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
01386 {
01387     shutting_down_ = true;
01388 
01389     (void)request;
01390 
01391     log_notice("Received shutdown request");
01392 
01393     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
01394     
01395     const LinkSet* links = contactmgr_->links();
01396     LinkSet::const_iterator iter;
01397     Link* link = NULL;
01398 
01399     // close any open links
01400     for (iter = links->begin(); iter != links->end(); ++iter)
01401     {
01402         link = *iter;
01403         if (link->isopen()) {
01404             log_debug("Shutdown: closing link *%p\n", link);
01405             link->close();
01406         }
01407     }
01408 
01409     // call the rtr shutdown procedure
01410     if (rtr_shutdown_proc_) {
01411         (*rtr_shutdown_proc_)(rtr_shutdown_data_);
01412     }
01413 
01414     // call the app shutdown procedure
01415     if (app_shutdown_proc_) {
01416         (*app_shutdown_proc_)(app_shutdown_data_);
01417     }
01418 
01419     // signal to the main loop to bail
01420     set_should_stop();
01421 
01422     // fall through -- the DTNServer will close and flush all the data
01423     // stores
01424 }
01425 
01426 //----------------------------------------------------------------------
01427 void
01428 BundleDaemon::handle_status_request(StatusRequest* request)
01429 {
01430     (void)request;
01431     log_info("Received status request");
01432 }
01433 
01434 //----------------------------------------------------------------------
01435 bool
01436 BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
01437 {
01438     log_debug("adding bundle *%p to pending list", bundle);
01439     
01440     pending_bundles_->push_back(bundle);
01441     
01442     if (add_to_store) {
01443         bundle->in_datastore_ = true;
01444         actions_->store_add(bundle);
01445     }
01446 
01447     struct timeval now;
01448     gettimeofday(&now, 0);
01449     
01450     // schedule the bundle expiration timer
01451     struct timeval expiration_time;
01452     expiration_time.tv_sec =
01453         BundleTimestamp::TIMEVAL_CONVERSION +
01454         bundle->creation_ts_.seconds_ + 
01455         bundle->expiration_;
01456     
01457     expiration_time.tv_usec = now.tv_usec;
01458     
01459     long int when = expiration_time.tv_sec - now.tv_sec;
01460 
01461     bool ok_to_route = true;
01462     
01463     if (when > 0) {
01464         log_debug("scheduling expiration for bundle id %d at %u.%u "
01465                   "(in %lu seconds)",
01466                   bundle->bundleid_,
01467                   (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
01468                   when);
01469     } else {
01470         log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
01471                  "[expiration %u, creation time %u.%u, offset %u, now %u.%u]",
01472                  bundle->bundleid_, bundle->expiration_,
01473                  (u_int)bundle->creation_ts_.seconds_,
01474                  (u_int)bundle->creation_ts_.seqno_,
01475                  BundleTimestamp::TIMEVAL_CONVERSION,
01476                  (u_int)now.tv_sec, (u_int)now.tv_usec);
01477         expiration_time = now;
01478         ok_to_route = false;
01479     }
01480 
01481     bundle->expiration_timer_ = new ExpirationTimer(bundle);
01482     bundle->expiration_timer_->schedule_at(&expiration_time);
01483 
01484     return ok_to_route;
01485 }
01486 
01487 //----------------------------------------------------------------------
01488 bool
01489 BundleDaemon::delete_from_pending(Bundle* bundle)
01490 {
01491     log_debug("removing bundle *%p from pending list", bundle);
01492 
01493     // first try to cancel the expiration timer if it's still
01494     // around
01495     if (bundle->expiration_timer_) {
01496         log_debug("cancelling expiration timer for bundle id %d",
01497                   bundle->bundleid_);
01498         
01499         bool cancelled = bundle->expiration_timer_->cancel();
01500         if (!cancelled) {
01501            log_crit("unexpected error cancelling expiration timer "
01502                      "for bundle *%p", bundle);
01503         }
01504         
01505         bundle->expiration_timer_->bundleref_.release();
01506         bundle->expiration_timer_ = NULL;
01507     }
01508 
01509     bool erased = pending_bundles_->erase(bundle);
01510 
01511     if (!erased) {
01512         log_err("unexpected error removing bundle from pending list");
01513     }
01514 
01515     return erased;
01516 }
01517 
01518 //----------------------------------------------------------------------
01519 bool
01520 BundleDaemon::try_delete_from_pending(Bundle* bundle)
01521 {
01522     /*
01523      * Check to see if we should remove the bundle from the pending
01524      * list, after which all references to the bundle should be
01525      * cleaned up and the bundle will be deleted from the system.
01526      *
01527      * We do this only if:
01528      *
01529      * 1) We're configured for early deletion
01530      * 2) The bundle isn't queued on any lists other than the pending
01531      *    list. This covers the case where we have custody, since the
01532      *    bundle will be on the custody_bundles list
01533      * 3) The bundle isn't currently in flight, as recorded
01534      *    in the forwarding log.
01535      *
01536      * This allows a router (or the custody system) to maintain a
01537      * retention constraint by putting the bundle on a list, and
01538      * thereby adding a mapping.
01539      */
01540 
01541     if (! bundle->is_queued_on(pending_bundles_))
01542     {
01543         if (bundle->expiration_timer_ == NULL) {
01544             log_debug("try_delete_from_pending(*%p): bundle already expired",
01545                       bundle);
01546             return false;
01547         }
01548         
01549         log_err("try_delete_from_pending(*%p): bundle not in pending list!",
01550                 bundle);
01551         return false;
01552     }
01553 
01554     if (!params_.early_deletion_) {
01555         log_debug("try_delete_from_pending(*%p): not deleting because "
01556                   "early deletion disabled",
01557                   bundle);
01558         return false;
01559     }
01560 
01561     size_t num_mappings = bundle->num_mappings();
01562     if (num_mappings != 1) {
01563         log_debug("try_delete_from_pending(*%p): not deleting because "
01564                   "bundle has %zu mappings",
01565                   bundle, num_mappings);
01566         return false;
01567     }
01568     
01569     size_t num_in_flight = bundle->fwdlog_.get_count(ForwardingInfo::IN_FLIGHT);
01570     if (num_in_flight > 0) {
01571         log_debug("try_delete_from_pending(*%p): not deleting because "
01572                   "bundle in flight on %zu links",
01573                   bundle, num_in_flight);
01574         return false;
01575     }
01576 
01577     return delete_from_pending(bundle);
01578 }
01579 
01580 //----------------------------------------------------------------------
01581 bool
01582 BundleDaemon::delete_bundle(Bundle* bundle, status_report_reason_t reason)
01583 {
01584     // send a bundle deletion status report if we have custody or the
01585     // bundle's deletion status report request flag is set and a reason
01586     // for deletion is provided
01587     bool send_status = (bundle->local_custody_ ||
01588                        (bundle->deletion_rcpt_ &&
01589                         reason != BundleProtocol::REASON_NO_ADDTL_INFO));
01590         
01591     // check if we have custody, if so, remove it
01592     if (bundle->local_custody_) {
01593         release_custody(bundle);
01594     }
01595 
01596     // check if bundle is a fragment, if so, remove any fragmentation state
01597     if (bundle->is_fragment_) {
01598         fragmentmgr_->delete_fragment(bundle);
01599     }
01600 
01601     // delete the bundle from the pending list
01602     bool erased = true;
01603     if (bundle->is_queued_on(pending_bundles_)) {
01604         erased = delete_from_pending(bundle);
01605     }
01606 
01607     if (erased && send_status) {
01608         generate_status_report(bundle, BundleProtocol::STATUS_DELETED, reason);
01609     }
01610 
01611     // XXX/demmer should try to cancel transmission on any links where
01612     // the bundle is active
01613 
01614     return erased;
01615 }
01616 
01617 //----------------------------------------------------------------------
01618 Bundle*
01619 BundleDaemon::find_duplicate(Bundle* b)
01620 {
01621     oasys::ScopeLock l(pending_bundles_->lock(), 
01622                        "BundleDaemon::find_duplicate");
01623     BundleList::iterator iter;
01624     for (iter = pending_bundles_->begin();
01625          iter != pending_bundles_->end();
01626          ++iter)
01627     {
01628         Bundle* b2 = *iter;
01629         
01630         if ((b->source_.equals(b2->source_)) &&
01631             (b->creation_ts_.seconds_ == b2->creation_ts_.seconds_) &&
01632             (b->creation_ts_.seqno_   == b2->creation_ts_.seqno_) &&
01633             (b->is_fragment_          == b2->is_fragment_) &&
01634             (b->frag_offset_          == b2->frag_offset_) &&
01635             (b->orig_length_          == b2->orig_length_) &&
01636             (b->payload_.length()     == b2->payload_.length()))
01637         {
01638             return b2;
01639         }
01640     }
01641 
01642     return NULL;
01643 }
01644 
01645 //----------------------------------------------------------------------
01646 void
01647 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
01648 {
01649     Bundle* bundle = event->bundle_;
01650     event->bundle_ = NULL;
01651     ASSERT(bundle->refcount() == 0);
01652 
01653     bundle->lock_.lock("BundleDaemon::handle_bundle_free");
01654 
01655     if (bundle->in_datastore_) {
01656         actions_->store_del(bundle);
01657     }
01658     
01659     delete bundle;
01660 }
01661 
01662 //----------------------------------------------------------------------
01663 void
01664 BundleDaemon::handle_event(BundleEvent* event)
01665 {
01666     dispatch_event(event);
01667     
01668     if (! event->daemon_only_) {
01669         // dispatch the event to the router(s) and the contact manager
01670         router_->handle_event(event);
01671         contactmgr_->handle_event(event);
01672     }
01673 
01674     stats_.events_processed_++;
01675 
01676     if (event->processed_notifier_) {
01677         event->processed_notifier_->notify();
01678     }
01679 }
01680 
01681 //----------------------------------------------------------------------
01682 void
01683 BundleDaemon::load_registrations()
01684 {
01685     admin_reg_ = new AdminRegistration();
01686     {
01687         RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
01688         handle_event(&e);
01689     }
01690 
01691     EndpointID ping_eid(local_eid());
01692     bool ok = ping_eid.append_service_tag("ping");
01693     if (!ok) {
01694         log_crit("local eid (%s) scheme must be able to append service tags",
01695                  local_eid().c_str());
01696         exit(1);
01697     }
01698     
01699     ping_reg_ = new PingRegistration(ping_eid);
01700     {
01701         RegistrationAddedEvent e(ping_reg_, EVENTSRC_ADMIN);
01702         handle_event(&e);
01703     }
01704 
01705     Registration* reg;
01706     RegistrationStore* reg_store = RegistrationStore::instance();
01707     RegistrationStore::iterator* iter = reg_store->new_iterator();
01708 
01709     while (iter->next() == 0) {
01710         reg = reg_store->get(iter->cur_val());
01711         if (reg == NULL) {
01712             log_err("error loading registration %d from data store",
01713                     iter->cur_val());
01714             continue;
01715         }
01716         
01717         RegistrationAddedEvent e(reg, EVENTSRC_STORE);
01718         handle_event(&e);
01719     }
01720 
01721     delete iter;
01722 }
01723 
01724 //----------------------------------------------------------------------
01725 void
01726 BundleDaemon::load_bundles()
01727 {
01728     Bundle* bundle;
01729     BundleStore* bundle_store = BundleStore::instance();
01730     BundleStore::iterator* iter = bundle_store->new_iterator();
01731 
01732     log_notice("loading bundles from data store");
01733 
01734     u_int64_t total_size = 0;
01735     
01736     for (iter->begin(); iter->more(); iter->next()) {
01737         bundle = bundle_store->get(iter->cur_val());
01738         
01739         if (bundle == NULL) {
01740             log_err("error loading bundle %d from data store",
01741                     iter->cur_val());
01742             continue;
01743         }
01744 
01745         total_size += bundle->durable_size();
01746         
01747         BundleReceivedEvent e(bundle, EVENTSRC_STORE);
01748         handle_event(&e);
01749 
01750         // in the constructor, we disabled notifiers on the event
01751         // queue, so in case loading triggers other events, we just
01752         // let them queue up and handle them later when we're done
01753         // loading all the bundles
01754     }
01755     
01756     bundle_store->set_total_size(total_size);
01757 
01758     delete iter;
01759 }
01760 
01761 //----------------------------------------------------------------------
01762 void
01763 BundleDaemon::run()
01764 {
01765     if (! BundleTimestamp::check_local_clock()) {
01766         exit(1);
01767     }
01768     
01769     router_ = BundleRouter::create_router(BundleRouter::config_.type_.c_str());
01770     router_->initialize();
01771     
01772     load_registrations();
01773     load_bundles();
01774 
01775     BundleEvent* event;
01776 
01777     oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
01778     
01779     struct pollfd pollfds[2];
01780     struct pollfd* event_poll = &pollfds[0];
01781     struct pollfd* timer_poll = &pollfds[1];
01782     
01783     event_poll->fd     = eventq_->read_fd();
01784     event_poll->events = POLLIN;
01785 
01786     timer_poll->fd     = timersys->notifier()->read_fd();
01787     timer_poll->events = POLLIN;
01788     
01789     while (1) {
01790         if (should_stop()) {
01791             break;
01792         }
01793 
01794         int timeout = timersys->run_expired_timers();
01795 
01796         if (eventq_->size() > 0) {
01797             bool ok = eventq_->try_pop(&event);
01798             ASSERT(ok);
01799             
01800             oasys::Time now;
01801             now.get_time();
01802         
01803             // handle the event
01804             handle_event(event);
01805 
01806             int elapsed = now.elapsed_ms();
01807             if (elapsed > 2000) {
01808                 log_warn("event %s took %d ms to process",
01809                          event->type_str(), elapsed);
01810             }
01811         
01812             // clean up the event
01813             delete event;
01814             
01815             continue; // no reason to poll
01816         }
01817         
01818         pollfds[0].revents = 0;
01819         pollfds[1].revents = 0;
01820 
01821         int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
01822         log_debug("poll returned %d", cc);
01823 
01824         if (cc == oasys::IOTIMEOUT) {
01825             log_debug("poll timeout");
01826             continue;
01827 
01828         } else if (cc <= 0) {
01829             log_err("unexpected return %d from poll_multiple!", cc);
01830             continue;
01831         }
01832 
01833         // if the event poll fired, we just go back to the top of the
01834         // loop to drain the queue
01835         if (event_poll->revents != 0) {
01836             log_debug("poll returned new event to handle");
01837         }
01838 
01839         // if the timer notifier fired, then someone just scheduled a
01840         // new timer, so we just continue, which will call
01841         // run_expired_timers and handle it
01842         if (timer_poll->revents != 0) {
01843             log_debug("poll returned new timers to handle");
01844             timersys->notifier()->clear();
01845         }
01846     }
01847 }
01848 
01849 } // namespace dtn

Generated on Thu Jun 7 16:56:48 2007 for DTN Reference Implementation by  doxygen 1.5.1