00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <oasys/util/Time.h>
00019
00020 #include "Bundle.h"
00021 #include "BundleActions.h"
00022 #include "BundleEvent.h"
00023 #include "BundleDaemon.h"
00024 #include "BundleStatusReport.h"
00025 #include "BundleTimestamp.h"
00026 #include "CustodySignal.h"
00027 #include "ExpirationTimer.h"
00028 #include "FragmentManager.h"
00029 #include "contacts/Contact.h"
00030 #include "contacts/ContactManager.h"
00031 #include "reg/AdminRegistration.h"
00032 #include "reg/APIRegistration.h"
00033 #include "reg/PingRegistration.h"
00034 #include "reg/Registration.h"
00035 #include "reg/RegistrationTable.h"
00036 #include "routing/BundleRouter.h"
00037 #include "routing/RouteTable.h"
00038 #include "storage/BundleStore.h"
00039 #include "storage/RegistrationStore.h"
00040
00041 namespace dtn {
00042
00043 template <>
00044 BundleDaemon* oasys::Singleton<BundleDaemon, false>::instance_ = NULL;
00045
00046 bool
00047 BundleDaemon::is_simulator_ = false;
00048
00049 BundleDaemon::Params::Params()
00050 : early_deletion_(true),
00051 accept_custody_(true),
00052 reactive_frag_enabled_(true),
00053 retry_reliable_unacked_(true),
00054 test_permuted_delivery_(false) {}
00055
00056 BundleDaemon::Params BundleDaemon::params_;
00057
00058 bool BundleDaemon::shutting_down_ = false;
00059
00060
00061 BundleDaemon::BundleDaemon()
00062 : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
00063 Thread("BundleDaemon", CREATE_JOINABLE)
00064 {
00065
00066
00067 local_eid_.assign("dtn://localhost.dtn");
00068
00069 memset(&stats_, 0, sizeof(stats_));
00070
00071 pending_bundles_ = new BundleList("pending_bundles");
00072 custody_bundles_ = new BundleList("custody_bundles");
00073
00074 contactmgr_ = new ContactManager();
00075 fragmentmgr_ = new FragmentManager();
00076 reg_table_ = new RegistrationTable();
00077
00078 router_ = 0;
00079
00080 app_shutdown_proc_ = NULL;
00081 app_shutdown_data_ = NULL;
00082
00083 rtr_shutdown_proc_ = 0;
00084 rtr_shutdown_data_ = 0;
00085 }
00086
00087
00088 BundleDaemon::~BundleDaemon()
00089 {
00090 delete pending_bundles_;
00091 delete custody_bundles_;
00092
00093 delete contactmgr_;
00094 delete fragmentmgr_;
00095 delete reg_table_;
00096 delete router_;
00097
00098 delete actions_;
00099 delete eventq_;
00100 }
00101
00102
00103 void
00104 BundleDaemon::do_init()
00105 {
00106 actions_ = new BundleActions();
00107 eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
00108 eventq_->notify_when_empty();
00109 BundleProtocol::init_default_processors();
00110 }
00111
00112
00113 void
00114 BundleDaemon::post(BundleEvent* event)
00115 {
00116 instance_->post_event(event);
00117 }
00118
00119
00120 void
00121 BundleDaemon::post_at_head(BundleEvent* event)
00122 {
00123 instance_->post_event(event, false);
00124 }
00125
00126
00127 bool
00128 BundleDaemon::post_and_wait(BundleEvent* event,
00129 oasys::Notifier* notifier,
00130 int timeout, bool at_back)
00131 {
00132
00133
00134
00135
00136 ASSERT(! oasys::Thread::start_barrier_enabled());
00137
00138 ASSERT(event->processed_notifier_ == NULL);
00139 event->processed_notifier_ = notifier;
00140 if (at_back) {
00141 post(event);
00142 } else {
00143 post_at_head(event);
00144 }
00145 return notifier->wait(NULL, timeout);
00146 }
00147
00148
00149 void
00150 BundleDaemon::post_event(BundleEvent* event, bool at_back)
00151 {
00152 log_debug("posting event (%p) with type %s (at %s)",
00153 event, event->type_str(), at_back ? "back" : "head");
00154 eventq_->push(event, at_back);
00155 }
00156
00157
00158 void
00159 BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
00160 {
00161 router_->get_routing_state(buf);
00162 contactmgr_->dump(buf);
00163 }
00164
00165
00166 void
00167 BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
00168 {
00169 buf->appendf("%zu pending -- "
00170 "%zu custody -- "
00171 "%u received -- "
00172 "%u delivered -- "
00173 "%u generated -- "
00174 "%u transmitted -- "
00175 "%u expired -- "
00176 "%u duplicate",
00177 pending_bundles()->size(),
00178 custody_bundles()->size(),
00179 stats_.bundles_received_,
00180 stats_.bundles_delivered_,
00181 stats_.bundles_generated_,
00182 stats_.bundles_transmitted_,
00183 stats_.bundles_expired_,
00184 stats_.duplicate_bundles_);
00185 }
00186
00187
00188 void
00189 BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
00190 {
00191 buf->appendf("%zu pending_events -- "
00192 "%u processed_events",
00193 eventq_->size(),
00194 stats_.events_processed_);
00195 }
00196
00197
00198 void
00199 BundleDaemon::reset_stats()
00200 {
00201 memset(&stats_, 0, sizeof(stats_));
00202
00203 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
00204
00205 const LinkSet* links = contactmgr_->links();
00206 LinkSet::const_iterator iter;
00207 for (iter = links->begin(); iter != links->end(); ++iter) {
00208 (*iter)->reset_stats();
00209 }
00210 }
00211
00212
00213 void
00214 BundleDaemon::generate_status_report(Bundle* orig_bundle,
00215 status_report_flag_t flag,
00216 status_report_reason_t reason)
00217 {
00218 log_debug("generating return receipt status report, "
00219 "flag = 0x%x, reason = 0x%x", flag, reason);
00220
00221 Bundle* report = new Bundle();
00222 BundleStatusReport::create_status_report(report, orig_bundle,
00223 local_eid_, flag, reason);
00224
00225 BundleReceivedEvent e(report, EVENTSRC_ADMIN);
00226 handle_event(&e);
00227 }
00228
00229
00230 void
00231 BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
00232 custody_signal_reason_t reason)
00233 {
00234 if (bundle->local_custody_) {
00235 log_err("send_custody_signal(*%p): already have local custody",
00236 bundle);
00237 return;
00238 }
00239
00240 if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00241 log_err("send_custody_signal(*%p): current custodian is NULL_EID",
00242 bundle);
00243 return;
00244 }
00245
00246 Bundle* signal = new Bundle();
00247 CustodySignal::create_custody_signal(signal, bundle, local_eid_,
00248 succeeded, reason);
00249
00250 BundleReceivedEvent e(signal, EVENTSRC_ADMIN);
00251 handle_event(&e);
00252 }
00253
00254
00255 void
00256 BundleDaemon::cancel_custody_timers(Bundle* bundle)
00257 {
00258 oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::cancel_custody_timers");
00259
00260 CustodyTimerVec::iterator iter;
00261 for (iter = bundle->custody_timers_.begin();
00262 iter != bundle->custody_timers_.end();
00263 ++iter)
00264 {
00265 bool ok = (*iter)->cancel();
00266 if (!ok) {
00267 log_crit("unexpected error cancelling custody timer for bundle *%p",
00268 bundle);
00269 }
00270
00271
00272
00273 }
00274
00275 bundle->custody_timers_.clear();
00276 }
00277
00278
00279 void
00280 BundleDaemon::accept_custody(Bundle* bundle)
00281 {
00282 log_info("accept_custody *%p", bundle);
00283
00284 if (bundle->local_custody_) {
00285 log_err("accept_custody(*%p): already have local custody",
00286 bundle);
00287 return;
00288 }
00289
00290 if (bundle->custodian_.equals(local_eid_)) {
00291 log_err("send_custody_signal(*%p): "
00292 "current custodian is already local_eid",
00293 bundle);
00294 return;
00295 }
00296
00297
00298
00299 if (! bundle->custodian_.equals(EndpointID::NULL_EID())) {
00300 generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00301 }
00302
00303
00304
00305 bundle->custodian_.assign(local_eid_);
00306 bundle->local_custody_ = true;
00307 actions_->store_update(bundle);
00308
00309 custody_bundles_->push_back(bundle);
00310
00311
00312
00313 if (bundle->custody_rcpt_) {
00314 generate_status_report(bundle, BundleProtocol::STATUS_CUSTODY_ACCEPTED);
00315 }
00316 }
00317
00318
00319 void
00320 BundleDaemon::release_custody(Bundle* bundle)
00321 {
00322 log_info("release_custody *%p", bundle);
00323
00324 if (!bundle->local_custody_) {
00325 log_err("release_custody(*%p): don't have local custody",
00326 bundle);
00327 return;
00328 }
00329
00330 cancel_custody_timers(bundle);
00331
00332 bundle->custodian_.assign(EndpointID::NULL_EID());
00333 bundle->local_custody_ = false;
00334 actions_->store_update(bundle);
00335
00336 custody_bundles_->erase(bundle);
00337 }
00338
00339
00340 void
00341 BundleDaemon::deliver_to_registration(Bundle* bundle,
00342 Registration* registration)
00343 {
00344
00345
00346 bundle->owner_ = "daemon";
00347
00348 if (bundle->is_fragment_) {
00349 log_debug("deferring delivery of bundle *%p to registration %d (%s) "
00350 "since bundle is a fragment",
00351 bundle, registration->regid(),
00352 registration->endpoint().c_str());
00353
00354 fragmentmgr_->process_for_reassembly(bundle);
00355 return;
00356 }
00357
00358 log_debug("delivering bundle *%p to registration %d (%s)",
00359 bundle, registration->regid(),
00360 registration->endpoint().c_str());
00361
00362 registration->deliver_bundle(bundle);
00363 }
00364
00365
00366 void
00367 BundleDaemon::check_registrations(Bundle* bundle)
00368 {
00369 int num;
00370 log_debug("checking for matching registrations for bundle *%p", bundle);
00371
00372 RegistrationList matches;
00373 RegistrationList::iterator iter;
00374
00375 num = reg_table_->get_matching(bundle->dest_, &matches);
00376
00377 for (iter = matches.begin(); iter != matches.end(); ++iter)
00378 {
00379 Registration* registration = *iter;
00380 deliver_to_registration(bundle, registration);
00381 }
00382 }
00383
00384
00385 void
00386 BundleDaemon::handle_bundle_accept(BundleAcceptRequest* request)
00387 {
00388 *request->result_ =
00389 router_->accept_bundle(request->bundle_.object(), request->reason_);
00390
00391 log_info("BUNDLE_ACCEPT_REQUEST: bundle *%p %s (reason %s)",
00392 request->bundle_.object(),
00393 *request->result_ ? "accepted" : "not accepted",
00394 BundleStatusReport::reason_to_str(*request->reason_));
00395 }
00396
00397
00398 void
00399 BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
00400 {
00401 Bundle* bundle = event->bundleref_.object();
00402
00403
00404 const char* source_str = "";
00405 switch (event->source_) {
00406 case EVENTSRC_PEER:
00407 stats_.bundles_received_++;
00408 break;
00409
00410 case EVENTSRC_APP:
00411 stats_.bundles_received_++;
00412 source_str = " (from app)";
00413 break;
00414
00415 case EVENTSRC_STORE:
00416 source_str = " (from data store)";
00417 break;
00418
00419 case EVENTSRC_ADMIN:
00420 stats_.bundles_generated_++;
00421 source_str = " (generated)";
00422 break;
00423
00424 case EVENTSRC_FRAGMENTATION:
00425 stats_.bundles_generated_++;
00426 source_str = " (from fragmentation)";
00427 break;
00428
00429 default:
00430 NOTREACHED;
00431 }
00432
00433
00434
00435 if (log_enabled(oasys::LOG_DEBUG)) {
00436 oasys::StaticStringBuffer<1024> buf;
00437 buf.appendf("BUNDLE_RECEIVED%s: (%u bytes recvd)\n",
00438 source_str, event->bytes_received_);
00439 bundle->format_verbose(&buf);
00440 log_multiline(oasys::LOG_DEBUG, buf.c_str());
00441 } else {
00442 log_info("BUNDLE_RECEIVED%s *%p (%u bytes recvd)",
00443 source_str, bundle, event->bytes_received_);
00444 }
00445
00446
00447
00448
00449 if (bundle->expiration_ == 0) {
00450 log_warn("bundle id %d arrived with zero expiration time",
00451 bundle->bundleid_);
00452 }
00453
00454 u_int32_t now = BundleTimestamp::get_current_time();
00455 if ((bundle->creation_ts_.seconds_ > now) &&
00456 (bundle->creation_ts_.seconds_ - now > 30000))
00457 {
00458 log_warn("bundle id %d arrived with creation time in the future "
00459 "(%u > %u)",
00460 bundle->bundleid_, bundle->creation_ts_.seconds_, now);
00461 }
00462
00463
00464
00465
00466
00467 if (event->source_ == EVENTSRC_PEER) {
00468 ASSERT(event->bytes_received_ != 0);
00469 size_t payload_offset = BundleProtocol::payload_offset(&bundle->recv_blocks_);
00470 fragmentmgr_->try_to_convert_to_fragment(bundle, payload_offset,
00471 event->bytes_received_);
00472 }
00473
00474
00475 if (event->source_ == EVENTSRC_PEER) {
00476 status_report_reason_t
00477 reception_reason = BundleProtocol::REASON_NO_ADDTL_INFO,
00478 deletion_reason = BundleProtocol::REASON_NO_ADDTL_INFO;
00479
00480 bool accept_bundle = is_simulator_ ? true :
00481 BundleProtocol::validate(bundle,
00482 &reception_reason,
00483 &deletion_reason);
00484
00485
00486
00487
00488 if (bundle->receive_rcpt_ ||
00489 reception_reason != BundleProtocol::REASON_NO_ADDTL_INFO) {
00490 generate_status_report(bundle, BundleProtocol::STATUS_RECEIVED,
00491 reception_reason);
00492 }
00493
00494
00495
00496
00497 if (!accept_bundle) {
00498 delete_bundle(bundle, deletion_reason);
00499 event->daemon_only_ = true;
00500 return;
00501 }
00502 }
00503
00504
00505
00506
00507
00508
00509 Bundle* duplicate = find_duplicate(bundle);
00510 if (duplicate != NULL) {
00511 log_notice("got duplicate bundle: %s -> %s creation %u.%u",
00512 bundle->source_.c_str(),
00513 bundle->dest_.c_str(),
00514 bundle->creation_ts_.seconds_,
00515 bundle->creation_ts_.seqno_);
00516
00517 stats_.duplicate_bundles_++;
00518
00519 if (bundle->custody_requested_ && duplicate->local_custody_)
00520 {
00521 generate_custody_signal(bundle, false,
00522 BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
00523 }
00524
00525
00526
00527
00528
00529
00530 event->daemon_only_ = true;
00531 return;
00532 }
00533
00534
00535
00536
00537
00538
00539
00540
00541
00542
00543
00544
00545
00546
00547 bool ok_to_route =
00548 add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
00549
00550 if (!ok_to_route) {
00551 event->daemon_only_ = true;
00552 return;
00553 }
00554
00555
00556
00557
00558
00559
00560
00561 if (bundle->custody_requested_ && params_.accept_custody_)
00562 {
00563 if (event->source_ != EVENTSRC_STORE) {
00564 accept_custody(bundle);
00565
00566 } else if (bundle->local_custody_) {
00567 custody_bundles_->push_back(bundle);
00568 }
00569 }
00570
00571
00572
00573
00574 check_registrations(bundle);
00575
00576
00577
00578
00579
00580 }
00581
00582
00583 void
00584 BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
00585 {
00586 Bundle* bundle = event->bundleref_.object();
00587
00588 Link* link = event->link_;
00589
00590
00591
00592
00593
00594
00595
00596 BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(link);
00597 ASSERT(blocks != NULL);
00598
00599 size_t total_len = BundleProtocol::total_length(blocks);
00600
00601 stats_.bundles_transmitted_++;
00602 link->stats()->bundles_transmitted_++;
00603 link->stats()->bundles_queued_--;
00604
00605 link->stats()->bytes_transmitted_ += event->bytes_sent_;
00606 link->stats()->bytes_queued_ -= total_len;
00607
00608 log_info("BUNDLE_TRANSMITTED id:%d (%u bytes_sent/%u reliable) -> %s (%s)",
00609 bundle->bundleid_,
00610 event->bytes_sent_,
00611 event->reliably_sent_,
00612 link->name(),
00613 link->nexthop());
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630 if (params_.retry_reliable_unacked_ &&
00631 link->is_reliable() &&
00632 (event->bytes_sent_ != event->reliably_sent_) &&
00633 (event->reliably_sent_ == 0))
00634 {
00635 bundle->fwdlog_.update(link, ForwardingInfo::TRANSMIT_FAILED);
00636 bundle->xmit_blocks_.delete_blocks(link);
00637 return;
00638 }
00639
00640
00641
00642
00643 bundle->fwdlog_.update(link, ForwardingInfo::TRANSMITTED);
00644
00645
00646
00647
00648
00649 ForwardingInfo fwdinfo;
00650 bool ok = bundle->fwdlog_.get_latest_entry(link, &fwdinfo);
00651 ASSERTF(ok, "no forwarding log entry for transmission");
00652 ASSERT(fwdinfo.state_ == ForwardingInfo::TRANSMITTED);
00653
00654
00655
00656
00657
00658
00659
00660 if (link->reliable_) {
00661 fragmentmgr_->try_to_reactively_fragment(bundle,
00662 BundleProtocol::payload_offset(blocks),
00663 event->reliably_sent_);
00664 } else {
00665 fragmentmgr_->try_to_reactively_fragment(bundle,
00666 BundleProtocol::payload_offset(blocks),
00667 event->bytes_sent_);
00668 }
00669
00670
00671
00672
00673
00674 bundle->xmit_blocks_.delete_blocks(link);
00675 blocks = NULL;
00676
00677
00678
00679
00680 if (bundle->forward_rcpt_) {
00681 generate_status_report(bundle, BundleProtocol::STATUS_FORWARDED);
00682 }
00683
00684
00685
00686
00687 if (bundle->local_custody_) {
00688 bundle->custody_timers_.push_back(
00689 new CustodyTimer(fwdinfo.timestamp_,
00690 fwdinfo.custody_timer_,
00691 bundle, link));
00692
00693
00694
00695
00696
00697
00698
00699 }
00700
00701
00702
00703
00704
00705
00706 try_delete_from_pending(bundle);
00707 }
00708
00709
00710 void
00711 BundleDaemon::handle_bundle_transmit_failed(BundleTransmitFailedEvent* event)
00712 {
00713
00714
00715
00716 Bundle* bundle = event->bundleref_.object();
00717
00718 Link* link = event->link_;
00719 bundle->xmit_blocks_.delete_blocks(link);
00720
00721 log_info("BUNDLE_TRANSMIT_FAILED id:%d -> %s (%s)",
00722 bundle->bundleid_,
00723 event->contact_->link()->name(),
00724 event->contact_->link()->nexthop());
00725
00726
00727
00728
00729
00730 bundle->fwdlog_.update(event->contact_->link(),
00731 ForwardingInfo::TRANSMIT_FAILED);
00732
00733
00734
00735
00736 }
00737
00738
00739 void
00740 BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
00741 {
00742
00743 stats_.bundles_delivered_++;
00744
00745
00746
00747
00748 Bundle* bundle = event->bundleref_.object();
00749
00750 log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
00751 bundle->bundleid_, bundle->payload_.length(),
00752 event->registration_->regid(),
00753 event->registration_->endpoint().c_str());
00754
00755
00756
00757
00758 if (bundle->delivery_rcpt_)
00759 {
00760 generate_status_report(bundle, BundleProtocol::STATUS_DELIVERED);
00761 }
00762
00763
00764
00765
00766
00767
00768
00769
00770 if (bundle->custody_requested_)
00771 {
00772 if (bundle->local_custody_) {
00773 release_custody(bundle);
00774
00775 } else if (bundle->custodian_.equals(EndpointID::NULL_EID())) {
00776 log_info("custodial bundle *%p delivered before custody accepted",
00777 bundle);
00778
00779 } else {
00780 generate_custody_signal(bundle, true,
00781 BundleProtocol::CUSTODY_NO_ADDTL_INFO);
00782 }
00783 }
00784
00785
00786
00787
00788
00789
00790 try_delete_from_pending(bundle);
00791 }
00792
00793
00794 void
00795 BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
00796 {
00797
00798 stats_.bundles_expired_++;
00799
00800 Bundle* bundle = event->bundleref_.object();
00801
00802 log_info("BUNDLE_EXPIRED *%p", bundle);
00803
00804
00805
00806
00807
00808
00809 delete_bundle(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
00810
00811
00812 }
00813
00814
00815 void
00816 BundleDaemon::handle_bundle_send(BundleSendRequest* event)
00817 {
00818 Link *link = contactmgr_->find_link(event->link_.c_str());
00819
00820 if(!link) return;
00821
00822 BundleRef br = pending_bundles_->find(event->bundleid_);
00823
00824 if(!br.object()) {
00825 br = custody_bundles_->find(event->bundleid_);
00826
00827 if(!br.object()) {
00828 return;
00829 }
00830 }
00831
00832 ForwardingInfo::action_t fwd_action =
00833 ForwardingInfo::action_t(event->action_);
00834
00835 if(fwd_action == ForwardingInfo::INVALID_ACTION) {
00836 return;
00837 }
00838
00839 bool result = actions_->send_bundle(br.object(), link,
00840 fwd_action, CustodyTimerSpec::defaults_);
00841
00842 if(result == false) {
00843
00844
00845 }
00846 }
00847
00848
00849 void
00850 BundleDaemon::handle_bundle_cancel(BundleCancelRequest* event)
00851 {
00852 Link *link = contactmgr_->find_link(event->link_.c_str());
00853
00854 if(!link) {
00855 return;
00856 }
00857
00858 BundleRef br = pending_bundles_->find(event->bundleid_);
00859
00860 if(!br.object()) {
00861 br = custody_bundles_->find(event->bundleid_);
00862
00863 if(!br.object()) {
00864 return;
00865 }
00866 }
00867
00868 bool result = actions_->cancel_bundle(br.object(), link);
00869
00870 if(result == false) {
00871
00872
00873 }
00874 }
00875
00876
00877 void
00878 BundleDaemon::handle_bundle_inject(BundleInjectRequest* event)
00879 {
00880
00881
00882
00883 Link *link = contactmgr_->find_link(event->link_.c_str());
00884
00885 if(!link) {
00886 return;
00887 }
00888
00889
00890 Bundle *bundle=new Bundle();
00891 if(bundle->source_.assign(event->src_) &&
00892 bundle->dest_.assign(event->dest_)) {
00893
00894 if(! bundle->replyto_.assign(event->replyto_))
00895 bundle->replyto_.assign(EndpointID::NULL_EID());
00896
00897 if(! bundle->custodian_.assign(event->custodian_))
00898 bundle->custodian_.assign(EndpointID::NULL_EID());
00899
00900
00901 bundle->priority_ = event->priority_;
00902
00903
00904
00905 if(event->expiration_ == 0)
00906 bundle->expiration_ = 300;
00907 else
00908 bundle->expiration_ = event->expiration_;
00909
00910
00911 const u_char *payload = (const u_char*)event->payload_.c_str();
00912 bundle->payload_.set_data(payload,sizeof(payload));
00913
00914
00915 bool success = false;
00916 success = actions_->send_bundle(bundle, link,
00917 ForwardingInfo::action_t(event->action_),
00918 CustodyTimerSpec::defaults_);
00919 if(!success)
00920 delete bundle;
00921
00922 } else
00923 delete bundle;
00924 }
00925
00926
00927 void
00928 BundleDaemon::handle_bundle_query(BundleQueryRequest*)
00929 {
00930 BundleDaemon::post_at_head(new BundleReportEvent());
00931 }
00932
00933
00934 void
00935 BundleDaemon::handle_bundle_report(BundleReportEvent*)
00936 {
00937 }
00938
00939
00940 void
00941 BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
00942 {
00943 Registration* registration = event->registration_;
00944 log_info("REGISTRATION_ADDED %d %s",
00945 registration->regid(), registration->endpoint().c_str());
00946
00947 if (!reg_table_->add(registration,
00948 (event->source_ == EVENTSRC_APP) ? true : false))
00949 {
00950 log_err("error adding registration %d to table",
00951 registration->regid());
00952 }
00953
00954 oasys::ScopeLock l(pending_bundles_->lock(),
00955 "BundleDaemon::handle_registration_added");
00956 BundleList::iterator iter;
00957 for (iter = pending_bundles_->begin();
00958 iter != pending_bundles_->end();
00959 ++iter)
00960 {
00961 Bundle* bundle = *iter;
00962
00963 if (registration->endpoint().match(bundle->dest_))
00964 {
00965 deliver_to_registration(bundle, registration);
00966 }
00967 }
00968 }
00969
00970
00971 void
00972 BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
00973 {
00974 Registration* registration = event->registration_;
00975 log_info("REGISTRATION_REMOVED %d %s",
00976 registration->regid(), registration->endpoint().c_str());
00977
00978 if (!reg_table_->del(registration->regid())) {
00979 log_err("error removing registration %d from table",
00980 registration->regid());
00981 return;
00982 }
00983
00984 delete registration;
00985 }
00986
00987
00988 void
00989 BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
00990 {
00991 Registration* registration = reg_table_->get(event->regid_);
00992
00993 if (registration == NULL) {
00994 log_err("REGISTRATION_EXPIRED -- dead regid %d", event->regid_);
00995 return;
00996 }
00997
00998 registration->set_expired(true);
00999
01000 if (registration->active()) {
01001
01002
01003
01004 log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
01005 event->regid_);
01006 } else {
01007
01008 log_info("REGISTRATION_EXPIRED %d", event->regid_);
01009 reg_table_->del(registration->regid());
01010 delete registration;
01011 }
01012 }
01013
01014
01015 void
01016 BundleDaemon::handle_link_available(LinkAvailableEvent* event)
01017 {
01018 Link* link = event->link_;
01019 ASSERT(link->isavailable());
01020
01021 log_info("LINK_AVAILABLE *%p", link);
01022 }
01023
01024
01025 void
01026 BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
01027 {
01028 Link* link = event->link_;
01029 ASSERT(!link->isavailable());
01030
01031 log_info("LINK UNAVAILABLE *%p", link);
01032 }
01033
01034
01035 void
01036 BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
01037 {
01038 Link* link = request->link_;
01039 if (! link) {
01040 log_warn("LINK_STATE_CHANGE_REQUEST received invalid link");
01041 return;
01042 }
01043
01044 Link::state_t new_state = Link::state_t(request->state_);
01045 Link::state_t old_state = Link::state_t(request->old_state_);
01046 int reason = request->reason_;
01047
01048 if (link->contact() != request->contact_) {
01049 log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p: "
01050 "contact %p != current contact %p",
01051 Link::state_to_str(old_state), Link::state_to_str(new_state),
01052 ContactEvent::reason_to_str(reason), link,
01053 request->contact_.object(), link->contact().object());
01054 return;
01055 }
01056
01057 log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
01058 Link::state_to_str(old_state), Link::state_to_str(new_state),
01059 ContactEvent::reason_to_str(reason), link);
01060
01061
01062 oasys::ScopeLock l;
01063 if (new_state == Link::OPEN)
01064 {
01065 l.set_lock(contactmgr_->lock(), "BundleDaemon::handle_link_state_change_request");
01066 }
01067
01068 switch(new_state) {
01069 case Link::UNAVAILABLE:
01070 if (link->state() != Link::AVAILABLE) {
01071 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01072 "tried to set state UNAVAILABLE in state %s",
01073 link, Link::state_to_str(link->state()));
01074 return;
01075 }
01076 link->set_state(new_state);
01077 post_at_head(new LinkUnavailableEvent(link,
01078 ContactEvent::reason_t(reason)));
01079 break;
01080
01081 case Link::AVAILABLE:
01082 if (link->state() == Link::UNAVAILABLE) {
01083 link->set_state(Link::AVAILABLE);
01084
01085 } else if (link->state() == Link::BUSY &&
01086 reason == ContactEvent::UNBLOCKED) {
01087 ASSERT(link->contact() != NULL);
01088 link->set_state(Link::OPEN);
01089
01090 } else if (link->state() == Link::OPEN &&
01091 reason == ContactEvent::UNBLOCKED) {
01092
01093
01094
01095 } else {
01096 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01097 "tried to set state AVAILABLE in state %s",
01098 link, Link::state_to_str(link->state()));
01099 return;
01100 }
01101
01102 post_at_head(new LinkAvailableEvent(link,
01103 ContactEvent::reason_t(reason)));
01104 break;
01105
01106 case Link::BUSY:
01107 log_err("LINK_STATE_CHANGE_REQUEST can't be used for state %s",
01108 Link::state_to_str(new_state));
01109 break;
01110
01111 case Link::OPENING:
01112 case Link::OPEN:
01113
01114 if (link->state() == Link::UNAVAILABLE) {
01115 link->set_state(Link::AVAILABLE);
01116 }
01117 actions_->open_link(link);
01118 break;
01119
01120 case Link::CLOSED:
01121
01122
01123
01124 if (! link->isopen() && ! link->isopening()) {
01125 log_err("LINK_STATE_CHANGE_REQUEST *%p: "
01126 "setting state CLOSED (%s) in unexpected state %s",
01127 link, ContactEvent::reason_to_str(reason),
01128 link->state_to_str(link->state()));
01129 break;
01130 }
01131
01132
01133 if (link->isopen()) {
01134 ASSERT(link->contact() != NULL);
01135 post_at_head(new ContactDownEvent(link->contact(),
01136 ContactEvent::reason_t(reason)));
01137 }
01138
01139
01140 actions_->close_link(link);
01141
01142
01143
01144 if (reason == ContactEvent::IDLE) {
01145 link->set_state(Link::AVAILABLE);
01146 } else {
01147 link->set_state(Link::UNAVAILABLE);
01148 post_at_head(new LinkUnavailableEvent(link,
01149 ContactEvent::reason_t(reason)));
01150 }
01151
01152 break;
01153
01154 default:
01155 PANIC("unhandled state %s", Link::state_to_str(new_state));
01156 }
01157 }
01158
01159
01160 void
01161 BundleDaemon::handle_link_create(LinkCreateRequest*)
01162 {
01163 NOTIMPLEMENTED;
01164 }
01165
01166
01167 void
01168 BundleDaemon::handle_link_query(LinkQueryRequest*)
01169 {
01170 BundleDaemon::post_at_head(new LinkReportEvent());
01171 }
01172
01173
01174 void
01175 BundleDaemon::handle_link_report(LinkReportEvent*)
01176 {
01177 }
01178
01179
01180 void
01181 BundleDaemon::handle_contact_up(ContactUpEvent* event)
01182 {
01183 const ContactRef& contact = event->contact_;
01184 Link* link = contact->link();
01185
01186
01187 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_contact_up");
01188 if(link->contact() != contact)
01189 {
01190 log_info("CONTACT_UP *%p (contact %p) being ignored (old contact)", contact->link(), contact.object());
01191 return;
01192 }
01193
01194 log_info("CONTACT_UP *%p (contact %p)", contact->link(), contact.object());
01195 link->set_state(Link::OPEN);
01196 link->stats_.contacts_++;
01197 }
01198
01199
01200
01201
01202 void
01203 BundleDaemon::handle_contact_down(ContactDownEvent* event)
01204 {
01205 const ContactRef& contact = event->contact_;
01206 Link* link = contact->link();
01207 int reason = event->reason_;
01208
01209 log_info("CONTACT_DOWN *%p (%s) (contact %p)",
01210 link, ContactEvent::reason_to_str(reason), contact.object());
01211
01212
01213
01214 }
01215
01216
01217 void
01218 BundleDaemon::handle_contact_query(ContactQueryRequest*)
01219 {
01220 BundleDaemon::post_at_head(new ContactReportEvent());
01221 }
01222
01223
01224 void
01225 BundleDaemon::handle_contact_report(ContactReportEvent*)
01226 {
01227 }
01228
01229
01230 void
01231 BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
01232 {
01233 log_info("REASSEMBLY_COMPLETED bundle id %d",
01234 event->bundle_->bundleid_);
01235
01236
01237 BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
01238 while ((ref = event->fragments_.pop_front()) != NULL) {
01239 try_delete_from_pending(ref.object());
01240 }
01241
01242
01243 post_at_head(new BundleReceivedEvent(event->bundle_.object(),
01244 EVENTSRC_FRAGMENTATION));
01245 }
01246
01247
01248
01249 void
01250 BundleDaemon::handle_route_add(RouteAddEvent* event)
01251 {
01252 log_info("ROUTE_ADD *%p", event->entry_);
01253 }
01254
01255
01256 void
01257 BundleDaemon::handle_route_del(RouteDelEvent* event)
01258 {
01259 log_info("ROUTE_DEL %s", event->dest_.c_str());
01260 }
01261
01262
01263 void
01264 BundleDaemon::handle_route_query(RouteQueryRequest*)
01265 {
01266 BundleDaemon::post_at_head(new RouteReportEvent());
01267 }
01268
01269
01270 void
01271 BundleDaemon::handle_route_report(RouteReportEvent*)
01272 {
01273 }
01274
01275
01276 void
01277 BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
01278 {
01279 log_info("CUSTODY_SIGNAL: %s %u.%u %s (%s)",
01280 event->data_.orig_source_eid_.c_str(),
01281 event->data_.orig_creation_tv_.seconds_,
01282 event->data_.orig_creation_tv_.seqno_,
01283 event->data_.succeeded_ ? "succeeded" : "failed",
01284 CustodySignal::reason_to_str(event->data_.reason_));
01285
01286 BundleRef orig_bundle =
01287 custody_bundles_->find(event->data_.orig_source_eid_,
01288 event->data_.orig_creation_tv_);
01289
01290 if (orig_bundle == NULL) {
01291 log_warn("received custody signal for bundle %s %u.%u "
01292 "but don't have custody",
01293 event->data_.orig_source_eid_.c_str(),
01294 event->data_.orig_creation_tv_.seconds_,
01295 event->data_.orig_creation_tv_.seqno_);
01296 return;
01297 }
01298
01299
01300
01301 bool release = event->data_.succeeded_;
01302 if ((event->data_.succeeded_ == false) &&
01303 (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
01304 {
01305 log_notice("releasing custody for bundle %s %u.%u "
01306 "due to redundant reception",
01307 event->data_.orig_source_eid_.c_str(),
01308 event->data_.orig_creation_tv_.seconds_,
01309 event->data_.orig_creation_tv_.seqno_);
01310
01311 release = true;
01312 }
01313
01314 if (release) {
01315 release_custody(orig_bundle.object());
01316 try_delete_from_pending(orig_bundle.object());
01317 }
01318 }
01319
01320
01321 void
01322 BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
01323 {
01324 Bundle* bundle = event->bundle_.object();
01325 Link* link = event->link_;
01326
01327 log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link);
01328
01329
01330 oasys::ScopeLock l(&bundle->lock_, "BundleDaemon::handle_custody_timeout");
01331
01332 bool found = false;
01333 CustodyTimer* timer = NULL;
01334 CustodyTimerVec::iterator iter;
01335 for (iter = bundle->custody_timers_.begin();
01336 iter != bundle->custody_timers_.end();
01337 ++iter)
01338 {
01339 timer = *iter;
01340 if (timer->link_ == link)
01341 {
01342 if (timer->pending()) {
01343 log_err("multiple pending custody timers for link %s",
01344 link->nexthop());
01345 continue;
01346 }
01347
01348 found = true;
01349 bundle->custody_timers_.erase(iter);
01350 break;
01351 }
01352 }
01353
01354 if (!found) {
01355 log_err("custody timeout for *%p *%p: timer not found in bundle list",
01356 bundle, link);
01357 return;
01358 }
01359
01360 ASSERT(!timer->cancelled());
01361
01362 if (!pending_bundles_->contains(bundle)) {
01363 log_err("custody timeout for *%p *%p: bundle not in pending list",
01364 bundle, link);
01365 }
01366
01367
01368
01369
01370
01371 bool ok = bundle->fwdlog_.update(link, ForwardingInfo::CUSTODY_TIMEOUT);
01372 if (!ok) {
01373 log_err("custody timeout can't find ForwardingLog entry for link *%p",
01374 link);
01375 }
01376
01377 delete timer;
01378
01379
01380
01381 }
01382
01383
01384 void
01385 BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
01386 {
01387 shutting_down_ = true;
01388
01389 (void)request;
01390
01391 log_notice("Received shutdown request");
01392
01393 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
01394
01395 const LinkSet* links = contactmgr_->links();
01396 LinkSet::const_iterator iter;
01397 Link* link = NULL;
01398
01399
01400 for (iter = links->begin(); iter != links->end(); ++iter)
01401 {
01402 link = *iter;
01403 if (link->isopen()) {
01404 log_debug("Shutdown: closing link *%p\n", link);
01405 link->close();
01406 }
01407 }
01408
01409
01410 if (rtr_shutdown_proc_) {
01411 (*rtr_shutdown_proc_)(rtr_shutdown_data_);
01412 }
01413
01414
01415 if (app_shutdown_proc_) {
01416 (*app_shutdown_proc_)(app_shutdown_data_);
01417 }
01418
01419
01420 set_should_stop();
01421
01422
01423
01424 }
01425
01426
01427 void
01428 BundleDaemon::handle_status_request(StatusRequest* request)
01429 {
01430 (void)request;
01431 log_info("Received status request");
01432 }
01433
01434
01435 bool
01436 BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
01437 {
01438 log_debug("adding bundle *%p to pending list", bundle);
01439
01440 pending_bundles_->push_back(bundle);
01441
01442 if (add_to_store) {
01443 bundle->in_datastore_ = true;
01444 actions_->store_add(bundle);
01445 }
01446
01447 struct timeval now;
01448 gettimeofday(&now, 0);
01449
01450
01451 struct timeval expiration_time;
01452 expiration_time.tv_sec =
01453 BundleTimestamp::TIMEVAL_CONVERSION +
01454 bundle->creation_ts_.seconds_ +
01455 bundle->expiration_;
01456
01457 expiration_time.tv_usec = now.tv_usec;
01458
01459 long int when = expiration_time.tv_sec - now.tv_sec;
01460
01461 bool ok_to_route = true;
01462
01463 if (when > 0) {
01464 log_debug("scheduling expiration for bundle id %d at %u.%u "
01465 "(in %lu seconds)",
01466 bundle->bundleid_,
01467 (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
01468 when);
01469 } else {
01470 log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
01471 "[expiration %u, creation time %u.%u, offset %u, now %u.%u]",
01472 bundle->bundleid_, bundle->expiration_,
01473 (u_int)bundle->creation_ts_.seconds_,
01474 (u_int)bundle->creation_ts_.seqno_,
01475 BundleTimestamp::TIMEVAL_CONVERSION,
01476 (u_int)now.tv_sec, (u_int)now.tv_usec);
01477 expiration_time = now;
01478 ok_to_route = false;
01479 }
01480
01481 bundle->expiration_timer_ = new ExpirationTimer(bundle);
01482 bundle->expiration_timer_->schedule_at(&expiration_time);
01483
01484 return ok_to_route;
01485 }
01486
01487
01488 bool
01489 BundleDaemon::delete_from_pending(Bundle* bundle)
01490 {
01491 log_debug("removing bundle *%p from pending list", bundle);
01492
01493
01494
01495 if (bundle->expiration_timer_) {
01496 log_debug("cancelling expiration timer for bundle id %d",
01497 bundle->bundleid_);
01498
01499 bool cancelled = bundle->expiration_timer_->cancel();
01500 if (!cancelled) {
01501 log_crit("unexpected error cancelling expiration timer "
01502 "for bundle *%p", bundle);
01503 }
01504
01505 bundle->expiration_timer_->bundleref_.release();
01506 bundle->expiration_timer_ = NULL;
01507 }
01508
01509 bool erased = pending_bundles_->erase(bundle);
01510
01511 if (!erased) {
01512 log_err("unexpected error removing bundle from pending list");
01513 }
01514
01515 return erased;
01516 }
01517
01518
01519 bool
01520 BundleDaemon::try_delete_from_pending(Bundle* bundle)
01521 {
01522
01523
01524
01525
01526
01527
01528
01529
01530
01531
01532
01533
01534
01535
01536
01537
01538
01539
01540
01541 if (! bundle->is_queued_on(pending_bundles_))
01542 {
01543 if (bundle->expiration_timer_ == NULL) {
01544 log_debug("try_delete_from_pending(*%p): bundle already expired",
01545 bundle);
01546 return false;
01547 }
01548
01549 log_err("try_delete_from_pending(*%p): bundle not in pending list!",
01550 bundle);
01551 return false;
01552 }
01553
01554 if (!params_.early_deletion_) {
01555 log_debug("try_delete_from_pending(*%p): not deleting because "
01556 "early deletion disabled",
01557 bundle);
01558 return false;
01559 }
01560
01561 size_t num_mappings = bundle->num_mappings();
01562 if (num_mappings != 1) {
01563 log_debug("try_delete_from_pending(*%p): not deleting because "
01564 "bundle has %zu mappings",
01565 bundle, num_mappings);
01566 return false;
01567 }
01568
01569 size_t num_in_flight = bundle->fwdlog_.get_count(ForwardingInfo::IN_FLIGHT);
01570 if (num_in_flight > 0) {
01571 log_debug("try_delete_from_pending(*%p): not deleting because "
01572 "bundle in flight on %zu links",
01573 bundle, num_in_flight);
01574 return false;
01575 }
01576
01577 return delete_from_pending(bundle);
01578 }
01579
01580
01581 bool
01582 BundleDaemon::delete_bundle(Bundle* bundle, status_report_reason_t reason)
01583 {
01584
01585
01586
01587 bool send_status = (bundle->local_custody_ ||
01588 (bundle->deletion_rcpt_ &&
01589 reason != BundleProtocol::REASON_NO_ADDTL_INFO));
01590
01591
01592 if (bundle->local_custody_) {
01593 release_custody(bundle);
01594 }
01595
01596
01597 if (bundle->is_fragment_) {
01598 fragmentmgr_->delete_fragment(bundle);
01599 }
01600
01601
01602 bool erased = true;
01603 if (bundle->is_queued_on(pending_bundles_)) {
01604 erased = delete_from_pending(bundle);
01605 }
01606
01607 if (erased && send_status) {
01608 generate_status_report(bundle, BundleProtocol::STATUS_DELETED, reason);
01609 }
01610
01611
01612
01613
01614 return erased;
01615 }
01616
01617
01618 Bundle*
01619 BundleDaemon::find_duplicate(Bundle* b)
01620 {
01621 oasys::ScopeLock l(pending_bundles_->lock(),
01622 "BundleDaemon::find_duplicate");
01623 BundleList::iterator iter;
01624 for (iter = pending_bundles_->begin();
01625 iter != pending_bundles_->end();
01626 ++iter)
01627 {
01628 Bundle* b2 = *iter;
01629
01630 if ((b->source_.equals(b2->source_)) &&
01631 (b->creation_ts_.seconds_ == b2->creation_ts_.seconds_) &&
01632 (b->creation_ts_.seqno_ == b2->creation_ts_.seqno_) &&
01633 (b->is_fragment_ == b2->is_fragment_) &&
01634 (b->frag_offset_ == b2->frag_offset_) &&
01635 (b->orig_length_ == b2->orig_length_) &&
01636 (b->payload_.length() == b2->payload_.length()))
01637 {
01638 return b2;
01639 }
01640 }
01641
01642 return NULL;
01643 }
01644
01645
01646 void
01647 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
01648 {
01649 Bundle* bundle = event->bundle_;
01650 event->bundle_ = NULL;
01651 ASSERT(bundle->refcount() == 0);
01652
01653 bundle->lock_.lock("BundleDaemon::handle_bundle_free");
01654
01655 if (bundle->in_datastore_) {
01656 actions_->store_del(bundle);
01657 }
01658
01659 delete bundle;
01660 }
01661
01662
01663 void
01664 BundleDaemon::handle_event(BundleEvent* event)
01665 {
01666 dispatch_event(event);
01667
01668 if (! event->daemon_only_) {
01669
01670 router_->handle_event(event);
01671 contactmgr_->handle_event(event);
01672 }
01673
01674 stats_.events_processed_++;
01675
01676 if (event->processed_notifier_) {
01677 event->processed_notifier_->notify();
01678 }
01679 }
01680
01681
01682 void
01683 BundleDaemon::load_registrations()
01684 {
01685 admin_reg_ = new AdminRegistration();
01686 {
01687 RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
01688 handle_event(&e);
01689 }
01690
01691 EndpointID ping_eid(local_eid());
01692 bool ok = ping_eid.append_service_tag("ping");
01693 if (!ok) {
01694 log_crit("local eid (%s) scheme must be able to append service tags",
01695 local_eid().c_str());
01696 exit(1);
01697 }
01698
01699 ping_reg_ = new PingRegistration(ping_eid);
01700 {
01701 RegistrationAddedEvent e(ping_reg_, EVENTSRC_ADMIN);
01702 handle_event(&e);
01703 }
01704
01705 Registration* reg;
01706 RegistrationStore* reg_store = RegistrationStore::instance();
01707 RegistrationStore::iterator* iter = reg_store->new_iterator();
01708
01709 while (iter->next() == 0) {
01710 reg = reg_store->get(iter->cur_val());
01711 if (reg == NULL) {
01712 log_err("error loading registration %d from data store",
01713 iter->cur_val());
01714 continue;
01715 }
01716
01717 RegistrationAddedEvent e(reg, EVENTSRC_STORE);
01718 handle_event(&e);
01719 }
01720
01721 delete iter;
01722 }
01723
01724
01725 void
01726 BundleDaemon::load_bundles()
01727 {
01728 Bundle* bundle;
01729 BundleStore* bundle_store = BundleStore::instance();
01730 BundleStore::iterator* iter = bundle_store->new_iterator();
01731
01732 log_notice("loading bundles from data store");
01733
01734 u_int64_t total_size = 0;
01735
01736 for (iter->begin(); iter->more(); iter->next()) {
01737 bundle = bundle_store->get(iter->cur_val());
01738
01739 if (bundle == NULL) {
01740 log_err("error loading bundle %d from data store",
01741 iter->cur_val());
01742 continue;
01743 }
01744
01745 total_size += bundle->durable_size();
01746
01747 BundleReceivedEvent e(bundle, EVENTSRC_STORE);
01748 handle_event(&e);
01749
01750
01751
01752
01753
01754 }
01755
01756 bundle_store->set_total_size(total_size);
01757
01758 delete iter;
01759 }
01760
01761
01762 void
01763 BundleDaemon::run()
01764 {
01765 if (! BundleTimestamp::check_local_clock()) {
01766 exit(1);
01767 }
01768
01769 router_ = BundleRouter::create_router(BundleRouter::config_.type_.c_str());
01770 router_->initialize();
01771
01772 load_registrations();
01773 load_bundles();
01774
01775 BundleEvent* event;
01776
01777 oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
01778
01779 struct pollfd pollfds[2];
01780 struct pollfd* event_poll = &pollfds[0];
01781 struct pollfd* timer_poll = &pollfds[1];
01782
01783 event_poll->fd = eventq_->read_fd();
01784 event_poll->events = POLLIN;
01785
01786 timer_poll->fd = timersys->notifier()->read_fd();
01787 timer_poll->events = POLLIN;
01788
01789 while (1) {
01790 if (should_stop()) {
01791 break;
01792 }
01793
01794 int timeout = timersys->run_expired_timers();
01795
01796 if (eventq_->size() > 0) {
01797 bool ok = eventq_->try_pop(&event);
01798 ASSERT(ok);
01799
01800 oasys::Time now;
01801 now.get_time();
01802
01803
01804 handle_event(event);
01805
01806 int elapsed = now.elapsed_ms();
01807 if (elapsed > 2000) {
01808 log_warn("event %s took %d ms to process",
01809 event->type_str(), elapsed);
01810 }
01811
01812
01813 delete event;
01814
01815 continue;
01816 }
01817
01818 pollfds[0].revents = 0;
01819 pollfds[1].revents = 0;
01820
01821 int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
01822 log_debug("poll returned %d", cc);
01823
01824 if (cc == oasys::IOTIMEOUT) {
01825 log_debug("poll timeout");
01826 continue;
01827
01828 } else if (cc <= 0) {
01829 log_err("unexpected return %d from poll_multiple!", cc);
01830 continue;
01831 }
01832
01833
01834
01835 if (event_poll->revents != 0) {
01836 log_debug("poll returned new event to handle");
01837 }
01838
01839
01840
01841
01842 if (timer_poll->revents != 0) {
01843 log_debug("poll returned new timers to handle");
01844 timersys->notifier()->clear();
01845 }
01846 }
01847 }
01848
01849 }