00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef _DTN_PROPHET_LISTS_
00018 #define _DTN_PROPHET_LISTS_
00019
00020 #include "routing/Prophet.h"
00021 #include "routing/ProphetNode.h"
00022 #include <oasys/debug/Log.h>
00023 #include <oasys/util/Time.h>
00024 #include <oasys/thread/SpinLock.h>
00025 #include "naming/EndpointID.h"
00026 #include "bundling/BundleEvent.h"
00027 #include "bundling/BundleActions.h"
00028 #include <oasys/util/BoundedPriorityQueue.h>
00029 #include <oasys/util/URL.h>
00030
00031 #include <vector>
00032 #include <map>
00033 #include <set>
00034
/**
 * Round x up to the next multiple of four; a value that is already a
 * multiple of four is returned unchanged.
 *
 * The whole expansion is parenthesized so the macro can be embedded in
 * a larger expression (e.g. 1 + FOUR_BYTE_ALIGN(n)) without the
 * conditional operator capturing the surrounding operands. Note that x
 * is evaluated more than once, so do not pass an argument with side
 * effects.
 */
#define FOUR_BYTE_ALIGN(x) ((((x) % 4) != 0) ? ((x) + (4 - ((x) % 4))) : (x))
00036
00041 namespace dtn {
00042
00047 struct ProphetParams : public ProphetNodeParams
00048 {
00049 ProphetParams()
00050 : ProphetNodeParams(),
00051 fs_(Prophet::GRTR),
00052 qp_(Prophet::FIFO),
00053 hello_interval_(Prophet::HELLO_INTERVAL),
00054 hello_dead_(Prophet::HELLO_DEAD),
00055 max_forward_(Prophet::DEFAULT_NUM_F_MAX),
00056 min_forward_(Prophet::DEFAULT_NUM_F_MIN),
00057 max_usage_(0xffffffff),
00058 age_period_(Prophet::AGE_PERIOD),
00059 relay_node_(false),
00060 custody_node_(false),
00061 internet_gw_(false),
00062 epsilon_(0.0039) {}
00063
00064 Prophet::fwd_strategy_t fs_;
00065 Prophet::q_policy_t qp_;
00066
00067 u_int8_t hello_interval_;
00068 u_int hello_dead_;
00069
00070 u_int max_forward_;
00071 u_int min_forward_;
00072 u_int max_usage_;
00073
00074 u_int age_period_;
00075
00076 bool relay_node_;
00077 bool custody_node_;
00078 bool internet_gw_;
00079 double epsilon_;
00080 };
00081
/**
 * A std::vector of heap-allocated T that owns its elements: erase(),
 * clear(), assignment and destruction delete the pointed-to objects,
 * and copying performs a deep copy (each element is duplicated with
 * T's copy constructor).
 *
 * Elements added through the inherited std::vector interface (e.g.
 * push_back) are adopted by the list and will be deleted by it.
 */
template <class T>
class PointerList : public std::vector<T*>
{
public:
    typedef std::vector<T*> List;
    typedef typename std::vector<T*>::iterator iterator;
    typedef typename std::vector<T*>::const_iterator const_iterator;

    /**
     * Create an empty list.
     */
    PointerList()
        : std::vector<T*>() {}

    /**
     * Deep-copy constructor: every element of a is cloned.
     */
    PointerList(const PointerList& a)
        : std::vector<T*>()
    {
        // a freshly constructed vector is already empty, so there is
        // nothing to release before copying
        copy_from(a);
    }

    /**
     * Delete all owned elements.
     */
    virtual ~PointerList()
    {
        clear();
    }

    /**
     * Replace the contents with deep copies of a's elements.
     * Self-assignment is a no-op; without the guard the list would
     * delete its own elements and then copy from the emptied list.
     */
    PointerList& operator= (const PointerList& a)
    {
        if (this != &a)
        {
            clear();
            copy_from(a);
        }
        return *this;
    }

    /**
     * Delete the element at i and remove its slot from the list.
     */
    void erase(iterator i)
    {
        delete (*i);
        List::erase(i);
    }

    /**
     * Delete every element and leave the list empty.
     */
    void clear()
    {
        free();
        List::clear();
    }
protected:

    /**
     * Delete every pointed-to element; the (now dangling) pointers
     * remain in the vector until the caller clears it.
     */
    void free()
    {
        for (iterator i = List::begin(); i != List::end(); ++i)
        {
            delete *i;
        }
    }

    /**
     * Append a copy-constructed clone of each element of a.
     */
    void copy_from(const PointerList& a)
    {
        List::reserve(List::size() + a.size());
        for (const_iterator i = a.begin(); i != a.end(); ++i)
        {
            // push_back lives in a dependent base class, so it must be
            // qualified to be found during template name lookup
            List::push_back(new T(**i));
        }
    }
};
00175
00176 typedef PointerList<ProphetNode> ProphetNodeList;
00177
00181 struct less_eid_
00182 {
00183 bool operator() (const EndpointID& a, const EndpointID& b) const
00184 {
00185 return a.str() < b.str();
00186 }
00187 };
00188
00192 class ProphetTable
00193 {
00194 public:
00195 typedef std::map<EndpointID,ProphetNode*,less_eid_> rib_table;
00196 typedef std::map<EndpointID,ProphetNode*,less_eid_>::iterator iterator;
00197
00201 ProphetTable();
00202
00206 ~ProphetTable();
00207
00212 ProphetNode* find(const EndpointID& eid) const;
00213
00218 void update(ProphetNode* node);
00219
00223
00224 double p_value(const Bundle* b) const
00225 {
00226 return p_value(Prophet::eid_to_routeid(b->dest_));
00227 }
00228 double p_value(const EndpointID& eid) const;
00230
00234 size_t dump_table(ProphetNodeList& list) const;
00235
00239 size_t size() const { return table_.size(); }
00240
00245 void clear() { free(); table_.clear(); }
00246
00250 iterator begin();
00251
00255 iterator end();
00256
00260 void truncate(double epsilon);
00261
00266 oasys::SpinLock* lock() { return lock_; }
00267 protected:
00268
00272 void free();
00273
00274 oasys::SpinLock* lock_;
00275 rib_table table_;
00276 };
00277
00281 class ProphetTableAgeTimer : public oasys::Timer,
00282 public oasys::Logger
00283 {
00284 public:
00285 ProphetTableAgeTimer(ProphetTable* table, u_int period, double epsilon) :
00286 oasys::Logger("ProphetTableAgeTimer","/dtn/router/prophet/timer"),
00287 table_(table), period_(period), epsilon_(epsilon)
00288 { reschedule(); }
00289 void timeout(const struct timeval& now);
00290 protected:
00291 void reschedule();
00292 ProphetTable* table_;
00293 u_int period_;
00294 double epsilon_;
00295 };
00296
00301 class ProphetDictionary
00302 {
00303 public:
00304 typedef std::map<u_int16_t,EndpointID> rribd;
00305 typedef rribd::const_iterator const_iterator;
00306
00307 ProphetDictionary(const EndpointID& sender = EndpointID::NULL_EID(),
00308 const EndpointID& receiver = EndpointID::NULL_EID());
00309 ProphetDictionary(const ProphetDictionary& pd);
00310 ~ProphetDictionary() {;}
00311
00315 u_int16_t find(const EndpointID& eid) const;
00316
00320 EndpointID find(u_int16_t id) const;
00321
00325 EndpointID sender() const { return find(0); }
00326
00330 EndpointID receiver() const { return find(1); }
00331
00335 bool is_assigned(const EndpointID& eid) const;
00336
00341 u_int16_t insert(const EndpointID& eid);
00342
00347 bool assign(const EndpointID& eid, u_int16_t sid);
00348
00352 const_iterator begin() const { return rribd_.begin(); }
00353
00357 const_iterator end() const { return rribd_.end(); }
00358
00362 size_t size() const { return ribd_.size(); }
00363
00367 size_t guess_ribd_size() const { return guess_; }
00368
00372 void clear();
00373
00377 void dump(oasys::StringBuffer *buf);
00378
00379 ProphetDictionary& operator= (const ProphetDictionary& d)
00380 {
00381 ribd_ = d.ribd_;
00382 rribd_ = d.rribd_;
00383 guess_ = d.guess_;
00384 return *this;
00385 }
00386 protected:
00387 typedef std::map<EndpointID,u_int16_t,less_eid_> ribd;
00388
00389 void update_guess(size_t len) {
00390 guess_ += FOUR_BYTE_ALIGN(len + Prophet::RoutingAddressStringSize);
00391 }
00392
00393 ribd ribd_;
00394 rribd rribd_;
00395 size_t guess_;
00396 };
00397
00398 struct BundleOfferComp : public std::less<BundleOffer*>
00399 {
00400 BundleOfferComp(ProphetDictionary* ribd, ProphetTable* local)
00401 : ribd_(ribd), nodes_(local) {}
00402
00403 bool operator()(const BundleOffer* a, const BundleOffer* b) const
00404 {
00405 const EndpointID ea = ribd_->find(a->sid());
00406 const EndpointID eb = ribd_->find(b->sid());
00407 return nodes_->p_value(ea) > nodes_->p_value(eb);
00408 }
00409
00410 ProphetDictionary* ribd_;
00411 ProphetTable* nodes_;
00412 };
00413
00414 struct BundleOfferSIDComp : public BundleOfferComp
00415 {
00416 BundleOfferSIDComp(ProphetDictionary* ribd,
00417 ProphetTable* local,
00418 u_int16_t sid)
00419 : BundleOfferComp(ribd,local), sid_(sid) {}
00420
00421 bool operator()(const BundleOffer* a,const BundleOffer* b) const
00422 {
00423
00424 if (a->sid() != b->sid())
00425 {
00426 if (a->sid() == sid_) return true;
00427 if (b->sid() == sid_) return true;
00428 }
00429
00430 return BundleOfferComp::operator()(a,b);
00431 }
00432 u_int16_t sid_;
00433 };
00434
00439 class BundleOfferList {
00440 public:
00441 typedef PointerList<BundleOffer> List;
00442 typedef PointerList<BundleOffer>::iterator iterator;
00443 typedef PointerList<BundleOffer>::const_iterator const_iterator;
00444
00445 BundleOfferList(BundleOffer::bundle_offer_t type =
00446 BundleOffer::UNDEFINED)
00447 : type_(type), lock_(new oasys::SpinLock())
00448 {
00449 }
00450
00451 BundleOfferList(const BundleOfferList& list)
00452 : list_(list.list_),
00453 type_(list.type_),
00454 lock_(new oasys::SpinLock())
00455 {
00456 }
00457
00458 ~BundleOfferList()
00459 {
00460 list_.clear();
00461 delete lock_;
00462 }
00463
00468 void sort(ProphetDictionary* ribd, ProphetTable* nodes, u_int16_t sid);
00469
00473 size_t size() const;
00474
00478 bool empty() const;
00479
00483 void clear();
00484
00488 void add_offer(BundleOffer* entry);
00489
00494 bool remove_bundle(u_int32_t cts, u_int16_t sid);
00495
00499 void add_offer(u_int32_t cts, u_int16_t sid,
00500 bool custody=false, bool accept=false, bool ack=false);
00501
00505 void add_offer(Bundle* b,u_int16_t sid);
00506
00510 BundleOffer* find(u_int32_t cts, u_int16_t sid) const;
00511
00512 BundleOffer::bundle_offer_t type() { return type_; }
00513
00517 iterator begin();
00518 iterator end();
00519 const_iterator begin() const;
00520 const_iterator end() const;
00521
00525 BundleOffer* front() const
00526 {
00527 oasys::ScopeLock l(lock_,"front");
00528 return list_.front();
00529 }
00530
00534 size_t guess_size()
00535 {
00536 return Prophet::BundleOfferEntrySize * list_.size();
00537 }
00538
00539 BundleOfferList& operator= (const BundleOfferList& a)
00540 {
00541 oasys::ScopeLock al(a.lock_,"operator=");
00542 oasys::ScopeLock l(lock_,"operator=");
00543 list_ = a.list_;
00544 type_ = a.type_;
00545 return *this;
00546 }
00547
00548 oasys::SpinLock* lock() { return lock_; }
00549
00550 void set_type(BundleOffer::bundle_offer_t type) {type_ = type;}
00551 BundleOffer::bundle_offer_t type() const { return type_; }
00552
00553 void dump(oasys::StringBuffer *buf);
00554
00555 protected:
00559 void push_back(BundleOffer* bo);
00560
00561 List list_;
00562 BundleOffer::bundle_offer_t type_;
00563 oasys::SpinLock* lock_;
00564 };
00565
00571 class ProphetAckList {
00572 public:
00573 struct less_ack_ {
00574 bool operator() (const ProphetAck* a, const ProphetAck* b) const {
00575 return *a < *b;
00576 }
00577 };
00578 typedef std::set<ProphetAck*,less_ack_> palist;
00579
00580 ProphetAckList();
00581 ProphetAckList(const ProphetAckList& a)
00582 : acks_(a.acks_) {;}
00583 ~ProphetAckList();
00584
00589 bool insert(const EndpointID& eid, u_int32_t cts, u_int32_t ets = 0);
00590
00594 bool insert(Bundle *b)
00595 {
00596 return insert(Prophet::eid_to_routeid(b->dest_),
00597 b->creation_ts_.seconds_,
00598 b->expiration_);
00599 }
00600
00605 bool insert(ProphetAck* ack);
00606
00610 size_t count(const EndpointID& eid) const;
00611
00615 size_t fetch(const EndpointIDPattern& eid,
00616 PointerList<ProphetAck>& list) const;
00617
00621 bool is_ackd(const EndpointID& eid, u_int32_t cts) const;
00622 bool is_ackd(Bundle* b) const
00623 {
00624 return is_ackd(
00625 Prophet::eid_to_routeid(b->dest_),
00626 b->creation_ts_.seconds_);
00627 }
00628
00635 size_t expire(u_int32_t older_than = 0);
00636
00640 size_t size() const { return acks_.size(); }
00641 protected:
00642 palist acks_;
00643 oasys::SpinLock* lock_;
00644 };
00645
00649 class ProphetAckAgeTimer :
00650 public oasys::Timer,
00651 public oasys::Logger
00652 {
00653 public:
00654 ProphetAckAgeTimer(ProphetAckList* list, u_int period) :
00655 oasys::Logger("ProphetAckAgeTimer","/dtn/router/prophet/timer"),
00656 list_(list), period_(period)
00657 { reschedule(); }
00658 void timeout(const struct timeval& now);
00659 protected:
00660 void reschedule();
00661 ProphetAckList* list_;
00662 u_int period_;
00663 };
00664
/**
 * Per-bundle statistics record kept by ProphetStats; the three values
 * are what get_p_max(), get_mopr() and get_lmopr() report.
 */
struct ProphetStatsEntry
{
    double p_max_;  ///< value reported by ProphetStats::get_p_max
    double mopr_;   ///< value reported by ProphetStats::get_mopr
    double lmopr_;  ///< value reported by ProphetStats::get_lmopr
};
00670
00671 class ProphetStats {
00672 public:
00673 ProphetStats() : dropped_(0), lock_(new oasys::SpinLock())
00674 {
00675 pstats_.clear();
00676 }
00677
00678 ~ProphetStats();
00679
00680 void update_stats(const Bundle* b, double p);
00681 double get_p_max(const Bundle* b);
00682 double get_mopr(const Bundle* b);
00683 double get_lmopr(const Bundle* b);
00684
00685 void drop_bundle(const Bundle* b);
00686 u_int dropped() { return dropped_; }
00687
00688 protected:
00689 typedef std::map<u_int32_t,ProphetStatsEntry*> pstats;
00690 typedef std::map<u_int32_t,ProphetStatsEntry*>::iterator iterator;
00691 typedef std::map<u_int32_t,ProphetStatsEntry*>::const_iterator
00692 const_iterator;
00693
00694 ProphetStatsEntry* find_entry(const Bundle*);
00695
00696 u_int dropped_;
00697 pstats pstats_;
00698 oasys::SpinLock* lock_;
00699 };
00700
00704 class FwdStrategy : public std::less<Bundle*>
00705 {
00706 public:
00707 virtual ~FwdStrategy() {}
00708 FwdStrategy(const FwdStrategy& fs)
00709 : fs_(fs.fs_)
00710 {}
00711 bool operator() (const Bundle*, const Bundle*) const
00712 {
00713 return false;
00714 }
00715 inline static FwdStrategy* strategy( Prophet::fwd_strategy_t fs,
00716 ProphetTable* local = NULL,
00717 ProphetTable* remote = NULL );
00718 protected:
00719 FwdStrategy(Prophet::fwd_strategy_t fs = Prophet::INVALID_FS)
00720 : fs_(fs)
00721 {}
00722 Prophet::fwd_strategy_t fs_;
00723 };
00724
00728 class FwdStrategyCompGRTRSORT : public FwdStrategy
00729 {
00730 public:
00731 FwdStrategyCompGRTRSORT (const FwdStrategyCompGRTRSORT& fsc)
00732 : FwdStrategy(fsc), local_(fsc.local_), remote_(fsc.remote_)
00733 {}
00734 bool operator() (const Bundle* a, const Bundle* b) const
00735 {
00736 return ((remote_->p_value(a) - local_->p_value(a)) <
00737 (remote_->p_value(b) - local_->p_value(b)));
00738 }
00739
00740 protected:
00741 friend class FwdStrategy;
00742 FwdStrategyCompGRTRSORT(ProphetTable* local,ProphetTable* remote)
00743 : local_(local), remote_(remote)
00744 {
00745 ASSERT(local != NULL);
00746 ASSERT(remote != NULL);
00747 }
00748
00749 ProphetTable* local_;
00750 ProphetTable* remote_;
00751 };
00752
00756 class FwdStrategyCompGRTRMAX : public FwdStrategy
00757 {
00758 public:
00759 FwdStrategyCompGRTRMAX(const FwdStrategyCompGRTRMAX& f)
00760 : FwdStrategy(f), remote_(f.remote_) {}
00761 bool operator() (const Bundle* a, const Bundle* b) const
00762 {
00763 return (remote_->p_value(a) < remote_->p_value(b));
00764 }
00765
00766 protected:
00767 friend class FwdStrategy;
00768 FwdStrategyCompGRTRMAX(ProphetTable* remote)
00769 : remote_(remote)
00770 {
00771 ASSERT(remote != NULL);
00772 }
00773
00774 ProphetTable* remote_;
00775 };
00776
00780 FwdStrategy*
00781 FwdStrategy::strategy( Prophet::fwd_strategy_t fs,
00782 ProphetTable* local,
00783 ProphetTable* remote )
00784 {
00785 FwdStrategy *f = NULL;
00786 switch (fs)
00787 {
00788 case Prophet::GRTR:
00789 case Prophet::GTMX:
00790 case Prophet::GRTR_PLUS:
00791 case Prophet::GTMX_PLUS:
00792 f = new FwdStrategy();
00793 break;
00794 case Prophet::GRTR_SORT:
00795
00796 f = new FwdStrategyCompGRTRSORT(local,remote);
00797 break;
00798 case Prophet::GRTR_MAX:
00799
00800 f = new FwdStrategyCompGRTRMAX(remote);
00801 break;
00802 default:
00803 PANIC("Invalid forwarding strategy: %d",(int)fs);
00804 break;
00805 }
00806 return f;
00807 };
00808
00812 class ProphetDecider : public oasys::Logger
00813 {
00814 public:
00815 inline static ProphetDecider* decider(
00816 Prophet::fwd_strategy_t fs,
00817 ProphetTable* local = NULL,
00818 ProphetTable* remote = NULL,
00819 Link* nexthop = NULL,
00820 u_int max_forward = 0,
00821 ProphetStats* stats = NULL
00822 );
00823 virtual ~ProphetDecider() {}
00824 virtual bool operator() (const Bundle*) const = 0;
00825 inline bool should_fwd(const Bundle* bundle) const;
00826 protected:
00827 ProphetDecider(Link* nexthop)
00828 : oasys::Logger("ProphetDecider","/dtn/route/decider"),
00829 next_hop_(nexthop),
00830 route_(Prophet::eid_to_route(nexthop->remote_eid()))
00831 {}
00832
00833 Link* next_hop_;
00834 EndpointIDPattern route_;
00835 };
00836
00841 class FwdDeciderGRTR : public ProphetDecider
00842 {
00843 public:
00844 bool operator() (const Bundle* b) const
00845 {
00846 if (!ProphetDecider::should_fwd(b))
00847 return false;
00848 if (route_.match(b->dest_))
00849 return true;
00850 if (local_->p_value(b) < remote_->p_value(b))
00851 {
00852 log_debug("remote p %0.2f local p %0.2f: ok to fwd *%p",
00853 remote_->p_value(b),local_->p_value(b),b);
00854 return true;
00855 }
00856 log_debug("remote p %0.2f local p %0.2f: do not fwd *%p",
00857 remote_->p_value(b),local_->p_value(b),b);
00858 return false;
00859 }
00860
00861 virtual ~FwdDeciderGRTR() {}
00862 protected:
00863 friend class ProphetDecider;
00864 FwdDeciderGRTR(ProphetTable* local, ProphetTable* remote, Link* nexthop)
00865 : ProphetDecider(nexthop), local_(local), remote_(remote)
00866 {
00867 ASSERT(local != NULL);
00868 ASSERT(remote != NULL);
00869 }
00870
00871 ProphetTable* local_;
00872 ProphetTable* remote_;
00873 };
00874
00875 class FwdDeciderGTMX : public FwdDeciderGRTR
00876 {
00877 public:
00878 bool operator() (const Bundle* b) const
00879 {
00880 if ( ! FwdDeciderGRTR::operator()(b) )
00881 return false;
00882 if (route_.match(b->dest_))
00883 return true;
00884 size_t num_fwd =
00885 b->fwdlog_.get_transmission_count(ForwardingInfo::COPY_ACTION);
00886 if (num_fwd < max_fwd_)
00887 {
00888 log_debug("NF %zu, max NF %d: ok to fwd *%p",num_fwd,max_fwd_,b);
00889 return true;
00890 }
00891 log_debug("NF %zu, max NF %d: do not fwd *%p",num_fwd,max_fwd_,b);
00892 return false;
00893 }
00894
00895 virtual ~FwdDeciderGTMX() {}
00896 protected:
00897 friend class ProphetDecider;
00898 FwdDeciderGTMX(ProphetTable* local, ProphetTable* remote,
00899 Link* nexthop, u_int max_forward)
00900 : FwdDeciderGRTR(local,remote,nexthop),
00901 max_fwd_(max_forward)
00902 {}
00903
00904 u_int max_fwd_;
00905 };
00906
00907 class FwdDeciderGRTRPLUS : public FwdDeciderGRTR
00908 {
00909 public:
00910 bool operator() (const Bundle* b) const
00911 {
00912 if ( ! FwdDeciderGRTR::operator()(b) )
00913 return false;
00914 if (route_.match(b->dest_))
00915 return true;
00916 bool ok = stats_->get_p_max(b) < remote_->p_value(b);
00917 log_debug("max P %0.2f, remote P %0.2f, %s fwd *%p",
00918 stats_->get_p_max(b),remote_->p_value(b),
00919 ok ? "ok to" : "do not", b);
00920 return ok;
00921 }
00922
00923 virtual ~FwdDeciderGRTRPLUS() {}
00924 protected:
00925 friend class ProphetDecider;
00926 FwdDeciderGRTRPLUS(ProphetTable* local, ProphetTable* remote,
00927 Link* nexthop, ProphetStats* stats)
00928 : FwdDeciderGRTR(local,remote,nexthop),
00929 stats_(stats)
00930 {
00931 ASSERT(stats != NULL);
00932 }
00933
00934 ProphetStats* stats_;
00935 };
00936
00937 class FwdDeciderGTMXPLUS : public FwdDeciderGRTRPLUS
00938 {
00939 public:
00940 bool operator() (const Bundle* b) const
00941 {
00942 if ( ! FwdDeciderGRTRPLUS::operator()(b) )
00943 return false;
00944 if (route_.match(b->dest_))
00945 return true;
00946 size_t num_fwd =
00947 b->fwdlog_.get_transmission_count(ForwardingInfo::COPY_ACTION);
00948 bool ok = ((stats_->get_p_max(b) < remote_->p_value(b)) &&
00949 (num_fwd < max_fwd_));
00950 log_debug("NF %zu, Max NF %d, max P %0.2f, remote P %0.2f, %s fwd *%p",
00951 num_fwd,max_fwd_,stats_->get_p_max(b),remote_->p_value(b),
00952 ok ? "ok to" : "do not", b);
00953 return ok;
00954 }
00955
00956 virtual ~FwdDeciderGTMXPLUS() {}
00957 protected:
00958 friend class ProphetDecider;
00959 FwdDeciderGTMXPLUS(ProphetTable* local, ProphetTable* remote,
00960 Link* nexthop, ProphetStats* stats, u_int max_forward)
00961 : FwdDeciderGRTRPLUS(local,remote,nexthop,stats), max_fwd_(max_forward)
00962 {}
00963
00964 u_int max_fwd_;
00965 };
00966
00970 ProphetDecider*
00971 ProphetDecider::decider( Prophet::fwd_strategy_t fs, ProphetTable* local,
00972 ProphetTable* remote, Link* nexthop,
00973 u_int max_forward, ProphetStats* stats )
00974 {
00975 ProphetDecider* pd = NULL;
00976 switch(fs) {
00977 case Prophet::GRTR:
00978 case Prophet::GRTR_SORT:
00979 case Prophet::GRTR_MAX:
00980 pd = (ProphetDecider*) new FwdDeciderGRTR(local,remote,nexthop);
00981 break;
00982 case Prophet::GTMX:
00983 pd = (ProphetDecider*) new
00984 FwdDeciderGTMX(local,remote,nexthop,max_forward);
00985 break;
00986 case Prophet::GRTR_PLUS:
00987 pd = (ProphetDecider*) new
00988 FwdDeciderGRTRPLUS(local,remote,nexthop,stats);
00989 break;
00990 case Prophet::GTMX_PLUS:
00991 pd = (ProphetDecider*) new
00992 FwdDeciderGTMXPLUS(local,remote,nexthop,stats,max_forward);
00993 break;
00994 default:
00995 PANIC("Invalid forwarding strategy: %d",(int)fs);
00996 break;
00997 }
00998 return pd;
00999 }
01000
01001
01002 bool
01003 ProphetDecider::should_fwd(const Bundle* bundle) const
01004 {
01005 ForwardingInfo info;
01006 bool found = bundle->fwdlog_.get_latest_entry(next_hop_, &info);
01007
01008 if (found)
01009 {
01010 ASSERT(info.state_ != ForwardingInfo::NONE);
01011 }
01012 else
01013 {
01014 ASSERT(info.state_ == ForwardingInfo::NONE);
01015 }
01016
01017 if (info.state_ == ForwardingInfo::TRANSMITTED ||
01018 info.state_ == ForwardingInfo::IN_FLIGHT)
01019 {
01020 log_debug("should_fwd bundle %d: "
01021 "skip %s due to forwarding log entry %s",
01022 bundle->bundleid_, next_hop_->name(),
01023 ForwardingInfo::state_to_str(
01024 (ForwardingInfo::state_t)info.state_));
01025 return false;
01026 }
01027
01028 if (info.state_ == ForwardingInfo::TRANSMIT_FAILED) {
01029 log_debug("should_fwd bundle %d: "
01030 "match %s: forwarding log entry %s TRANSMIT_FAILED %d",
01031 bundle->bundleid_, next_hop_->name(),
01032 ForwardingInfo::state_to_str((ForwardingInfo::state_t)
01033 info.state_),
01034 bundle->bundleid_);
01035
01036 } else {
01037 log_debug("should_fwd bundle %d: "
01038 "match %s: forwarding log entry %s",
01039 bundle->bundleid_, next_hop_->name(),
01040 ForwardingInfo::state_to_str((ForwardingInfo::state_t)
01041 info.state_));
01042 }
01043
01044 return true;
01045 }
01046
01051 class ProphetBundleOffer : public std::priority_queue<Bundle*>,
01052 public oasys::Logger
01053 {
01054 protected:
01055 typedef std::priority_queue<Bundle*> BundleQueue;
01056 public:
01057 ProphetBundleOffer(const BundleList& bundles,
01058 FwdStrategy* comp,
01059 ProphetDecider* decider)
01060 : BundleQueue(*comp),
01061 oasys::Logger("ProphetBundleOffer","/dtn/route/offer"),
01062 list_("ProphetBundleOffer"),comp_(comp),decide_(decider)
01063 {
01064 oasys::ScopeLock l(bundles.lock(),"ProphetBundleOffer constructor");
01065 for(BundleList::const_iterator i =
01066 (BundleList::const_iterator) bundles.begin();
01067 i != bundles.end();
01068 i++ )
01069 {
01070 push(*i);
01071 }
01072 }
01073 virtual ~ProphetBundleOffer() {}
01074 void push(Bundle* b)
01075 {
01076
01077 if(decide_->operator()(b))
01078 {
01079 BundleQueue::push(b);
01080 list_.push_back(b);
01081 log_debug("offering *%p",b);
01082 }
01083 else
01084 {
01085 log_debug("not offering *%p",b);
01086 }
01087 }
01088 void pop()
01089 {
01090 list_.erase(top());
01091 BundleQueue::pop();
01092 }
01093
01094 protected:
01095 BundleList list_;
01096 FwdStrategy* comp_;
01097 ProphetDecider* decide_;
01098 };
01099
01103 struct BundleSz {
01104 u_int operator() (const Bundle* b) const
01105 {
01106 return b->payload_.length();
01107 }
01108 };
01109
01110 struct QueueComp: public std::less<Bundle*>
01111 {
01112 inline
01113 static QueueComp* queuecomp(Prophet::q_policy_t qp,
01114 ProphetStats* ps,
01115 ProphetTable* pt);
01116 };
01117
01121 struct QueueCompFIFO : public QueueComp
01122 {
01123 bool operator() (const Bundle* a, const Bundle* b) const
01124 {
01125 return (a->bundleid_ < b->bundleid_);
01126 }
01127 };
01128
01132 struct QueueCompMOFO : public QueueComp
01133 {
01134 #define NUM_FWD(b) \
01135 (b)->fwdlog_.get_transmission_count(ForwardingInfo::COPY_ACTION)
01136 bool operator() (const Bundle* a, const Bundle* b) const
01137 {
01138 return (NUM_FWD(a) < NUM_FWD(b));
01139 }
01140 };
01141
01145 class QueueCompMOPR : public QueueComp
01146 {
01147 public:
01148 QueueCompMOPR(ProphetStats* pstats)
01149 : pstats_(pstats)
01150 {
01151 ASSERT(pstats_ != NULL);
01152 }
01153 #define MOPR(b) pstats_->get_mopr(b)
01154 bool operator() (const Bundle* a, const Bundle* b) const
01155 {
01156 return MOPR(a) < MOPR(b);
01157 }
01158 protected:
01159 ProphetStats* pstats_;
01160 };
01161
01165 class QueueCompLMOPR : public QueueComp
01166 {
01167 public:
01168 QueueCompLMOPR(ProphetStats* pstats)
01169 : pstats_(pstats)
01170 {
01171 ASSERT(pstats_ != NULL);
01172 }
01173 #define LMOPR(b) pstats_->get_lmopr(b)
01174 bool operator() (const Bundle* a, const Bundle* b) const
01175 {
01176 return (LMOPR(a) < LMOPR(b));
01177 }
01178 protected:
01179 ProphetStats* pstats_;
01180 };
01181
01185 struct QueueCompSHLI : public QueueComp
01186 {
01187 #define SHLI(b) (b)->expiration_
01188 bool operator() (const Bundle* a, const Bundle* b) const
01189 {
01190 return (SHLI(a) < SHLI(b));
01191 }
01192 };
01193
01194 struct QueueCompLEPR : public QueueComp
01195 {
01196 QueueCompLEPR(ProphetTable* nodes)
01197 : nodes_(nodes)
01198 {
01199 ASSERT(nodes!=NULL);
01200 }
01201 #define LEPR(b) nodes_->p_value(b)
01202 bool operator() (const Bundle* a, const Bundle* b) const
01203 {
01204 return (LEPR(a) < LEPR(b));
01205 }
01206 protected:
01207 ProphetTable* nodes_;
01208 };
01209
01210 QueueComp*
01211 QueueComp::queuecomp(Prophet::q_policy_t qp,
01212 ProphetStats* ps,
01213 ProphetTable* pt)
01214 {
01215 (void) ps;
01216 (void) pt;
01217 switch(qp) {
01218 case Prophet::FIFO:
01219 return new QueueCompFIFO();
01220 case Prophet::MOFO:
01221 return new QueueCompMOFO();
01222 case Prophet::MOPR:
01223 return new QueueCompMOPR(ps);
01224 case Prophet::LINEAR_MOPR:
01225 return new QueueCompLMOPR(ps);
01226 case Prophet::SHLI:
01227 return new QueueCompSHLI();
01228 case Prophet::LEPR:
01229 return new QueueCompLEPR(pt);
01230 case Prophet::INVALID_QP:
01231 default:
01232 PANIC("Unrecognized queue policy %d",qp);
01233 }
01234 }
01235
01236 struct ProphetBundleList
01237 {
01238 static Bundle* find(const BundleList& list,
01239 const EndpointID& dest,
01240 u_int32_t creation_ts);
01241 };
01242
01252 class ProphetBundleQueue :
01253 public oasys::BoundedPriorityQueue<Bundle*,BundleSz,QueueComp>
01254 {
01255 public:
01256 typedef oasys::BoundedPriorityQueue<Bundle*,BundleSz,QueueComp> BundleBPQ;
01257
01258 ProphetBundleQueue(const BundleList* list, BundleActions* actions,
01259 ProphetParams* params,
01260 QueueComp comp = QueueCompFIFO())
01261 : BundleBPQ(comp,params->max_usage_),
01262 bundles_("ProphetBundleQueue"),
01263 actions_(actions),
01264 params_(params)
01265 {
01266 ASSERT(list!=NULL);
01267 ASSERT(params!=NULL);
01268
01269 oasys::ScopeLock l(list->lock(),"ProphetBundleQueue constructor");
01270 for (BundleList::const_iterator i = list->begin();
01271 i != list->end();
01272 i++)
01273 {
01274 push(*i);
01275 }
01276 }
01277
01278 virtual ~ProphetBundleQueue() {}
01279
01280 oasys::SpinLock* lock() const { return bundles_.lock(); }
01281
01282 #define seq_ BundleBPQ::PriorityQueue::c
01283 #define comp_ BundleBPQ::PriorityQueue::comp
01284
01290 size_t bundle_list(BundleList& list) const
01291 {
01292 ASSERT(lock()->is_locked_by_me());
01293
01294 for(BundleList::const_iterator i =
01295 (BundleList::const_iterator)bundles_.begin();
01296 i!=bundles_.end();
01297 i++)
01298 {
01299 list.push_back(*i);
01300 }
01301 return list.size();
01302 }
01303
01304 void drop_bundle(Bundle* b)
01305 {
01306 BundleBPQ::Sequence::iterator i = std::find(seq_.begin(),seq_.end(),b);
01307 if (i != c.end())
01308 {
01309 erase_member(i);
01310 make_heap(seq_.begin(),seq_.end(),comp_);
01311 }
01312 }
01313
01314 void pop()
01315 {
01316
01317 }
01318
01319 void push(Bundle* b)
01320 {
01321
01322 if (bundles_.contains(b))
01323 return;
01324
01325 BundleBPQ::push(b);
01326 bundles_.push_back(b);
01327 }
01328
01329 void set_comp(QueueComp* c)
01330 {
01331 oasys::ScopeLock l(lock(),"set_comp");
01332 comp_ = *c;
01333 make_heap(seq_.begin(),seq_.end(),comp_);
01334 }
01335
01336 protected:
01337 void erase_member(iterator pos) {
01338 BundleBPQ::erase_member(pos);
01339 bundles_.erase(*pos);
01340
01341
01342 }
01343
01344 void enforce_bound() {
01345 oasys::ScopeLock l(lock(),"enforce_bound");
01346
01347 if (params_->qp_ != Prophet::LEPR) {
01348 BundleBPQ::enforce_bound();
01349 return;
01350 }
01351
01352
01353
01354
01355
01356 std::sort_heap(seq_.begin(),seq_.end(),comp_);
01357
01358
01359 std::reverse(seq_.begin(),seq_.end());
01360
01361
01362 for (BundleBPQ::Sequence::iterator i = seq_.begin();
01363 (i != seq_.end()) && (cur_size_ > max_size_);
01364 i++)
01365 {
01366 if (NUM_FWD(*i) < params_->min_forward_)
01367 continue;
01368 cur_size_ -= BundleSz()(*i);
01369 erase_member(i);
01370 }
01371
01372 make_heap(seq_.begin(),seq_.end(),comp_);
01373 }
01374
01375 BundleList bundles_;
01376 BundleActions* actions_;
01377 ProphetParams* params_;
01378 #undef seq_
01379 #undef comp_
01380 };
01381
01382 };
01383
01384 #endif // _DTN_PROPHET_LISTS_