ProphetLists.h

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2006 Baylor University
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 #ifndef _DTN_PROPHET_LISTS_
00018 #define _DTN_PROPHET_LISTS_
00019 
00020 #include "routing/Prophet.h"
00021 #include "routing/ProphetNode.h"
00022 #include <oasys/debug/Log.h>
00023 #include <oasys/util/Time.h>
00024 #include <oasys/thread/SpinLock.h>
00025 #include "naming/EndpointID.h"
00026 #include "bundling/BundleEvent.h"
00027 #include "bundling/BundleActions.h"
00028 #include <oasys/util/BoundedPriorityQueue.h>
00029 #include <oasys/util/URL.h>
00030 
00031 #include <vector>
00032 #include <map>
00033 #include <set>
00034 
/**
 * Round x up to the next multiple of four; values that already sit on a
 * four-byte boundary are returned unchanged.
 *
 * The whole expansion is wrapped in parentheses so that the macro can be
 * used inside a larger expression (e.g. FOUR_BYTE_ALIGN(n) + 1) without
 * the conditional operator swallowing the neighbouring operands. Note
 * that x is evaluated more than once, so it must not have side effects.
 */
#define FOUR_BYTE_ALIGN(x) ((((x) % 4) != 0) ? ((x) + (4 - ((x) % 4))) : (x))
00036 
00041 namespace dtn {
00042 
00047 struct ProphetParams : public ProphetNodeParams
00048 {
00049     ProphetParams()
00050         : ProphetNodeParams(),
00051           fs_(Prophet::GRTR),
00052           qp_(Prophet::FIFO),
00053           hello_interval_(Prophet::HELLO_INTERVAL),
00054           hello_dead_(Prophet::HELLO_DEAD),
00055           max_forward_(Prophet::DEFAULT_NUM_F_MAX),
00056           min_forward_(Prophet::DEFAULT_NUM_F_MIN),
00057           max_usage_(0xffffffff),
00058           age_period_(Prophet::AGE_PERIOD),
00059           relay_node_(false),
00060           custody_node_(false),
00061           internet_gw_(false),
00062           epsilon_(0.0039) {}
00063           
00064     Prophet::fwd_strategy_t fs_;      
00065     Prophet::q_policy_t     qp_;      
00066 
00067     u_int8_t hello_interval_; 
00068     u_int   hello_dead_;     
00069 
00070     u_int   max_forward_;    
00071     u_int   min_forward_;    
00072     u_int   max_usage_;      
00073 
00074     u_int   age_period_;     
00075 
00076     bool   relay_node_;     
00077     bool   custody_node_;   
00078     bool   internet_gw_;    
00079     double epsilon_;        
00080 };
00081 
/**
 * A std::vector of T* that owns the objects it points to: elements are
 * deleted when they are erased, when the list is cleared and when the
 * list is destroyed. Copying the list deep-copies every element with
 * T's copy constructor, so T must be copy-constructible.
 *
 * Only erase(iterator) and clear() are overridden here; any other
 * inherited std::vector operation that removes or overwrites elements
 * (pop_back, resize, range erase, ...) does NOT delete the pointee.
 */
template <class T>
class PointerList : public std::vector<T*>
{
public:
    typedef std::vector<T*> List;
    typedef typename std::vector<T*>::iterator iterator;
    typedef typename std::vector<T*>::const_iterator const_iterator;

    /**
     * Create an empty list.
     */
    PointerList()
        : std::vector<T*>() {}

    /**
     * Deep copy: every element of a is duplicated with new T(*elem).
     */
    PointerList(const PointerList& a)
        : std::vector<T*>()
    {
        copy_from(a);
    }

    /**
     * Delete every owned element.
     */
    virtual ~PointerList()
    {
        clear();
    }

    /**
     * Replace the contents with a deep copy of a. Self-assignment is a
     * no-op; without the guard the list would delete its own elements
     * and then copy from the (now empty) source.
     */
    PointerList& operator= (const PointerList& a)
    {
        if (this != &a)
        {
            clear();
            copy_from(a);
        }
        return *this;
    }

    /**
     * Delete the object referenced by i, then remove its slot.
     */
    void erase(iterator i)
    {
        delete (*i);
        List::erase(i);
    }

    /**
     * Delete every owned object and empty the list.
     */
    void clear()
    {
        free();
        List::clear();
    }
protected:

    /**
     * Delete every pointee without removing the (now dangling) slots;
     * callers must follow up with List::clear().
     */
    void free()
    {
        for(iterator i = List::begin();
            i != List::end();
            ++i)
        {
            delete *i;
        }
    }

    /**
     * Append a deep copy of each element of a. NULL entries are copied
     * as NULL rather than dereferenced.
     */
    void copy_from(const PointerList& a)
    {
        List::reserve(List::size() + a.size());
        for(const_iterator i = a.begin();
            i != a.end();
            ++i)
        {
            T* duplicate = NULL;
            if (*i != NULL)
            {
                duplicate = new T(**i);
            }
            // push_back lives in a dependent base class, so it must be
            // qualified with this-> to be found by conforming compilers
            this->push_back(duplicate);
        }
    }
}; // template PointerList
00175 
00176 typedef PointerList<ProphetNode> ProphetNodeList;
00177 
00181 struct less_eid_ 
00182 {
00183     bool operator() (const EndpointID& a, const EndpointID& b) const
00184     {
00185         return a.str() < b.str();
00186     }
00187 };
00188 
00192 class ProphetTable
00193 {
00194 public:
00195     typedef std::map<EndpointID,ProphetNode*,less_eid_> rib_table;
00196     typedef std::map<EndpointID,ProphetNode*,less_eid_>::iterator iterator;
00197 
00201     ProphetTable();
00202 
00206     ~ProphetTable();
00207 
00212     ProphetNode* find(const EndpointID& eid) const;
00213 
00218     void update(ProphetNode* node);
00219 
00223 
00224     double p_value(const Bundle* b) const
00225     {
00226         return p_value(Prophet::eid_to_routeid(b->dest_));
00227     }
00228     double p_value(const EndpointID& eid) const;
00230 
00234     size_t dump_table(ProphetNodeList& list) const;
00235 
00239     size_t size() const { return table_.size(); }
00240 
00245     void clear() { free(); table_.clear(); }
00246 
00250     iterator begin();
00251 
00255     iterator end();
00256 
00260     void truncate(double epsilon);
00261 
00266     oasys::SpinLock* lock() { return lock_; }
00267 protected:
00268 
00272     void free();
00273 
00274     oasys::SpinLock* lock_;
00275     rib_table table_;
00276 }; // ProphetTable
00277 
00281 class ProphetTableAgeTimer : public oasys::Timer,
00282                              public oasys::Logger
00283 {
00284 public:
00285     ProphetTableAgeTimer(ProphetTable* table, u_int period, double epsilon) :
00286         oasys::Logger("ProphetTableAgeTimer","/dtn/router/prophet/timer"),
00287         table_(table), period_(period), epsilon_(epsilon)
00288     { reschedule(); }
00289     void timeout(const struct timeval& now);
00290 protected:
00291     void reschedule();
00292     ProphetTable* table_;
00293     u_int period_;
00294     double epsilon_;
00295 }; // ProphetTableAgeTimer 
00296 
00301 class ProphetDictionary
00302 {
00303 public:
00304     typedef std::map<u_int16_t,EndpointID> rribd;
00305     typedef rribd::const_iterator const_iterator;
00306 
00307     ProphetDictionary(const EndpointID& sender = EndpointID::NULL_EID(),
00308                       const EndpointID& receiver = EndpointID::NULL_EID());
00309     ProphetDictionary(const ProphetDictionary& pd);
00310     ~ProphetDictionary() {;}
00311 
00315     u_int16_t find(const EndpointID& eid) const;
00316 
00320     EndpointID find(u_int16_t id) const;
00321 
00325     EndpointID sender() const { return find(0); }
00326 
00330     EndpointID receiver() const { return find(1); }
00331 
00335     bool is_assigned(const EndpointID& eid) const;
00336 
00341     u_int16_t insert(const EndpointID& eid);
00342 
00347     bool assign(const EndpointID& eid, u_int16_t sid);
00348 
00352     const_iterator begin() const { return rribd_.begin(); }
00353     
00357     const_iterator end() const { return rribd_.end(); }
00358     
00362     size_t size() const { return ribd_.size(); }
00363 
00367     size_t guess_ribd_size() const { return guess_; }
00368 
00372     void clear();
00373 
00377     void dump(oasys::StringBuffer *buf);
00378 
00379     ProphetDictionary& operator= (const ProphetDictionary& d)
00380     {
00381         ribd_ = d.ribd_;
00382         rribd_ = d.rribd_;
00383         guess_ = d.guess_;
00384         return *this;
00385     }
00386 protected:
00387     typedef std::map<EndpointID,u_int16_t,less_eid_> ribd;
00388 
00389     void update_guess(size_t len) {
00390         guess_ += FOUR_BYTE_ALIGN(len + Prophet::RoutingAddressStringSize);
00391     }
00392 
00393     ribd      ribd_;
00394     rribd     rribd_;
00395     size_t    guess_;
00396 }; // ProphetDictionary
00397 
00398 struct BundleOfferComp : public std::less<BundleOffer*>
00399 {
00400     BundleOfferComp(ProphetDictionary* ribd, ProphetTable* local)
00401         : ribd_(ribd), nodes_(local) {}
00402 
00403     bool operator()(const BundleOffer* a, const BundleOffer* b) const
00404     {
00405         const EndpointID ea = ribd_->find(a->sid()); 
00406         const EndpointID eb = ribd_->find(b->sid());
00407         return nodes_->p_value(ea) > nodes_->p_value(eb);
00408     }
00409     
00410     ProphetDictionary* ribd_;
00411     ProphetTable* nodes_;
00412 };
00413 
00414 struct BundleOfferSIDComp : public BundleOfferComp
00415 {
00416     BundleOfferSIDComp(ProphetDictionary* ribd,
00417                        ProphetTable* local,
00418                        u_int16_t sid)
00419         : BundleOfferComp(ribd,local), sid_(sid) {}
00420 
00421     bool operator()(const BundleOffer* a,const BundleOffer* b) const
00422     {
00423         // prioritize based on SID
00424         if (a->sid() != b->sid())
00425         {
00426             if (a->sid() == sid_) return true;
00427             if (b->sid() == sid_) return true;
00428         }
00429         // otherwise use parent's ordering
00430         return BundleOfferComp::operator()(a,b);
00431     }
00432     u_int16_t sid_;
00433 };
00434 
00439 class BundleOfferList {
00440 public:
00441     typedef PointerList<BundleOffer> List;
00442     typedef PointerList<BundleOffer>::iterator iterator;
00443     typedef PointerList<BundleOffer>::const_iterator const_iterator;
00444 
00445     BundleOfferList(BundleOffer::bundle_offer_t type =
00446                     BundleOffer::UNDEFINED)
00447         : type_(type), lock_(new oasys::SpinLock())
00448     {
00449     }
00450 
00451     BundleOfferList(const BundleOfferList& list)
00452         : list_(list.list_),
00453           type_(list.type_),
00454           lock_(new oasys::SpinLock())
00455     {
00456     }
00457 
00458     ~BundleOfferList()
00459     {
00460         list_.clear();
00461         delete lock_;
00462     }
00463 
00468     void sort(ProphetDictionary* ribd, ProphetTable* nodes, u_int16_t sid);
00469 
00473     size_t size() const;
00474 
00478     bool empty() const;
00479 
00483     void clear();
00484 
00488     void add_offer(BundleOffer* entry);
00489 
00494     bool remove_bundle(u_int32_t cts, u_int16_t sid);
00495 
00499     void add_offer(u_int32_t cts, u_int16_t sid,
00500                    bool custody=false, bool accept=false, bool ack=false);
00501 
00505     void add_offer(Bundle* b,u_int16_t sid);
00506 
00510     BundleOffer* find(u_int32_t cts, u_int16_t sid) const;
00511 
00512     BundleOffer::bundle_offer_t type() { return type_; }
00513 
00517     iterator begin();
00518     iterator end();
00519     const_iterator begin() const;
00520     const_iterator end() const;
00521 
00525     BundleOffer* front() const
00526     {
00527         oasys::ScopeLock l(lock_,"front");
00528         return list_.front();
00529     }
00530 
00534     size_t guess_size()
00535     {
00536         return Prophet::BundleOfferEntrySize * list_.size();
00537     }
00538 
00539     BundleOfferList& operator= (const BundleOfferList& a)
00540     {
00541         oasys::ScopeLock al(a.lock_,"operator=");
00542         oasys::ScopeLock l(lock_,"operator=");
00543         list_ = a.list_;
00544         type_ = a.type_;
00545         return *this;
00546     }
00547 
00548     oasys::SpinLock* lock() { return lock_; }
00549 
00550     void set_type(BundleOffer::bundle_offer_t type) {type_ = type;}
00551     BundleOffer::bundle_offer_t type() const { return type_; }
00552 
00553     void dump(oasys::StringBuffer *buf);
00554 
00555 protected:
00559     void push_back(BundleOffer* bo);
00560 
00561     List list_;
00562     BundleOffer::bundle_offer_t type_;
00563     oasys::SpinLock* lock_;
00564 }; // BundleOfferList
00565 
00571 class ProphetAckList {
00572 public:
00573     struct less_ack_ {
00574         bool operator() (const ProphetAck* a, const ProphetAck* b) const {
00575             return *a < *b;
00576         }
00577     };
00578     typedef std::set<ProphetAck*,less_ack_> palist;
00579 
00580     ProphetAckList();
00581     ProphetAckList(const ProphetAckList& a) 
00582         : acks_(a.acks_) {;}
00583     ~ProphetAckList();
00584 
00589     bool insert(const EndpointID& eid, u_int32_t cts, u_int32_t ets = 0);
00590 
00594     bool insert(Bundle *b)
00595     {
00596         return insert(Prophet::eid_to_routeid(b->dest_),
00597                       b->creation_ts_.seconds_,
00598                       b->expiration_);
00599     }
00600 
00605     bool insert(ProphetAck* ack);
00606 
00610     size_t count(const EndpointID& eid) const;
00611 
00615     size_t fetch(const EndpointIDPattern& eid,
00616                  PointerList<ProphetAck>& list) const;
00617 
00621     bool is_ackd(const EndpointID& eid, u_int32_t cts) const;
00622     bool is_ackd(Bundle* b) const
00623     {
00624         return is_ackd(
00625                 Prophet::eid_to_routeid(b->dest_),
00626                 b->creation_ts_.seconds_);
00627     }
00628 
00635     size_t expire(u_int32_t older_than = 0);
00636 
00640     size_t size() const { return acks_.size(); }
00641 protected:
00642     palist acks_;
00643     oasys::SpinLock* lock_;
00644 }; // ProphetAckList
00645 
00649 class ProphetAckAgeTimer :
00650     public oasys::Timer,
00651     public oasys::Logger
00652 {
00653 public:
00654     ProphetAckAgeTimer(ProphetAckList* list, u_int period) :
00655         oasys::Logger("ProphetAckAgeTimer","/dtn/router/prophet/timer"),
00656         list_(list), period_(period)
00657     { reschedule(); }
00658     void timeout(const struct timeval& now);
00659 protected:
00660     void reschedule();
00661     ProphetAckList* list_;
00662     u_int period_;
00663 }; // ProphetAckAgeTimer
00664 
/**
 * Plain record of the three statistics ProphetStats keeps per entry.
 * It has no constructor, so instances are not initialized unless the
 * creator does so explicitly.
 */
struct ProphetStatsEntry {
    double p_max_;  ///< value read back through ProphetStats::get_p_max (presumably)
    double mopr_;   ///< value read back through ProphetStats::get_mopr (presumably)
    double lmopr_;  ///< value read back through ProphetStats::get_lmopr (presumably)
};
00670 
00671 class ProphetStats {
00672 public:
00673     ProphetStats() : dropped_(0), lock_(new oasys::SpinLock())
00674     {
00675         pstats_.clear();
00676     }
00677 
00678     ~ProphetStats();
00679 
00680     void update_stats(const Bundle* b, double p);
00681     double get_p_max(const Bundle* b);
00682     double get_mopr(const Bundle* b);
00683     double get_lmopr(const Bundle* b);
00684 
00685     void drop_bundle(const Bundle* b);
00686     u_int dropped() { return dropped_; }
00687 
00688 protected:
00689     typedef std::map<u_int32_t,ProphetStatsEntry*> pstats;
00690     typedef std::map<u_int32_t,ProphetStatsEntry*>::iterator iterator;
00691     typedef std::map<u_int32_t,ProphetStatsEntry*>::const_iterator
00692         const_iterator;
00693 
00694     ProphetStatsEntry* find_entry(const Bundle*);
00695 
00696     u_int   dropped_;
00697     pstats pstats_;
00698     oasys::SpinLock* lock_;
00699 }; // ProphetStats
00700 
00704 class FwdStrategy : public std::less<Bundle*>
00705 {
00706 public:
00707     virtual ~FwdStrategy() {}
00708     FwdStrategy(const FwdStrategy& fs)
00709         : fs_(fs.fs_)
00710     {}
00711     bool operator() (const Bundle*, const Bundle*) const
00712     {
00713         return false;
00714     }
00715     inline static FwdStrategy* strategy( Prophet::fwd_strategy_t fs,
00716                                          ProphetTable* local = NULL,
00717                                          ProphetTable* remote = NULL );
00718 protected:
00719     FwdStrategy(Prophet::fwd_strategy_t fs = Prophet::INVALID_FS)
00720         : fs_(fs)
00721     {}
00722     Prophet::fwd_strategy_t fs_;
00723 };
00724 
00728 class FwdStrategyCompGRTRSORT : public FwdStrategy
00729 {
00730 public:
00731     FwdStrategyCompGRTRSORT (const FwdStrategyCompGRTRSORT& fsc)
00732         : FwdStrategy(fsc), local_(fsc.local_), remote_(fsc.remote_)
00733     {}
00734     bool operator() (const Bundle* a, const Bundle* b) const
00735     {
00736         return ((remote_->p_value(a) - local_->p_value(a)) <
00737                 (remote_->p_value(b) - local_->p_value(b)));
00738     }
00739 
00740 protected:
00741     friend class FwdStrategy;
00742     FwdStrategyCompGRTRSORT(ProphetTable* local,ProphetTable* remote)
00743         : local_(local), remote_(remote)
00744     {
00745         ASSERT(local != NULL);
00746         ASSERT(remote != NULL);
00747     }
00748 
00749     ProphetTable* local_;
00750     ProphetTable* remote_;
00751 };
00752 
00756 class FwdStrategyCompGRTRMAX : public FwdStrategy
00757 {
00758 public:
00759     FwdStrategyCompGRTRMAX(const FwdStrategyCompGRTRMAX& f)
00760         : FwdStrategy(f), remote_(f.remote_) {}
00761     bool operator() (const Bundle* a, const Bundle* b) const
00762     {
00763         return (remote_->p_value(a) < remote_->p_value(b));
00764     }
00765 
00766 protected:
00767     friend class FwdStrategy;
00768     FwdStrategyCompGRTRMAX(ProphetTable* remote)
00769         : remote_(remote)
00770     {
00771         ASSERT(remote != NULL);
00772     }
00773 
00774     ProphetTable* remote_;
00775 };
00776 
00780 FwdStrategy*
00781 FwdStrategy::strategy( Prophet::fwd_strategy_t fs,
00782                        ProphetTable* local,
00783                        ProphetTable* remote )
00784 {
00785     FwdStrategy *f = NULL;
00786     switch (fs) 
00787     {
00788         case Prophet::GRTR:
00789         case Prophet::GTMX:
00790         case Prophet::GRTR_PLUS:
00791         case Prophet::GTMX_PLUS:
00792             f = new FwdStrategy();
00793             break;
00794         case Prophet::GRTR_SORT:
00795             //f = (FwdStrategy*) new FwdStrategyCompGRTRSORT(local,remote);
00796             f = new FwdStrategyCompGRTRSORT(local,remote);
00797             break;
00798         case Prophet::GRTR_MAX:
00799             //f = (FwdStrategy*) new FwdStrategyCompGRTRMAX(remote);
00800             f = new FwdStrategyCompGRTRMAX(remote);
00801             break;
00802         default:
00803             PANIC("Invalid forwarding strategy: %d",(int)fs);
00804             break;
00805     }
00806     return f;
00807 };
00808 
00812 class ProphetDecider : public oasys::Logger
00813 {
00814 public:
00815     inline static ProphetDecider* decider(
00816                       Prophet::fwd_strategy_t fs,
00817                       ProphetTable* local = NULL,
00818                       ProphetTable* remote = NULL,
00819                       Link* nexthop = NULL,
00820                       u_int max_forward = 0,
00821                       ProphetStats* stats = NULL
00822                   );
00823     virtual ~ProphetDecider() {}
00824     virtual bool operator() (const Bundle*) const = 0;
00825     inline bool should_fwd(const Bundle* bundle) const;
00826 protected:
00827     ProphetDecider(Link* nexthop)
00828         : oasys::Logger("ProphetDecider","/dtn/route/decider"),
00829           next_hop_(nexthop),
00830           route_(Prophet::eid_to_route(nexthop->remote_eid()))
00831     {}
00832 
00833     Link* next_hop_;
00834     EndpointIDPattern route_;
00835 };
00836 
00841 class FwdDeciderGRTR : public ProphetDecider
00842 {
00843 public:
00844     bool operator() (const Bundle* b) const
00845     {
00846         if (!ProphetDecider::should_fwd(b))
00847             return false;
00848         if (route_.match(b->dest_))
00849             return true;
00850         if (local_->p_value(b) < remote_->p_value(b))
00851         {
00852             log_debug("remote p %0.2f local p %0.2f: ok to fwd *%p",
00853                       remote_->p_value(b),local_->p_value(b),b);
00854             return true;
00855         }
00856         log_debug("remote p %0.2f local p %0.2f: do not fwd *%p",
00857                   remote_->p_value(b),local_->p_value(b),b);
00858         return false;
00859     }
00860 
00861     virtual ~FwdDeciderGRTR() {}
00862 protected:
00863     friend class ProphetDecider;
00864     FwdDeciderGRTR(ProphetTable* local, ProphetTable* remote, Link* nexthop)
00865         : ProphetDecider(nexthop), local_(local), remote_(remote)
00866     {
00867         ASSERT(local != NULL);
00868         ASSERT(remote != NULL);
00869     }
00870 
00871     ProphetTable* local_;
00872     ProphetTable* remote_;
00873 };
00874 
00875 class FwdDeciderGTMX : public FwdDeciderGRTR
00876 {
00877 public:
00878     bool operator() (const Bundle* b) const
00879     {
00880         if ( ! FwdDeciderGRTR::operator()(b) )
00881             return false;
00882         if (route_.match(b->dest_))
00883             return true;
00884         size_t num_fwd = 
00885             b->fwdlog_.get_transmission_count(ForwardingInfo::COPY_ACTION);
00886         if (num_fwd < max_fwd_)
00887         {
00888             log_debug("NF %zu, max NF %d: ok to fwd *%p",num_fwd,max_fwd_,b);
00889             return true;
00890         }
00891         log_debug("NF %zu, max NF %d: do not fwd *%p",num_fwd,max_fwd_,b);
00892         return false;
00893     }
00894     
00895     virtual ~FwdDeciderGTMX() {}
00896 protected:
00897     friend class ProphetDecider;
00898     FwdDeciderGTMX(ProphetTable* local, ProphetTable* remote,
00899                    Link* nexthop, u_int max_forward)
00900         : FwdDeciderGRTR(local,remote,nexthop),
00901           max_fwd_(max_forward)
00902     {}
00903 
00904     u_int max_fwd_;
00905 };
00906 
00907 class FwdDeciderGRTRPLUS : public FwdDeciderGRTR
00908 {
00909 public:
00910     bool operator() (const Bundle* b) const
00911     {
00912         if ( ! FwdDeciderGRTR::operator()(b) )
00913             return false;
00914         if (route_.match(b->dest_))
00915             return true;
00916         bool ok = stats_->get_p_max(b) < remote_->p_value(b);
00917         log_debug("max P %0.2f, remote P %0.2f, %s fwd *%p",
00918                   stats_->get_p_max(b),remote_->p_value(b),
00919                   ok ? "ok to" : "do not", b);
00920         return ok;
00921     }
00922 
00923     virtual ~FwdDeciderGRTRPLUS() {}
00924 protected:
00925     friend class ProphetDecider;
00926     FwdDeciderGRTRPLUS(ProphetTable* local, ProphetTable* remote,
00927                        Link* nexthop, ProphetStats* stats)
00928         : FwdDeciderGRTR(local,remote,nexthop),
00929           stats_(stats)
00930     {
00931         ASSERT(stats != NULL);
00932     }
00933 
00934     ProphetStats* stats_;
00935 };
00936 
00937 class FwdDeciderGTMXPLUS : public FwdDeciderGRTRPLUS
00938 {
00939 public:
00940     bool operator() (const Bundle* b) const
00941     {
00942         if ( ! FwdDeciderGRTRPLUS::operator()(b) )
00943             return false;
00944         if (route_.match(b->dest_))
00945             return true;
00946         size_t num_fwd = 
00947             b->fwdlog_.get_transmission_count(ForwardingInfo::COPY_ACTION);
00948         bool ok = ((stats_->get_p_max(b) < remote_->p_value(b)) &&
00949                    (num_fwd < max_fwd_));
00950         log_debug("NF %zu, Max NF %d, max P %0.2f, remote P %0.2f, %s fwd *%p",
00951                   num_fwd,max_fwd_,stats_->get_p_max(b),remote_->p_value(b),
00952                   ok ? "ok to" : "do not", b);
00953         return ok;
00954     }
00955 
00956     virtual ~FwdDeciderGTMXPLUS() {}
00957 protected:
00958     friend class ProphetDecider;
00959     FwdDeciderGTMXPLUS(ProphetTable* local, ProphetTable* remote,
00960                        Link* nexthop, ProphetStats* stats, u_int max_forward)
00961         : FwdDeciderGRTRPLUS(local,remote,nexthop,stats), max_fwd_(max_forward)
00962     {}
00963 
00964     u_int max_fwd_;
00965 };
00966 
00970 ProphetDecider*
00971 ProphetDecider::decider( Prophet::fwd_strategy_t fs, ProphetTable* local,
00972                          ProphetTable* remote, Link* nexthop,
00973                          u_int max_forward, ProphetStats* stats )
00974 {
00975     ProphetDecider* pd = NULL;
00976     switch(fs) {
00977         case Prophet::GRTR:
00978         case Prophet::GRTR_SORT:
00979         case Prophet::GRTR_MAX:
00980             pd = (ProphetDecider*) new FwdDeciderGRTR(local,remote,nexthop);
00981             break;
00982         case Prophet::GTMX:
00983             pd = (ProphetDecider*) new
00984                 FwdDeciderGTMX(local,remote,nexthop,max_forward);
00985             break;
00986         case Prophet::GRTR_PLUS:
00987             pd = (ProphetDecider*) new
00988                 FwdDeciderGRTRPLUS(local,remote,nexthop,stats);
00989             break;
00990         case Prophet::GTMX_PLUS:
00991             pd = (ProphetDecider*) new
00992                 FwdDeciderGTMXPLUS(local,remote,nexthop,stats,max_forward);
00993             break;
00994         default:
00995             PANIC("Invalid forwarding strategy: %d",(int)fs);
00996             break;
00997     }
00998     return pd;
00999 }
01000 
01001 // borrowed (modified) from TableBasedRouter::should_fwd
01002 bool
01003 ProphetDecider::should_fwd(const Bundle* bundle) const
01004 {
01005     ForwardingInfo info;
01006     bool found = bundle->fwdlog_.get_latest_entry(next_hop_, &info);
01007 
01008     if (found)
01009     {
01010         ASSERT(info.state_ != ForwardingInfo::NONE);
01011     }
01012     else
01013     {
01014         ASSERT(info.state_ == ForwardingInfo::NONE);
01015     }
01016 
01017     if (info.state_ == ForwardingInfo::TRANSMITTED ||
01018         info.state_ == ForwardingInfo::IN_FLIGHT)
01019     {
01020         log_debug("should_fwd bundle %d: "
01021                   "skip %s due to forwarding log entry %s",
01022                   bundle->bundleid_, next_hop_->name(),
01023                   ForwardingInfo::state_to_str(
01024                       (ForwardingInfo::state_t)info.state_));
01025         return false;
01026     }
01027 
01028     if (info.state_ == ForwardingInfo::TRANSMIT_FAILED) {
01029         log_debug("should_fwd bundle %d: "
01030                   "match %s: forwarding log entry %s TRANSMIT_FAILED %d",
01031                   bundle->bundleid_, next_hop_->name(),
01032                   ForwardingInfo::state_to_str((ForwardingInfo::state_t)
01033                       info.state_),
01034                   bundle->bundleid_);
01035 
01036     } else {
01037         log_debug("should_fwd bundle %d: "
01038                   "match %s: forwarding log entry %s",
01039                   bundle->bundleid_, next_hop_->name(),
01040                   ForwardingInfo::state_to_str((ForwardingInfo::state_t)
01041                       info.state_));
01042     }
01043 
01044     return true;
01045 }
01046 
01051 class ProphetBundleOffer : public std::priority_queue<Bundle*>,
01052                            public oasys::Logger
01053 {
01054 protected:
01055     typedef std::priority_queue<Bundle*> BundleQueue;
01056 public:
01057     ProphetBundleOffer(const BundleList& bundles,
01058                        FwdStrategy* comp,
01059                        ProphetDecider* decider)
01060         : BundleQueue(*comp),
01061           oasys::Logger("ProphetBundleOffer","/dtn/route/offer"),
01062           list_("ProphetBundleOffer"),comp_(comp),decide_(decider)
01063     {
01064         oasys::ScopeLock l(bundles.lock(),"ProphetBundleOffer constructor");
01065         for(BundleList::const_iterator i =
01066                 (BundleList::const_iterator) bundles.begin();
01067             i != bundles.end(); 
01068             i++ )
01069         {
01070             push(*i);
01071         }
01072     } 
01073     virtual ~ProphetBundleOffer() {}
01074     void push(Bundle* b)
01075     {
01076         // not every bundle gets forwarded to every neighbor
01077         if(decide_->operator()(b))
01078         {
01079             BundleQueue::push(b);
01080             list_.push_back(b);
01081             log_debug("offering *%p",b);
01082         }
01083         else
01084         {
01085             log_debug("not offering *%p",b);
01086         }
01087     }
01088     void pop()
01089     {
01090         list_.erase(top());
01091         BundleQueue::pop();
01092     }
01093 
01094 protected:
01095     BundleList list_;
01096     FwdStrategy* comp_;
01097     ProphetDecider* decide_;
01098 };
01099 
01103 struct BundleSz {
01104     u_int operator() (const Bundle* b) const
01105     {
01106         return b->payload_.length();
01107     }
01108 };
01109 
01110 struct QueueComp: public std::less<Bundle*> 
01111 {
01112     inline
01113     static QueueComp* queuecomp(Prophet::q_policy_t qp,
01114                                 ProphetStats* ps,
01115                                 ProphetTable* pt);
01116 };
01117 
01121 struct QueueCompFIFO : public QueueComp
01122 {
01123     bool operator() (const Bundle* a, const Bundle* b) const
01124     {
01125         return (a->bundleid_ < b->bundleid_);
01126     }
01127 };
01128 
01132 struct QueueCompMOFO : public QueueComp
01133 {
01134 #define NUM_FWD(b) \
01135         (b)->fwdlog_.get_transmission_count(ForwardingInfo::COPY_ACTION)
01136     bool operator() (const Bundle* a, const Bundle* b) const
01137     {
01138         return (NUM_FWD(a) < NUM_FWD(b));
01139     }
01140 };
01141 
01145 class QueueCompMOPR : public QueueComp
01146 {
01147 public:
01148     QueueCompMOPR(ProphetStats* pstats)
01149         : pstats_(pstats)
01150     {
01151         ASSERT(pstats_ != NULL);
01152     }
01153 #define MOPR(b) pstats_->get_mopr(b)
01154     bool operator() (const Bundle* a, const Bundle* b) const
01155     {
01156         return MOPR(a) < MOPR(b);
01157     }
01158 protected:
01159     ProphetStats* pstats_;
01160 };
01161 
01165 class QueueCompLMOPR : public QueueComp
01166 {
01167 public:
01168     QueueCompLMOPR(ProphetStats* pstats)
01169         : pstats_(pstats)
01170     {
01171         ASSERT(pstats_ != NULL);
01172     }
01173 #define LMOPR(b) pstats_->get_lmopr(b)
01174     bool operator() (const Bundle* a, const Bundle* b) const
01175     {
01176         return (LMOPR(a) < LMOPR(b));
01177     }
01178 protected:
01179     ProphetStats* pstats_;
01180 };
01181 
01185 struct QueueCompSHLI : public QueueComp
01186 {
01187 #define SHLI(b) (b)->expiration_ 
01188     bool operator() (const Bundle* a, const Bundle* b) const
01189     {
01190         return (SHLI(a) < SHLI(b));
01191     }
01192 };
01193 
01194 struct QueueCompLEPR : public QueueComp
01195 {
01196     QueueCompLEPR(ProphetTable* nodes)
01197         : nodes_(nodes)
01198     {
01199         ASSERT(nodes!=NULL);
01200     }
01201 #define LEPR(b) nodes_->p_value(b)
01202     bool operator() (const Bundle* a, const Bundle* b) const
01203     {
01204         return (LEPR(a) < LEPR(b));
01205     }
01206 protected:
01207     ProphetTable* nodes_;
01208 };
01209 
01210 QueueComp*
01211 QueueComp::queuecomp(Prophet::q_policy_t qp,
01212                      ProphetStats* ps,
01213                      ProphetTable* pt)
01214 {
01215     (void) ps;
01216     (void) pt;
01217     switch(qp) {
01218         case Prophet::FIFO:
01219             return new QueueCompFIFO();
01220         case Prophet::MOFO:
01221             return new QueueCompMOFO();
01222         case Prophet::MOPR:
01223             return new QueueCompMOPR(ps);
01224         case Prophet::LINEAR_MOPR:
01225             return new QueueCompLMOPR(ps);
01226         case Prophet::SHLI:
01227             return new QueueCompSHLI();
01228         case Prophet::LEPR:
01229             return new QueueCompLEPR(pt);
01230         case Prophet::INVALID_QP:
01231         default:
01232             PANIC("Unrecognized queue policy %d",qp);
01233     }
01234 }
01235 
01236 struct ProphetBundleList
01237 {
01238     static Bundle* find(const BundleList& list,
01239                         const EndpointID& dest,
01240                         u_int32_t creation_ts);
01241 };
01242 
01252 class ProphetBundleQueue :
01253     public oasys::BoundedPriorityQueue<Bundle*,BundleSz,QueueComp> 
01254 {
01255 public:
01256     typedef oasys::BoundedPriorityQueue<Bundle*,BundleSz,QueueComp> BundleBPQ;
01257 
01258     ProphetBundleQueue(const BundleList* list, BundleActions* actions,
01259                        ProphetParams* params,
01260                        QueueComp comp = QueueCompFIFO())
01261         : BundleBPQ(comp,params->max_usage_),
01262           bundles_("ProphetBundleQueue"),
01263           actions_(actions),
01264           params_(params)
01265     {
01266         ASSERT(list!=NULL);
01267         ASSERT(params!=NULL);
01268         
01269         oasys::ScopeLock l(list->lock(),"ProphetBundleQueue constructor");
01270         for (BundleList::const_iterator i = list->begin();
01271              i != list->end();
01272              i++)
01273         {
01274             push(*i);
01275         }
01276     }
01277 
01278     virtual ~ProphetBundleQueue() {}
01279 
01280     oasys::SpinLock* lock() const { return bundles_.lock(); }
01281 
01282 #define seq_ BundleBPQ::PriorityQueue::c
01283 #define comp_ BundleBPQ::PriorityQueue::comp
01284 
01290     size_t bundle_list(BundleList& list) const
01291     {
01292         ASSERT(lock()->is_locked_by_me());
01293         // copy out a new sequence of internal heap
01294         for(BundleList::const_iterator i =
01295                 (BundleList::const_iterator)bundles_.begin();
01296             i!=bundles_.end();
01297             i++)
01298         {
01299             list.push_back(*i);
01300         }
01301         return list.size();
01302     }
01303 
01304     void drop_bundle(Bundle* b)
01305     {
01306         BundleBPQ::Sequence::iterator i = std::find(seq_.begin(),seq_.end(),b);
01307         if (i != c.end())
01308         {
01309             erase_member(i);
01310             make_heap(seq_.begin(),seq_.end(),comp_);
01311         }
01312     }
01313 
01314     void pop()
01315     {
01316         // no op
01317     }
01318 
01319     void push(Bundle* b)
01320     {
01321         // prevent duplicates in seq_
01322         if (bundles_.contains(b))
01323             return;
01324 
01325         BundleBPQ::push(b);
01326         bundles_.push_back(b);
01327     }
01328 
01329     void set_comp(QueueComp* c)
01330     {
01331         oasys::ScopeLock l(lock(),"set_comp");
01332         comp_ = *c;
01333         make_heap(seq_.begin(),seq_.end(),comp_);
01334     }
01335 
01336 protected:
01337     void erase_member(iterator pos) {
01338         BundleBPQ::erase_member(pos);
01339         bundles_.erase(*pos);
01340         //if(actions_ != NULL)
01341         //actions_->delete_bundle(*pos,BundleProtocol::REASON_DEPLETED_STORAGE);
01342     }
01343 
01344     void enforce_bound() {
01345         oasys::ScopeLock l(lock(),"enforce_bound");
01346         // business as usual, unless LEPR
01347         if (params_->qp_ != Prophet::LEPR) {
01348             BundleBPQ::enforce_bound();
01349             return;
01350         }
01351 
01352         // Apply LEPR queueing policy: things get ugly fast
01353         // due to the requirement of min_forward_
01354 
01355         // sort
01356         std::sort_heap(seq_.begin(),seq_.end(),comp_);
01357 
01358         // reverse
01359         std::reverse(seq_.begin(),seq_.end());
01360 
01361         // find suitable victims until threshold is reached
01362         for (BundleBPQ::Sequence::iterator i = seq_.begin();
01363              (i != seq_.end()) && (cur_size_ > max_size_);
01364              i++)
01365         {
01366             if (NUM_FWD(*i) < params_->min_forward_)
01367                 continue;
01368             cur_size_ -= BundleSz()(*i);
01369             erase_member(i);
01370         }
01371         // reheap the mangled mess
01372         make_heap(seq_.begin(),seq_.end(),comp_);
01373     }
01374 
01375     BundleList bundles_;
01376     BundleActions* actions_;
01377     ProphetParams* params_;
01378 #undef seq_
01379 #undef comp_
01380 };
01381 
01382 }; // dtn
01383 
01384 #endif // _DTN_PROPHET_LISTS_

Generated on Thu Jun 7 16:56:51 2007 for DTN Reference Implementation by  doxygen 1.5.1