ProphetController.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2006 Baylor University
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 #include <netinet/in.h>
00018 #include "bundling/Bundle.h"
00019 #include "bundling/BundleRef.h"
00020 #include "bundling/BundleList.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "ProphetController.h"
00023 #include <oasys/thread/Lock.h>
00024 #include <oasys/util/Random.h>
00025 #include <oasys/util/ScratchBuffer.h>
00026 
00027 #include <queue>
00028 
00029 namespace dtn {
00030 
00031 template<>
00032 ProphetController* oasys::Singleton<ProphetController,false>::instance_ = NULL;
00033 
00034 void
00035 ProphetController::do_init(ProphetParams* params,
00036                            const BundleList* list,
00037                            BundleActions* actions,
00038                            const char* logpath) 
00039 {
00040     ASSERT(params != NULL);
00041     ASSERT(actions != NULL);
00042     ASSERT(list != NULL);
00043 
00044     params_ = params;
00045     actions_ = actions;
00046 
00047     bundles_ = new ProphetBundleQueue(list, actions, params,
00048                            *(QueueComp::queuecomp(params->qp_,
00049                                                 &pstats_,
00050                                                 &nodes_)));
00051 
00052     node_age_timer_ = new ProphetTableAgeTimer(&nodes_,
00053                                                params_->age_period_,
00054                                                params_->epsilon_);
00055     ack_age_timer_ = new ProphetAckAgeTimer(&acks_,params_->age_period_);
00056 
00057     lock_ = new oasys::SpinLock();
00058 
00059     set_logpath(logpath);
00060 }
00061 
00062 ProphetController::ProphetController()
00063     : oasys::Logger("ProphetController","/dtn/route/prophet/controller"),
00064       params_(NULL),
00065       node_age_timer_(NULL),
00066       ack_age_timer_(NULL),
00067       actions_(NULL),
00068       bundles_(NULL)
00069 {
00070     Prophet::UniqueID::init();
00071     encounters_.clear();
00072     prophet_eid_.assign(BundleDaemon::instance()->local_eid());
00073     ASSERT(prophet_eid_.append_service_tag("prophet"));
00074 }
00075 
00076 ProphetController::~ProphetController()
00077 {
00078     delete node_age_timer_;
00079     delete ack_age_timer_;
00080     delete bundles_;
00081     delete lock_;
00082 }
00083 
00084 void 
00085 ProphetController::shutdown()
00086 {
00087     {
00088         oasys::ScopeLock l(lock_,"destructor");
00089         enc_set::iterator it = encounters_.begin();
00090         while( it != encounters_.end() )
00091         {
00092             ProphetEncounter* pe = *it;
00093             pe->neighbor_gone();
00094             it++;
00095         }
00096         encounters_.clear();
00097     }
00098     node_age_timer_->cancel();
00099     ack_age_timer_->cancel();
00100 }
00101 
00102 // this will eventually turn into a nasty beast, what can I do to condense it?
00103 void
00104 ProphetController::dump_state(oasys::StringBuffer* buf)
00105 {
00106     oasys::ScopeLock l(lock_,"dump_state");
00107     buf->appendf("\n"
00108                  "ProphetRouter [%s] [%s] (%zu active, %zu known)\n"
00109                  "-------------\n",
00110                  Prophet::fs_to_str(params_->fs_),
00111                  Prophet::qp_to_str(params_->qp_),
00112                  encounters_.size(), nodes_.size());
00113 
00114     // iterate over active encounters
00115     for (enc_set::iterator it = encounters_.begin();
00116          it != encounters_.end();
00117          it++)
00118     {
00119          ProphetEncounter* pe = *it;
00120          pe->dump_state(buf);
00121     }
00122 
00123     // iterate over nodes
00124     buf->appendf("\n"
00125                  "Known peers\n"
00126                  "-----------\n");
00127     oasys::ScopeLock n(nodes_.lock(),"ProphetController::dump_state");
00128     for (ProphetTable::iterator i = nodes_.begin();
00129          i != nodes_.end();
00130          i++)
00131     {
00132         EndpointID eid = (*i).first;
00133         ProphetNode* node = (*i).second;
00134         buf->appendf("%02.2f  %c%c%c %s\n",
00135                      node->p_value(),
00136                      node->relay() ? 'R' : ' ',
00137                      node->custody() ? 'C' : ' ',
00138                      node->internet_gw() ? 'I' : ' ',
00139                      eid.c_str());
00140     }
00141 
00142     buf->appendf("\n R - relay   C - custody   I - internet gateway\n\n");
00143 }
00144 
00145 ProphetEncounter*
00146 ProphetController::find_instance(Link* link)
00147 {
00148     oasys::ScopeLock l(lock_,"find_instance");
00149     enc_set::iterator it = encounters_.begin();
00150     while( it != encounters_.end() )
00151     {
00152         if((*it)->next_hop()->remote_eid().equals(link->remote_eid()))
00153             return (ProphetEncounter*) (*it);
00154         else
00155             log_debug("find_instance: %s != %s",
00156                       (*it)->next_hop()->remote_eid().c_str(),
00157                       link->remote_eid().c_str());
00158         it++;
00159     }
00160     return NULL;
00161 }
00162 
00163 void
00164 ProphetController::new_neighbor(const ContactRef& contact)
00165 {
00166     log_info("NEW_NEIGHBOR signal from *%p",contact.object());
00167     Link* link = contact.object()->link();
00168     ProphetEncounter* pe = find_instance(link);
00169     if (pe == NULL && !link->remote_eid().equals(EndpointID::NULL_EID()))
00170     {
00171         pe = new ProphetEncounter(link, this);
00172         if (!reg(pe))
00173         {
00174           delete pe;
00175           return;
00176         }
00177         pe->start();
00178     }
00179 }
00180 
00181 void
00182 ProphetController::neighbor_gone(const ContactRef& contact)
00183 {
00184     Link* link = contact.object()->link();
00185     log_info("NEIGHBOR_GONE signal from *%p",contact.object());
00186     ProphetEncounter* pe = NULL;
00187     if((pe = find_instance(link)) != NULL)
00188     {
00189         pe->neighbor_gone(); // self deletes once stopped
00190         log_info("found and stopped ProphetEncounter instance");
00191     }
00192     else
00193     {
00194         log_info("did not find ProphetEncounter instance");
00195     }
00196 }
00197 
00198 void
00199 ProphetController::handle_bundle_received(Bundle* bundle,const ContactRef& contact)
00200 {
00201     log_debug("handle_bundle_received, *%p from *%p",bundle,contact.object());
00202 
00203     // Look up information on Bundle destination, else start a new record
00204     EndpointID routeid = Prophet::eid_to_routeid(bundle->dest_);
00205     ProphetNode* node = nodes_.find(routeid);
00206     if (node == NULL && !routeid.equals(BundleDaemon::instance()->local_eid()))
00207     {
00208         node = new ProphetNode(params_);
00209         node->set_eid(Prophet::eid_to_routeid(bundle->dest_));
00210         nodes_.update(node);
00211     }
00212 
00213     if (prophet_eid_.equals(bundle->dest_))
00214     {
00215         // attempt to read out Prophet control message
00216         ProphetTLV* pt = ProphetTLV::deserialize(bundle);
00217         if (pt != NULL)
00218         {
00219             log_debug("handle_bundle_received, got TLV size %d",pt->length());
00220             // demux to appropriate ProphetEncounter instance
00221             ProphetEncounter *pe = find_instance(contact->link());
00222             if (pe == NULL)
00223             {
00224                 // this is first contact, create a new instance
00225                 new_neighbor(contact);
00226                 if ((pe = find_instance(contact->link())) == NULL)
00227                 {
00228                     log_err("Unable to find or create ProphetEncounter to "
00229                             "handle Prophet control message *%p",bundle);
00230                     delete pt;
00231                 }
00232             }
00233 
00234             if (pe != NULL)
00235             {
00236                 // dispatch message to ProphetEncounter
00237                 log_debug("handle_bundle_received, dispatching TLV to instance %d",
00238                           pe->local_instance());
00239                 pe->receive_tlv(pt);
00240             }
00241 
00242             // our way of signalling Bundle Delivered
00243             actions_->delete_bundle(bundle,BundleProtocol::REASON_NO_ADDTL_INFO);
00244         }
00245     }
00246 
00247     // not a control message
00248     else
00249     {
00250         oasys::ScopeLock l(lock_,"handle_bundle_received");
00251         // signal each thread that a Bundle has arrived
00252         enc_set::iterator it = encounters_.begin();
00253         while( it != encounters_.end() )
00254         {
00255             ProphetEncounter* pe = *it;
00256             pe->handle_bundle_received(bundle);
00257             it++;
00258         }
00259 
00260         // enqueue with Prophet queueing policy
00261         bundles_->push(bundle);
00262     }
00263 }
00264 
00265 void
00266 ProphetController::handle_bundle_delivered(Bundle* b)
00267 {
00268     BundleRef bundle("handle_bundle_delivered");
00269     bundle = b;
00270     if (bundle.object() == NULL) return;
00271 
00272     // add to ack list
00273     acks_.insert(bundle.object());
00274 
00275     // drop from local store
00276     bundle = NULL;
00277     bundles_->drop_bundle(b);
00278 }
00279 
00280 void 
00281 ProphetController::handle_bundle_expired(Bundle* b)
00282 {
00283     BundleRef bundle("handle_bundle_expired");
00284     bundle = b;
00285     if (bundle.object() == NULL) return;
00286 
00287     // drop stats entry for this bundle
00288     pstats_.drop_bundle(bundle.object());
00289 
00290     // dequeue from Prophet's bundle store
00291     bundle = NULL;
00292     bundles_->drop_bundle(b);
00293 }
00294 
00295 void
00296 ProphetController::handle_link_state_change_request(const ContactRef& c)
00297 {
00298     // demux to appropriate instance
00299     ProphetEncounter* pe = find_instance(c.object()->link());
00300     if (pe != NULL)
00301     {
00302         // attempt to send queued bundles, if any
00303         pe->flush_pending();
00304     }
00305 }
00306 
00307 } // namespace dtn

Generated on Thu Jun 7 12:54:28 2007 for DTN Reference Implementation by  doxygen 1.5.1