00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <oasys/util/Time.h>
00040
00041 #include "Bundle.h"
00042 #include "BundleActions.h"
00043 #include "BundleEvent.h"
00044 #include "BundleDaemon.h"
00045 #include "BundleStatusReport.h"
00046 #include "BundleTimestamp.h"
00047 #include "CustodySignal.h"
00048 #include "ExpirationTimer.h"
00049 #include "FragmentManager.h"
00050 #include "contacts/Contact.h"
00051 #include "contacts/ContactManager.h"
00052 #include "reg/AdminRegistration.h"
00053 #include "reg/APIRegistration.h"
00054 #include "reg/Registration.h"
00055 #include "reg/RegistrationTable.h"
00056 #include "routing/BundleRouter.h"
00057 #include "routing/RouteTable.h"
00058 #include "storage/BundleStore.h"
00059 #include "storage/RegistrationStore.h"
00060
00061 namespace dtn {
00062
00063 BundleDaemon* BundleDaemon::instance_ = NULL;
00064 BundleDaemon::Params BundleDaemon::params_;
00065
00066
00067 BundleDaemon::BundleDaemon()
00068 : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
00069 Thread("BundleDaemon", CREATE_JOINABLE)
00070 {
00071
00072
00073 local_eid_.assign("dtn://localhost.dtn");
00074
00075 memset(&stats_, 0, sizeof(stats_));
00076
00077 pending_bundles_ = new BundleList("pending_bundles");
00078 custody_bundles_ = new BundleList("custody_bundles");
00079
00080 contactmgr_ = new ContactManager();
00081 fragmentmgr_ = new FragmentManager();
00082 reg_table_ = new RegistrationTable();
00083
00084 router_ = 0;
00085
00086 app_shutdown_proc_ = NULL;
00087 app_shutdown_data_ = NULL;
00088 }
00089
00090
00091 void
00092 BundleDaemon::do_init()
00093 {
00094 actions_ = new BundleActions();
00095 eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
00096 eventq_->notify_when_empty();
00097 }
00098
00099
00100 void
00101 BundleDaemon::post(BundleEvent* event)
00102 {
00103 instance_->post_event(event);
00104 }
00105
00106
00107 void
00108 BundleDaemon::post_at_head(BundleEvent* event)
00109 {
00110 instance_->post_event(event, false);
00111 }
00112
00113
00114 bool
00115 BundleDaemon::post_and_wait(BundleEvent* event,
00116 oasys::Notifier* notifier,
00117 int timeout, bool at_back)
00118 {
00119
00120
00121
00122
00123 ASSERT(! oasys::Thread::start_barrier_enabled());
00124
00125 ASSERT(event->processed_notifier_ == NULL);
00126 event->processed_notifier_ = notifier;
00127 if (at_back) {
00128 post(event);
00129 } else {
00130 post_at_head(event);
00131 }
00132 return notifier->wait(NULL, timeout);
00133 }
00134
00135
00136 void
00137 BundleDaemon::post_event(BundleEvent* event, bool at_back)
00138 {
00139 log_debug("posting event (%p) with type %s (at %s)",
00140 event, event->type_str(), at_back ? "back" : "head");
00141 eventq_->push(event, at_back);
00142 }
00143
00144
00145 void
00146 BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
00147 {
00148 router_->get_routing_state(buf);
00149 contactmgr_->dump(buf);
00150 }
00151
00152
00153 void
00154 BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
00155 {
00156 buf->appendf("%zu pending -- "
00157 "%zu custody -- "
00158 "%u received -- "
00159 "%u delivered -- "
00160 "%u generated -- "
00161 "%u transmitted -- "
00162 "%u expired -- "
00163 "%u duplicate",
00164 pending_bundles()->size(),
00165 custody_bundles()->size(),
00166 stats_.bundles_received_,
00167 stats_.bundles_delivered_,
00168 stats_.bundles_generated_,
00169 stats_.bundles_transmitted_,
00170 stats_.bundles_expired_,
00171 stats_.duplicate_bundles_);
00172 }
00173
00174
00175 void
00176 BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
00177 {
00178 buf->appendf("%zu pending_events -- "
00179 "%u processed_events",
00180 eventq_->size(),
00181 stats_.events_processed_);
00182 }
00183
00184
00185 void
00186 BundleDaemon::reset_stats()
00187 {
00188 memset(&stats_, 0, sizeof(stats_));
00189
00190 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
00191
00192 const LinkSet* links = contactmgr_->links();
00193 LinkSet::const_iterator iter;
00194 for (iter = links->begin(); iter != links->end(); ++iter) {
00195 (*iter)->reset_stats();
00196 }
00197 }
00198
00199
00200 void
00201 BundleDaemon::generate_status_report(Bundle* orig_bundle,
00202 status_report_flag_t flag,
00203 status_report_reason_t reason)
00204 {
00205 log_debug("generating return receipt status report, "
00206 "flag = 0x%x, reason = 0x%x", flag, reason);
00207
00208 Bundle* report = new Bundle();
00209 BundleStatusReport::create_status_report(report, orig_bundle,
00210 local_eid_, flag, reason);
00211
00212 BundleReceivedEvent e(report, EVENTSRC_ADMIN, report->payload_.length());
00213 handle_event(&e);
00214 }
00215
00216
00217 void
00218 BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
00219 custody_signal_reason_t reason)
00220 {
00221 if (bundle->local_custody_) {
00222 log_err("send_custody_signal(*%p): already have local custody",
00223 bundle);
00224 return;
00225 }
00226
00227 if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00228 log_err("send_custody_signal(*%p): current custodian is NULL_EID",
00229 bundle);
00230 return;
00231 }
00232
00233 Bundle* signal = new Bundle();
00234 CustodySignal::create_custody_signal(signal, bundle, local_eid_,
00235 succeeded, reason);
00236
00237 BundleReceivedEvent e(signal, EVENTSRC_ADMIN, signal->payload_.length());
00238 handle_event(&e);
00239 }
00240
00241
00242 void
00243 BundleDaemon::cancel_custody_timers(Bundle* bundle)
00244 {
00245 oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::cancel_custody_timers");
00246
00247 CustodyTimerVec::iterator iter;
00248 for (iter = bundle->custody_timers_.begin();
00249 iter != bundle->custody_timers_.end();
00250 ++iter)
00251 {
00252 bool ok = (*iter)->cancel();
00253 if (!ok) {
00254 log_crit("unexpected error cancelling custody timer for bundle *%p",
00255 bundle);
00256 }
00257
00258
00259
00260 }
00261
00262 bundle->custody_timers_.clear();
00263 }
00264
00265
00266 void
00267 BundleDaemon::accept_custody(Bundle* bundle)
00268 {
00269 log_info("accept_custody *%p", bundle);
00270
00271 if (bundle->local_custody_) {
00272 log_err("accept_custody(*%p): already have local custody",
00273 bundle);
00274 return;
00275 }
00276
00277 if (bundle->custodian_.equals(local_eid_)) {
00278 log_err("send_custody_signal(*%p): "
00279 "current custodian is already local_eid",
00280 bundle);
00281 return;
00282 }
00283
00284
00285
00286 if (! bundle->custodian_.equals(EndpointID::NULL_EID())) {
00287 generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00288 }
00289
00290
00291
00292 bundle->custodian_.assign(local_eid_);
00293 bundle->local_custody_ = true;
00294 actions_->store_update(bundle);
00295
00296 custody_bundles_->push_back(bundle);
00297
00298
00299
00300 if (bundle->custody_rcpt_) {
00301 generate_status_report(bundle, BundleProtocol::STATUS_CUSTODY_ACCEPTED);
00302 }
00303 }
00304
00305
00306 void
00307 BundleDaemon::release_custody(Bundle* bundle)
00308 {
00309 log_info("release_custody *%p", bundle);
00310
00311 if (!bundle->local_custody_) {
00312 log_err("release_custody(*%p): don't have local custody",
00313 bundle);
00314 return;
00315 }
00316
00317 cancel_custody_timers(bundle);
00318
00319 bundle->custodian_.assign(EndpointID::NULL_EID());
00320 bundle->local_custody_ = false;
00321 actions_->store_update(bundle);
00322
00323 custody_bundles_->erase(bundle);
00324 }
00325
00326
00327 void
00328 BundleDaemon::deliver_to_registration(Bundle* bundle,
00329 Registration* registration)
00330 {
00331
00332
00333 bundle->owner_ = "daemon";
00334
00335 if (bundle->is_fragment_) {
00336 log_debug("deferring delivery of bundle *%p to registration %d (%s) "
00337 "since bundle is a fragment",
00338 bundle, registration->regid(),
00339 registration->endpoint().c_str());
00340
00341 fragmentmgr_->process_for_reassembly(bundle);
00342 return;
00343 }
00344
00345 log_debug("delivering bundle *%p to registration %d (%s)",
00346 bundle, registration->regid(),
00347 registration->endpoint().c_str());
00348
00349 registration->deliver_bundle(bundle);
00350 }
00351
00352
00353 void
00354 BundleDaemon::check_registrations(Bundle* bundle)
00355 {
00356 int num;
00357 log_debug("checking for matching registrations for bundle *%p", bundle);
00358
00359 RegistrationList matches;
00360 RegistrationList::iterator iter;
00361
00362 num = reg_table_->get_matching(bundle->dest_, &matches);
00363
00364 for (iter = matches.begin(); iter != matches.end(); ++iter)
00365 {
00366 Registration* registration = *iter;
00367 deliver_to_registration(bundle, registration);
00368 }
00369 }
00370
00371
00372 void
00373 BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
00374 {
00375 Bundle* bundle = event->bundleref_.object();
00376
00377
00378 const char* source_str = "";
00379 switch (event->source_) {
00380 case EVENTSRC_PEER:
00381 stats_.bundles_received_++;
00382 break;
00383
00384 case EVENTSRC_APP:
00385 stats_.bundles_received_++;
00386 source_str = " (from app)";
00387 break;
00388
00389 case EVENTSRC_STORE:
00390 source_str = " (from data store)";
00391 break;
00392
00393 case EVENTSRC_ADMIN:
00394 stats_.bundles_generated_++;
00395 source_str = " (generated)";
00396 break;
00397
00398 case EVENTSRC_FRAGMENTATION:
00399 stats_.bundles_generated_++;
00400 source_str = " (from fragmentation)";
00401 break;
00402
00403 default:
00404 NOTREACHED;
00405 }
00406
00407
00408
00409 if (log_enabled(oasys::LOG_DEBUG)) {
00410 oasys::StaticStringBuffer<1024> buf;
00411 buf.appendf("BUNDLE_RECEIVED%s: (%zu bytes recvd)\n",
00412 source_str, event->bytes_received_);
00413 bundle->format_verbose(&buf);
00414 log_multiline(oasys::LOG_DEBUG, buf.c_str());
00415 } else {
00416 log_info("BUNDLE_RECEIVED%s *%p (%zu bytes recvd)",
00417 source_str, bundle, event->bytes_received_);
00418 }
00419
00420
00421
00422
00423 if (bundle->expiration_ == 0) {
00424 log_warn("bundle id %d arrived with zero expiration time",
00425 bundle->bundleid_);
00426 }
00427
00428 u_int32_t now = BundleTimestamp::get_current_time();
00429 if ((bundle->creation_ts_.seconds_ > now) &&
00430 (bundle->creation_ts_.seconds_ - now > 30000))
00431 {
00432 log_warn("bundle id %d arrived with creation time in the future "
00433 "(%u > %u)",
00434 bundle->bundleid_, bundle->creation_ts_.seconds_, now);
00435 }
00436
00437
00438
00439
00440
00441 if (bundle->payload_.is_file_open()) {
00442 log_warn("bundle id %d arrived with payload file still open",
00443 bundle->bundleid_);
00444 bundle->payload_.close_file();
00445 }
00446
00447
00448
00449
00450 if (bundle->receive_rcpt_ && (event->source_ != EVENTSRC_STORE)) {
00451 generate_status_report(bundle, BundleProtocol::STATUS_RECEIVED);
00452 }
00453
00454
00455
00456
00457
00458 if (event->bytes_received_ != bundle->payload_.length()) {
00459 log_debug("partial bundle, making reactive fragment of %zu bytes",
00460 event->bytes_received_);
00461
00462 fragmentmgr_->convert_to_fragment(bundle, event->bytes_received_);
00463 }
00464
00465
00466
00467
00468
00469
00470 Bundle* duplicate = find_duplicate(bundle);
00471 if (duplicate != NULL) {
00472 log_notice("got duplicate bundle: %s -> %s creation %u.%u",
00473 bundle->source_.c_str(),
00474 bundle->dest_.c_str(),
00475 bundle->creation_ts_.seconds_,
00476 bundle->creation_ts_.seqno_);
00477
00478 stats_.duplicate_bundles_++;
00479
00480 if (bundle->custody_requested_ && duplicate->local_custody_)
00481 {
00482 generate_custody_signal(bundle, false,
00483 BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
00484 }
00485
00486
00487
00488
00489
00490
00491 event->daemon_only_ = true;
00492 return;
00493 }
00494
00495
00496
00497
00498
00499
00500
00501
00502
00503
00504 bool ok_to_route =
00505 add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
00506
00507 if (!ok_to_route) {
00508 event->daemon_only_ = true;
00509 return;
00510 }
00511
00512
00513
00514
00515
00516
00517
00518 if (bundle->custody_requested_ && params_.accept_custody_)
00519 {
00520 if (event->source_ != EVENTSRC_STORE) {
00521 accept_custody(bundle);
00522
00523 } else if (bundle->local_custody_) {
00524 custody_bundles_->push_back(bundle);
00525 }
00526 }
00527
00528
00529
00530
00531 check_registrations(bundle);
00532
00533
00534
00535
00536
00537 }
00538
00539
00540 void
00541 BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
00542 {
00543 Bundle* bundle = event->bundleref_.object();
00544 Link* link = event->contact_->link();
00545
00546
00547
00548
00549
00550
00551
00552 size_t total_len = BundleProtocol::formatted_length(bundle);
00553
00554 stats_.bundles_transmitted_++;
00555 link->stats()->bundles_transmitted_++;
00556 link->stats()->bundles_inflight_--;
00557
00558 link->stats()->bytes_transmitted_ += event->bytes_sent_;
00559 link->stats()->bytes_inflight_ -= total_len;
00560
00561 log_info("BUNDLE_TRANSMITTED id:%d (%zu bytes_sent/%zu reliable) -> %s (%s)",
00562 bundle->bundleid_,
00563 event->bytes_sent_,
00564 event->reliably_sent_,
00565 link->name(),
00566 link->nexthop());
00567
00568
00569
00570
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581
00582
00583 if (params_.retry_reliable_unacked_ &&
00584 link->is_reliable() &&
00585 (event->bytes_sent_ != event->reliably_sent_) &&
00586 (event->reliably_sent_ == 0))
00587 {
00588 bundle->fwdlog_.update(link, ForwardingInfo::TRANSMIT_FAILED);
00589 return;
00590 }
00591
00592
00593
00594
00595 bundle->fwdlog_.update(link,
00596 ForwardingInfo::TRANSMITTED);
00597
00598
00599
00600
00601
00602 ForwardingInfo fwdinfo;
00603 bool ok = bundle->fwdlog_.get_latest_entry(link, &fwdinfo);
00604 ASSERTF(ok, "no forwarding log entry for transmission");
00605 ASSERT(fwdinfo.state_ == ForwardingInfo::TRANSMITTED);
00606
00607
00608
00609
00610
00611
00612
00613 if (link->reliable_) {
00614 fragmentmgr_->try_to_reactively_fragment(bundle, event->reliably_sent_);
00615 } else {
00616 fragmentmgr_->try_to_reactively_fragment(bundle, event->bytes_sent_);
00617 }
00618
00619
00620
00621
00622 if (bundle->forward_rcpt_) {
00623 generate_status_report(bundle, BundleProtocol::STATUS_FORWARDED);
00624 }
00625
00626
00627
00628
00629 if (bundle->local_custody_) {
00630 bundle->custody_timers_.push_back(
00631 new CustodyTimer(fwdinfo.timestamp_,
00632 fwdinfo.custody_timer_,
00633 bundle, link));
00634
00635
00636
00637
00638
00639
00640
00641 }
00642
00643
00644
00645
00646
00647
00648 try_delete_from_pending(bundle);
00649 }
00650
00651
00652 void
00653 BundleDaemon::handle_bundle_transmit_failed(BundleTransmitFailedEvent* event)
00654 {
00655
00656
00657
00658 Bundle* bundle = event->bundleref_.object();
00659
00660 log_info("BUNDLE_TRANSMIT_FAILED id:%d -> %s (%s)",
00661 bundle->bundleid_,
00662 event->contact_->link()->name(),
00663 event->contact_->link()->nexthop());
00664
00665
00666
00667
00668
00669 bundle->fwdlog_.update(event->contact_->link(),
00670 ForwardingInfo::TRANSMIT_FAILED);
00671
00672
00673
00674
00675 }
00676
00677
00678 void
00679 BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
00680 {
00681
00682 stats_.bundles_delivered_++;
00683
00684
00685
00686
00687 Bundle* bundle = event->bundleref_.object();
00688
00689 log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
00690 bundle->bundleid_, bundle->payload_.length(),
00691 event->registration_->regid(),
00692 event->registration_->endpoint().c_str());
00693
00694
00695
00696
00697 if (bundle->delivery_rcpt_)
00698 {
00699 generate_status_report(bundle, BundleProtocol::STATUS_DELIVERED);
00700 }
00701
00702
00703
00704
00705
00706
00707
00708
00709 if (bundle->custody_requested_)
00710 {
00711 if (bundle->local_custody_) {
00712 release_custody(bundle);
00713
00714 } else if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00715 log_info("custodial bundle *%p delivered before custody accepted",
00716 bundle);
00717
00718 } else {
00719 generate_custody_signal(bundle, true,
00720 BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00721 }
00722 }
00723
00724
00725
00726
00727
00728
00729 try_delete_from_pending(bundle);
00730 }
00731
00732
00733 void
00734 BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
00735 {
00736
00737 stats_.bundles_expired_++;
00738
00739 Bundle* bundle = event->bundleref_.object();
00740
00741 log_info("BUNDLE_EXPIRED *%p", bundle);
00742
00743 ASSERT(bundle->expiration_timer_ == NULL);
00744
00745
00746 if (bundle->local_custody_) {
00747 release_custody(bundle);
00748 }
00749
00750
00751 delete_from_pending(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
00752
00753
00754
00755
00756
00757
00758
00759 }
00760
00761
00762 void
00763 BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
00764 {
00765 Registration* registration = event->registration_;
00766 log_info("REGISTRATION_ADDED %d %s",
00767 registration->regid(), registration->endpoint().c_str());
00768
00769 if (!reg_table_->add(registration,
00770 (event->source_ == EVENTSRC_APP) ? true : false))
00771 {
00772 log_err("error adding registration %d to table",
00773 registration->regid());
00774 }
00775
00776 oasys::ScopeLock l(pending_bundles_->lock(),
00777 "BundleDaemon::handle_registration_added");
00778 BundleList::iterator iter;
00779 for (iter = pending_bundles_->begin();
00780 iter != pending_bundles_->end();
00781 ++iter)
00782 {
00783 Bundle* bundle = *iter;
00784
00785 if (registration->endpoint().match(bundle->dest_))
00786 {
00787 deliver_to_registration(bundle, registration);
00788 }
00789 }
00790 }
00791
00792
00793 void
00794 BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
00795 {
00796 Registration* registration = event->registration_;
00797 log_info("REGISTRATION_REMOVED %d %s",
00798 registration->regid(), registration->endpoint().c_str());
00799
00800 if (!reg_table_->del(registration->regid())) {
00801 log_err("error removing registration %d from table",
00802 registration->regid());
00803 return;
00804 }
00805
00806 delete registration;
00807 }
00808
00809
00810 void
00811 BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
00812 {
00813 Registration* registration = reg_table_->get(event->regid_);
00814
00815 if (registration == NULL) {
00816 log_err("REGISTRATION_EXPIRED -- dead regid %d", event->regid_);
00817 return;
00818 }
00819
00820 registration->set_expired(true);
00821
00822 if (registration->active()) {
00823
00824
00825
00826 log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
00827 event->regid_);
00828 } else {
00829
00830 log_info("REGISTRATION_EXPIRED %d", event->regid_);
00831 reg_table_->del(registration->regid());
00832 delete registration;
00833 }
00834 }
00835
00836
00837 void
00838 BundleDaemon::handle_link_available(LinkAvailableEvent* event)
00839 {
00840 Link* link = event->link_;
00841 ASSERT(link->isavailable());
00842
00843 log_info("LINK_AVAILABLE *%p", link);
00844 }
00845
00846
00847 void
00848 BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
00849 {
00850 Link* link = event->link_;
00851 ASSERT(!link->isavailable());
00852
00853 log_info("LINK UNAVAILABLE *%p", link);
00854 }
00855
00856
00857 void
00858 BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
00859 {
00860
00861
00862
00863
00864 Link* link = request->link_;
00865 Link::state_t new_state = request->state_;
00866 Link::state_t old_state = request->old_state_;
00867 ContactEvent::reason_t reason = request->reason_;
00868
00869 if (link->contact() != request->contact_) {
00870 log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p: "
00871 "contact %p != current contact %p",
00872 Link::state_to_str(old_state), Link::state_to_str(new_state),
00873 ContactEvent::reason_to_str(reason), link,
00874 request->contact_.object(), link->contact().object());
00875 return;
00876 }
00877
00878 if (old_state != link->state()) {
00879 log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p: "
00880 "old_state != current state",
00881 Link::state_to_str(old_state), Link::state_to_str(new_state),
00882 ContactEvent::reason_to_str(reason), link);
00883 return;
00884 }
00885
00886 log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
00887 Link::state_to_str(old_state), Link::state_to_str(new_state),
00888 ContactEvent::reason_to_str(reason), link);
00889
00890 switch(new_state) {
00891 case Link::UNAVAILABLE:
00892 if (link->state() != Link::AVAILABLE) {
00893 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
00894 "tried to set state UNAVAILABLE in state %s",
00895 link, Link::state_to_str(link->state()));
00896 return;
00897 }
00898 link->set_state(new_state);
00899 post_at_head(new LinkUnavailableEvent(link, reason));
00900 break;
00901
00902 case Link::AVAILABLE:
00903 if (link->state() == Link::UNAVAILABLE) {
00904 link->set_state(Link::AVAILABLE);
00905
00906 } else if (link->state() == Link::BUSY &&
00907 reason == ContactEvent::UNBLOCKED) {
00908 ASSERT(link->contact() != NULL);
00909 link->set_state(Link::OPEN);
00910
00911 } else if (link->state() == Link::OPEN &&
00912 reason == ContactEvent::UNBLOCKED) {
00913
00914
00915
00916 } else {
00917 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
00918 "tried to set state AVAILABLE in state %s",
00919 link, Link::state_to_str(link->state()));
00920 return;
00921 }
00922
00923 post_at_head(new LinkAvailableEvent(link, reason));
00924 break;
00925
00926 case Link::BUSY:
00927 log_err("LINK_STATE_CHANGE_REQUEST can't be used for state %s",
00928 Link::state_to_str(new_state));
00929 break;
00930
00931 case Link::OPENING:
00932 case Link::OPEN:
00933
00934 if (link->state() == Link::UNAVAILABLE) {
00935 link->set_state(Link::AVAILABLE);
00936 }
00937 actions_->open_link(link);
00938 break;
00939
00940 case Link::CLOSED:
00941
00942
00943
00944 if (! link->isopen() && ! link->isopening()) {
00945 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
00946 "setting state CLOSED (%s) in unexpected state %s",
00947 link, ContactEvent::reason_to_str(reason),
00948 link->state_to_str(link->state()));
00949 break;
00950 }
00951
00952
00953 if (link->isopen()) {
00954 ASSERT(link->contact() != NULL);
00955 post_at_head(new ContactDownEvent(link->contact(), reason));
00956 }
00957
00958
00959 actions_->close_link(link);
00960
00961
00962
00963 if (reason == ContactEvent::IDLE) {
00964 link->set_state(Link::AVAILABLE);
00965 } else {
00966 link->set_state(Link::UNAVAILABLE);
00967 post_at_head(new LinkUnavailableEvent(link, reason));
00968 }
00969
00970 break;
00971
00972 default:
00973 PANIC("unhandled state %s", Link::state_to_str(new_state));
00974 }
00975 }
00976
00977
00978 void
00979 BundleDaemon::handle_contact_up(ContactUpEvent* event)
00980 {
00981 const ContactRef& contact = event->contact_;
00982 log_info("CONTACT_UP *%p (contact %p)", contact->link(), contact.object());
00983
00984 Link* link = contact->link();
00985 ASSERT(link->contact() == contact);
00986 link->set_state(Link::OPEN);
00987 link->stats_.contacts_++;
00988 }
00989
00990
00991 void
00992 BundleDaemon::handle_contact_down(ContactDownEvent* event)
00993 {
00994 const ContactRef& contact = event->contact_;
00995 Link* link = contact->link();
00996 ContactEvent::reason_t reason = event->reason_;
00997
00998 log_info("CONTACT_DOWN *%p (%s) (contact %p)",
00999 link, ContactEvent::reason_to_str(reason), contact.object());
01000
01001
01002
01003 }
01004
01005
01006 void
01007 BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
01008 {
01009 log_info("REASSEMBLY_COMPLETED bundle id %d",
01010 event->bundle_->bundleid_);
01011
01012
01013 BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
01014 while ((ref = event->fragments_.pop_front()) != NULL) {
01015 try_delete_from_pending(ref.object());
01016 }
01017
01018
01019 post_at_head(new BundleReceivedEvent(event->bundle_.object(),
01020 EVENTSRC_FRAGMENTATION,
01021 event->bundle_->payload_.length()));
01022 }
01023
01024
01025
01026 void
01027 BundleDaemon::handle_route_add(RouteAddEvent* event)
01028 {
01029 log_info("ROUTE_ADD *%p", event->entry_);
01030 }
01031
01032
01033 void
01034 BundleDaemon::handle_route_del(RouteDelEvent* event)
01035 {
01036 log_info("ROUTE_DEL %s", event->dest_.c_str());
01037 }
01038
01039
01040 void
01041 BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
01042 {
01043 log_info("CUSTODY_SIGNAL: %s %u.%u %s (%s)",
01044 event->data_.orig_source_eid_.c_str(),
01045 event->data_.orig_creation_tv_.seconds_,
01046 event->data_.orig_creation_tv_.seqno_,
01047 event->data_.succeeded_ ? "succeeded" : "failed",
01048 CustodySignal::reason_to_str(event->data_.reason_));
01049
01050 BundleRef orig_bundle =
01051 custody_bundles_->find(event->data_.orig_source_eid_,
01052 event->data_.orig_creation_tv_);
01053
01054 if (orig_bundle == NULL) {
01055 log_warn("received custody signal for bundle %s %u.%u "
01056 "but don't have custody",
01057 event->data_.orig_source_eid_.c_str(),
01058 event->data_.orig_creation_tv_.seconds_,
01059 event->data_.orig_creation_tv_.seqno_);
01060 return;
01061 }
01062
01063
01064
01065 bool release = event->data_.succeeded_;
01066 if ((event->data_.succeeded_ == false) &&
01067 (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
01068 {
01069 log_notice("releasing custody for bundle %s %u.%u "
01070 "due to redundant reception",
01071 event->data_.orig_source_eid_.c_str(),
01072 event->data_.orig_creation_tv_.seconds_,
01073 event->data_.orig_creation_tv_.seqno_);
01074
01075 release = true;
01076 }
01077
01078 if (release) {
01079 release_custody(orig_bundle.object());
01080 try_delete_from_pending(orig_bundle.object());
01081 }
01082 }
01083
01084
01085 void
01086 BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
01087 {
01088 Bundle* bundle = event->bundle_.object();
01089 Link* link = event->link_;
01090
01091 log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link);
01092
01093
01094 oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::handle_custody_timeout");
01095
01096 bool found = false;
01097 CustodyTimer* timer;
01098 CustodyTimerVec::iterator iter;
01099 for (iter = bundle->custody_timers_.begin();
01100 iter != bundle->custody_timers_.end();
01101 ++iter)
01102 {
01103 timer = *iter;
01104 if (timer->link_ == link)
01105 {
01106 if (timer->pending()) {
01107 log_err("multiple pending custody timers for link %s",
01108 link->nexthop());
01109 continue;
01110 }
01111
01112 found = true;
01113 bundle->custody_timers_.erase(iter);
01114 break;
01115 }
01116 }
01117
01118 if (!found) {
01119 log_err("custody timeout for *%p *%p: timer not found in bundle list",
01120 bundle, link);
01121 return;
01122 }
01123
01124 ASSERT(!timer->cancelled());
01125
01126 if (!pending_bundles_->contains(bundle)) {
01127 log_err("custody timeout for *%p *%p: bundle not in pending list",
01128 bundle, link);
01129 }
01130
01131
01132
01133
01134
01135 bundle->fwdlog_.add_entry(link, ForwardingInfo::INVALID_ACTION,
01136 ForwardingInfo::CUSTODY_TIMEOUT,
01137 CustodyTimerSpec::defaults_);
01138
01139 delete timer;
01140
01141
01142
01143 }
01144
01145
01146 void
01147 BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
01148 {
01149 (void)request;
01150
01151 log_notice("Received shutdown request");
01152
01153 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
01154
01155 const LinkSet* links = contactmgr_->links();
01156 LinkSet::const_iterator iter;
01157 Link* link = NULL;
01158
01159
01160 for (iter = links->begin(); iter != links->end(); ++iter)
01161 {
01162 link = *iter;
01163 if (link->isopen()) {
01164 log_debug("Shutdown: closing link *%p\n", link);
01165 link->close();
01166 }
01167 }
01168
01169
01170 if (app_shutdown_proc_) {
01171 (*app_shutdown_proc_)(app_shutdown_data_);
01172 }
01173
01174
01175 set_should_stop();
01176
01177
01178
01179 }
01180
01181
01182 void
01183 BundleDaemon::handle_status_request(StatusRequest* request)
01184 {
01185 (void)request;
01186 log_info("Received status request");
01187 }
01188
01189
01190 bool
01191 BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
01192 {
01193 log_debug("adding bundle *%p to pending list", bundle);
01194
01195 pending_bundles_->push_back(bundle);
01196
01197 if (add_to_store) {
01198 bundle->in_datastore_ = true;
01199 actions_->store_add(bundle);
01200 }
01201
01202
01203 struct timeval expiration_time;
01204 expiration_time.tv_sec =
01205 BundleTimestamp::TIMEVAL_CONVERSION +
01206 bundle->creation_ts_.seconds_ +
01207 bundle->expiration_;
01208
01209 expiration_time.tv_usec = 0;
01210
01211 struct timeval now;
01212 gettimeofday(&now, 0);
01213 long int when = expiration_time.tv_sec - now.tv_sec;
01214
01215 bool ok_to_route = true;
01216
01217 if (when > 0) {
01218 log_debug("scheduling expiration for bundle id %d at %u.%u "
01219 "(in %lu seconds)",
01220 bundle->bundleid_,
01221 (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
01222 when);
01223 } else {
01224 log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
01225 "[expiration %u, creation time %u.%u, offset %u, now %u.%u]",
01226 bundle->bundleid_, bundle->expiration_,
01227 (u_int)bundle->creation_ts_.seconds_,
01228 (u_int)bundle->creation_ts_.seqno_,
01229 BundleTimestamp::TIMEVAL_CONVERSION,
01230 (u_int)now.tv_sec, (u_int)now.tv_usec);
01231
01232 ok_to_route = false;
01233 }
01234
01235 bundle->expiration_timer_ = new ExpirationTimer(bundle);
01236 bundle->expiration_timer_->schedule_at(&expiration_time);
01237
01238 return ok_to_route;
01239 }
01240
01241
01242 void
01243 BundleDaemon::delete_from_pending(Bundle* bundle,
01244 status_report_reason_t reason)
01245 {
01246 log_debug("removing bundle *%p from pending list", bundle);
01247
01248
01249
01250 if (bundle->expiration_timer_) {
01251 log_debug("cancelling expiration timer for bundle id %d",
01252 bundle->bundleid_);
01253
01254 bool cancelled = bundle->expiration_timer_->cancel();
01255 if (!cancelled) {
01256 log_crit("unexpected error cancelling expiration timer "
01257 "for bundle *%p", bundle);
01258 }
01259
01260 bundle->expiration_timer_->bundleref_.release();
01261 bundle->expiration_timer_ = NULL;
01262 }
01263
01264 bool erased = pending_bundles_->erase(bundle);
01265
01266 if (erased) {
01267 if (bundle->deletion_rcpt_ &&
01268 (reason != BundleProtocol::REASON_NO_ADDTL_INFO))
01269 {
01270 generate_status_report(bundle,
01271 BundleProtocol::STATUS_DELETED,
01272 reason);
01273 }
01274 } else {
01275 log_err("unexpected error removing bundle from pending list");
01276 }
01277 }
01278
01279
01280 void
01281 BundleDaemon::try_delete_from_pending(Bundle* bundle)
01282 {
01283
01284
01285
01286
01287
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299
01300
01301
01302 if (! bundle->is_queued_on(pending_bundles_))
01303 {
01304 if (bundle->expiration_timer_ == NULL) {
01305 log_debug("try_delete_from_pending(*%p): bundle already expired",
01306 bundle);
01307 return;
01308 }
01309
01310 log_err("try_delete_from_pending(*%p): bundle not in pending list!",
01311 bundle);
01312 return;
01313 }
01314
01315 if (!params_.early_deletion_) {
01316 log_debug("try_delete_from_pending(*%p): not deleting because "
01317 "early deletion disabled",
01318 bundle);
01319 return;
01320 }
01321
01322 size_t num_mappings = bundle->num_mappings();
01323 if (num_mappings != 1) {
01324 log_debug("try_delete_from_pending(*%p): not deleting because "
01325 "bundle has %zu mappings",
01326 bundle, num_mappings);
01327 return;
01328 }
01329
01330 size_t num_in_flight = bundle->fwdlog_.get_count(ForwardingInfo::IN_FLIGHT);
01331 if (num_in_flight > 0) {
01332 log_debug("try_delete_from_pending(*%p): not deleting because "
01333 "bundle in flight on %zu links",
01334 bundle, num_in_flight);
01335 return;
01336 }
01337
01338 delete_from_pending(bundle, BundleProtocol::REASON_NO_ADDTL_INFO);
01339 }
01340
01341
01342 Bundle*
01343 BundleDaemon::find_duplicate(Bundle* b)
01344 {
01345 oasys::ScopeLock l(pending_bundles_->lock(),
01346 "BundleDaemon::find_duplicate");
01347 BundleList::iterator iter;
01348 for (iter = pending_bundles_->begin();
01349 iter != pending_bundles_->end();
01350 ++iter)
01351 {
01352 Bundle* b2 = *iter;
01353
01354 if ((b->source_.equals(b2->source_)) &&
01355 (b->creation_ts_.seconds_ == b2->creation_ts_.seconds_) &&
01356 (b->creation_ts_.seqno_ == b2->creation_ts_.seqno_) &&
01357 (b->is_fragment_ == b2->is_fragment_) &&
01358 (b->frag_offset_ == b2->frag_offset_) &&
01359 (b->orig_length_ == b2->orig_length_) &&
01360 (b->payload_.length() == b2->payload_.length()))
01361 {
01362 return b2;
01363 }
01364 }
01365
01366 return NULL;
01367 }
01368
01369
01370 void
01371 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
01372 {
01373 Bundle* bundle = event->bundle_;
01374 event->bundle_ = NULL;
01375 ASSERT(bundle->refcount() == 0);
01376
01377 bundle->lock_.lock("BundleDaemon::handle_bundle_free");
01378
01379 if (bundle->in_datastore_) {
01380 actions_->store_del(bundle);
01381 }
01382
01383 delete bundle;
01384 }
01385
01386
01387 void
01388 BundleDaemon::handle_event(BundleEvent* event)
01389 {
01390 dispatch_event(event);
01391
01392 if (! event->daemon_only_) {
01393
01394 router_->handle_event(event);
01395 contactmgr_->handle_event(event);
01396 }
01397
01398 stats_.events_processed_++;
01399 }
01400
01401
01402 void
01403 BundleDaemon::load_registrations()
01404 {
01405 admin_reg_ = new AdminRegistration();
01406 {
01407 RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
01408 handle_event(&e);
01409 }
01410
01411 Registration* reg;
01412 RegistrationStore* reg_store = RegistrationStore::instance();
01413 RegistrationStore::iterator* iter = reg_store->new_iterator();
01414
01415 while (iter->next() == 0) {
01416 reg = reg_store->get(iter->cur_val());
01417 if (reg == NULL) {
01418 log_err("error loading registration %d from data store",
01419 iter->cur_val());
01420 continue;
01421 }
01422
01423 RegistrationAddedEvent e(reg, EVENTSRC_STORE);
01424 handle_event(&e);
01425 }
01426
01427 delete iter;
01428 }
01429
01430
01431 void
01432 BundleDaemon::load_bundles()
01433 {
01434 Bundle* bundle;
01435 BundleStore* bundle_store = BundleStore::instance();
01436 BundleStore::iterator* iter = bundle_store->new_iterator();
01437
01438 log_notice("loading bundles from data store");
01439 for (iter->begin(); iter->more(); iter->next()) {
01440 bundle = bundle_store->get(iter->cur_val());
01441 if (bundle == NULL) {
01442 log_err("error loading bundle %d from data store",
01443 iter->cur_val());
01444 continue;
01445 }
01446
01447 BundleReceivedEvent e(bundle, EVENTSRC_STORE);
01448 handle_event(&e);
01449
01450
01451
01452
01453
01454 }
01455
01456 delete iter;
01457 }
01458
01459
01460 void
01461 BundleDaemon::run()
01462 {
01463 if (! BundleTimestamp::check_local_clock()) {
01464 exit(1);
01465 }
01466
01467 router_ = BundleRouter::create_router(BundleRouter::Config.type_.c_str());
01468 router_->initialize();
01469
01470 load_registrations();
01471 load_bundles();
01472
01473 BundleEvent* event;
01474
01475 oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
01476
01477 struct pollfd pollfds[2];
01478 struct pollfd* event_poll = &pollfds[0];
01479 struct pollfd* timer_poll = &pollfds[1];
01480
01481 event_poll->fd = eventq_->read_fd();
01482 event_poll->events = POLLIN;
01483
01484 timer_poll->fd = timersys->notifier()->read_fd();
01485 timer_poll->events = POLLIN;
01486
01487 while (1) {
01488 if (should_stop()) {
01489 break;
01490 }
01491
01492 int timeout = timersys->run_expired_timers();
01493
01494 if (eventq_->size() > 0) {
01495 bool ok = eventq_->try_pop(&event);
01496 ASSERT(ok);
01497
01498 oasys::Time now;
01499 now.get_time();
01500
01501
01502 handle_event(event);
01503
01504 int elapsed = now.elapsed_ms();
01505 if (elapsed > 2000) {
01506 log_warn("event %s took %d ms to process",
01507 event->type_str(), elapsed);
01508 }
01509
01510
01511 delete event;
01512
01513 continue;
01514 }
01515
01516 pollfds[0].revents = 0;
01517 pollfds[1].revents = 0;
01518
01519 int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
01520 log_debug("poll returned %d", cc);
01521
01522 if (cc == oasys::IOTIMEOUT) {
01523 log_debug("poll timeout");
01524 continue;
01525
01526 } else if (cc <= 0) {
01527 log_err("unexpected return %d from poll_multiple!", cc);
01528 continue;
01529 }
01530
01531
01532
01533 if (event_poll->revents != 0) {
01534 log_debug("poll returned new event to handle");
01535 }
01536
01537
01538
01539
01540 if (timer_poll->revents != 0) {
01541 log_debug("poll returned new timers to handle");
01542 timersys->notifier()->clear();
01543 }
01544 }
01545 }
01546
01547 }