ProphetEncounter.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2006 Baylor University
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 #include <netinet/in.h>
00018 #include "bundling/Bundle.h"
00019 #include "bundling/BundleRef.h"
00020 #include "bundling/BundleList.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "ProphetEncounter.h"
00023 #include "ProphetController.h"
00024 #include <oasys/thread/Lock.h>
00025 #include <oasys/util/Random.h>
00026 #include <oasys/util/ScratchBuffer.h>
00027 
00028 namespace dtn {
00029 
00030 ProphetEncounter::ProphetEncounter(Link* nexthop,
00031                                    ProphetOracle* oracle)
00032     : oasys::Thread("ProphetEncounter",oasys::Thread::DELETE_ON_EXIT),
00033       oasys::Logger("ProphetEncounter","/dtn/route"),
00034       oracle_(oracle),
00035       remote_instance_(0),
00036       local_instance_(Prophet::UniqueID::instance()->instance_id()),
00037       tid_(0),
00038       timeout_(0),
00039       next_hop_(nexthop),
00040       synsender_(false),initiator_(false),
00041       synsent_(false),synrcvd_(false),
00042       estab_(false),
00043       neighbor_gone_(false),
00044       state_(WAIT_NB),
00045       cmdqueue_("/dtn/route"),
00046       to_be_fwd_("ProphetEncounter to be forwarded"),
00047       outbound_tlv_(NULL),
00048       state_lock_(new oasys::SpinLock()),
00049       otlv_lock_(new oasys::SpinLock())
00050 {
00051     // validate inputs and assumptions
00052     ASSERT(oracle != NULL);
00053     ASSERT(local_instance_ != 0);
00054     ASSERT(nexthop != NULL);
00055     ASSERT(nexthop->local() != "");
00056     ASSERT(!nexthop->remote_eid().equals(EndpointID::NULL_EID()));
00057 
00058     logpath_appendf("/encounter-%d",local_instance_);
00059 
00060     // validate ProphetOracle contract
00061     ASSERT(oracle_->params() != NULL);
00062     ASSERT(oracle_->bundles() != NULL);
00063     ASSERT(oracle_->nodes() != NULL);
00064     ASSERT(oracle_->actions() != NULL);
00065     ASSERT(oracle_->acks() != NULL);
00066     ASSERT(oracle_->stats() != NULL);
00067     
00068     // set poll timeout
00069     // convert from units of 100ms to ms
00070     timeout_ = oracle->params()->hello_interval_ * 100;
00071 
00072     // zero out timers
00073     data_sent_.get_time();
00074     data_rcvd_.get_time();
00075     
00076     // initialize lists
00077     offers_.set_type(BundleOffer::OFFER);
00078     requests_.set_type(BundleOffer::RESPONSE);
00079 
00080     ack_count_ = 0;
00081 }
00082 
00083 ProphetEncounter::~ProphetEncounter() {
00084     // clean up the memory we're responsible for
00085     while(cmdqueue_.size() != 0)
00086     {
00087         ProphetEncounter::PEMsg pm;
00088         ASSERT(cmdqueue_.try_pop(&pm));
00089         if(pm.tlv_ != NULL)
00090             delete pm.tlv_;
00091     }
00092     oasys::ScopeLockIf l(otlv_lock_,"destructor",outbound_tlv_ != NULL);
00093     if (outbound_tlv_ != NULL) 
00094     {
00095         delete outbound_tlv_;
00096         outbound_tlv_ = NULL;
00097     }
00098     l.unlock();
00099     delete state_lock_;
00100     delete otlv_lock_;
00101 }
00102 
00103 bool
00104 ProphetEncounter::operator< (const ProphetEncounter& p) const
00105 {
00106     return local_instance_ < p.local_instance_;
00107 }
00108 
00109 bool
00110 ProphetEncounter::operator< (u_int16_t inst) const
00111 {
00112     return local_instance_ < inst;
00113 }
00114 
00115 ProphetEncounter::prophet_state_t
00116 ProphetEncounter::get_state(const char *where)
00117 {
00118     oasys::ScopeLock l(state_lock_, where);
00119     log_debug("get_state(%s) == %s",where,state_to_str(state_));
00120     return state_;
00121 }
00122 
00123 void
00124 ProphetEncounter::set_state(prophet_state_t new_state)
00125 {
00126     oasys::ScopeLock l(state_lock_,"ProphetEncounter::set_state");
00127     log_debug("set_state from %s to %s",
00128                state_to_str(state_),
00129                state_to_str(new_state));
00130 
00131     prophet_state_t oldstate = state_;
00132     state_ = new_state; 
00133 
00134     switch(new_state) {
00135     case WAIT_NB:
00136         // always legal
00137         synsent_ = false;
00138         synrcvd_ = false;
00139         estab_ = false;
00140         ack_count_ = 0;
00141         timeout_ = oracle_->params()->hello_interval_ * 100;
00142         break;
00143     case SYNSENT:
00144         ASSERT(oldstate == WAIT_NB ||
00145                oldstate == SYNSENT);
00146         synsent_ = true;
00147         break;
00148     case SYNRCVD:
00149         ASSERT(oldstate == WAIT_NB ||
00150                oldstate == SYNSENT ||
00151                oldstate == SYNRCVD);
00152         synrcvd_ = true;
00153         break;
00154     case ESTAB:
00155         ASSERT(oldstate == SYNRCVD ||
00156                oldstate == ESTAB ||
00157                oldstate == SYNSENT);
00158         estab_ = true;
00159         if (initiator_ == true)
00160         {
00161             set_state(CREATE_DR);
00162         }
00163         else
00164         {
00165             set_state(WAIT_DICT);
00166         }
00167         break;
00168     case WAIT_DICT:
00169         ASSERT(oldstate == ESTAB ||
00170                oldstate == WAIT_DICT ||
00171                oldstate == WAIT_RIB ||
00172                oldstate == OFFER ||
00173                oldstate == WAIT_INFO);
00174         reset_ribd();
00175         break;
00176     case WAIT_RIB:
00177         ASSERT(oldstate == WAIT_DICT ||
00178                oldstate == WAIT_RIB);
00179         break;
00180     case OFFER:
00181         ASSERT(oldstate == OFFER ||
00182                oldstate == WAIT_RIB);
00183         send_bundle_offer();
00184         break;
00185     case CREATE_DR:
00186         ASSERT(oldstate == ESTAB ||
00187                oldstate == WAIT_INFO ||
00188                oldstate == WAIT_DICT ||
00189                oldstate == SEND_DR ||
00190                oldstate == REQUEST);
00191         reset_ribd();
00192         break;
00193     case SEND_DR:
00194         ASSERT(oldstate == CREATE_DR ||
00195                oldstate == SEND_DR);
00196         break;
00197     case REQUEST:
00198         ASSERT(oldstate == REQUEST ||
00199                oldstate == SEND_DR);
00200         break;
00201     case WAIT_INFO:
00202         ASSERT(oldstate == REQUEST ||
00203                oldstate == OFFER);
00204         // On the first phase of INFO_EXCHANGE,
00205         // synsender_ is initiator and !synsender_ is
00206         // listener.  During this phase, immediately
00207         // switch roles and continue.
00208         if ((synsender_ && initiator_) ||
00209             (!synsender_ && !initiator_))
00210         {
00211             switch_info_role();
00212         }
00213         break;
00214     default:
00215         PANIC("Unknown Prophet state: %d", (int)new_state);
00216         break;
00217     }
00218 }
00219 
00220 void
00221 ProphetEncounter::reset_link()
00222 {
00223     log_debug("reset_link");
00224     neighbor_gone();
00225 }
00226 
00227 void
00228 ProphetEncounter::handle_bundle_received(Bundle* bundle)
00229 {
00230     BundleRef b("handle_bundle_received");
00231     b = bundle;
00232     log_debug("handle_bundle_received *%p",bundle);
00233 
00234     // Is this bundle destined for the attached peer?
00235     EndpointIDPattern route = Prophet::eid_to_route(next_hop_->remote_eid());
00236     if (route.match(b->dest_))
00237     {
00238         // short-circuit the protocol and forward to attached peer
00239         if (should_fwd(b.object()))
00240         {
00241             fwd_to_nexthop(b.object());
00242         }
00243     }
00244 
00245     if (get_state("handle_bundle_received") == REQUEST)
00246     {
00247         u_int32_t cts = b->creation_ts_.seconds_;
00248         u_int16_t sid = ribd_.find(Prophet::eid_to_routeid(b->dest_));
00249         requests_.remove_bundle(cts,sid);
00250 
00251         // no requests remaining means the end of bundle exchange
00252         // so signal as such with 0-sized bundle request
00253         if (requests_.size() == 0)
00254         {
00255             enqueue_bundle_tlv(requests_);
00256             send_prophet_tlv();
00257             set_state(WAIT_INFO);
00258         }
00259     }
00260 }
00261 
00262 void
00263 ProphetEncounter::receive_tlv(ProphetTLV* pt)
00264 {
00265     log_debug("receive_tlv");
00266     // alert thread to new bundle
00267     cmdqueue_.push_back(PEMsg(PEMSG_PROPHET_TLV_RECEIVED,pt));
00268 }
00269 
00270 void
00271 ProphetEncounter::neighbor_gone()
00272 {
00273     log_debug("neighbor_gone");
00274     // alert thread to status change
00275     cmdqueue_.push_back(PEMsg(PEMSG_NEIGHBOR_GONE,NULL));
00276 }
00277 
00278 bool
00279 ProphetEncounter::should_fwd(Bundle* b)
00280 {
00281     BundleRef bundle("should_fwd");
00282     ForwardingInfo info;
00283 
00284     bundle = b;
00285     bool found = bundle->fwdlog_.get_latest_entry(next_hop_,&info);
00286 
00287     if (found) {
00288         ASSERT(info.state_ != ForwardingInfo::NONE);
00289     } else {
00290         ASSERT(info.state_ == ForwardingInfo::NONE);
00291     }
00292 
00293     if (info.state_ == ForwardingInfo::TRANSMITTED ||
00294         info.state_ == ForwardingInfo::IN_FLIGHT)
00295     {
00296         log_debug("should_fwd bundle %d: "
00297                   "skip %s due to forwarding log entry %s",
00298                   bundle->bundleid_, next_hop_->name(),
00299                   ForwardingInfo::state_to_str(
00300                       static_cast<ForwardingInfo::state_t>(info.state_)));
00301         return false;
00302     }
00303 
00304     if (info.state_ == ForwardingInfo::TRANSMIT_FAILED) {
00305         log_debug("should_fwd bundle %d: "
00306                   "match %s: forwarding log entry %s TRANSMIT_FAILED %d",
00307                   bundle->bundleid_, next_hop_->name(),
00308                   ForwardingInfo::state_to_str(
00309                       static_cast<ForwardingInfo::state_t>(info.state_)),
00310                   bundle->bundleid_); 
00311     } else {
00312         log_debug("should_fwd bundle %d: "
00313                   "match %s: forwarding log entry %s",
00314                   bundle->bundleid_, next_hop_->name(),
00315                   ForwardingInfo::state_to_str(
00316                       static_cast<ForwardingInfo::state_t>(info.state_)));
00317     }
00318 
00319     return true;
00320 }
00321 
00322 void
00323 ProphetEncounter::fwd_to_nexthop(Bundle* b,bool add_front)
00324 {
00325     BundleRef bundle("fwd_to_nexthop");
00326     bundle = b;
00327     log_debug("fwd_to_nexthop *%p at %s",b,add_front?"front":"back");
00328 
00329     // ProphetEncounter only exists if link is open
00330     ASSERT(next_hop_->isopen());
00331 
00332     if(bundle!=NULL)
00333     {
00334         if(add_front)
00335             //XXX/wilson this is naive, if more than one
00336             // gets enqueued we're busted
00337             
00338             // give priority to Prophet control messages
00339             to_be_fwd_.push_front(b);
00340         else
00341             to_be_fwd_.push_back(b);
00342     }
00343 
00344     // fill the pipe with however many bundles are pending
00345     while (next_hop_->isbusy() == false &&
00346            to_be_fwd_.size() > 0)
00347     {
00348         BundleRef ref("ProphetEncounter fwd_to_nexthop");
00349         ref = to_be_fwd_.pop_front();
00350 
00351         if (ref.object() == NULL)
00352         {
00353             log_err("Unexpected NULL pointer in to_be_fwd_ list");
00354             continue;
00355         }
00356 
00357         Bundle* b = ref.object();
00358         log_debug("sending *%p to *%p", b, next_hop_);
00359 
00360         bool ok = oracle_->actions()->send_bundle(b,next_hop_,
00361                                                   ForwardingInfo::COPY_ACTION,
00362                                                   CustodyTimerSpec());
00363         ASSERTF(ok == true,"failed to send bundle");
00364 
00365         // update Prophet stats on this bundle
00366         oracle_->stats()->update_stats(b,remote_nodes_.p_value(b));
00367     } 
00368 
00369     oasys::ScopeLock l(to_be_fwd_.lock(),"fwd_to_nexthop");
00370     for(BundleList::iterator i = to_be_fwd_.begin();
00371             i != to_be_fwd_.end(); i++)
00372     {
00373         log_debug("can't forward *%p to *%p because link is busy",
00374                    *i, link);
00375     }
00376 }
00377 
00378 void 
00379 ProphetEncounter::handle_prophet_tlv(ProphetTLV* pt)
00380 {
00381     ASSERT(pt != NULL);
00382 
00383     data_rcvd_.get_time();
00384 
00385     oasys::StringBuffer buf;
00386     pt->dump(&buf);
00387     log_debug("handle_prophet_tlv (tid %u,%s,%zu entries)\n"
00388               "Inbound TLV\n\n%s\n",
00389               pt->transaction_id(),
00390               Prophet::result_to_str(pt->result()),
00391               pt->num_tlv(),buf.c_str());
00392 
00393     tid_ = pt->transaction_id();
00394 
00395     BaseTLV* tlv = NULL;
00396     bool ok = true;
00397 
00398     while (neighbor_gone_ == false &&
00399            (tlv = pt->get_tlv()) != NULL &&
00400            ok != false)
00401     {
00402         if (estab_ == false && tlv->typecode() != Prophet::HELLO_TLV)
00403         {
00404             delete tlv;
00405             handle_bad_protocol(pt->transaction_id());
00406             ok = false;
00407             break;
00408         }
00409     
00410         switch(tlv->typecode()) {
00411             case Prophet::HELLO_TLV:
00412                 ok = handle_hello_tlv((HelloTLV*)tlv,pt);
00413                 break;
00414             case Prophet::RIBD_TLV:
00415                 ok = handle_ribd_tlv((RIBDTLV*)tlv,pt);
00416                 break;
00417             case Prophet::RIB_TLV:
00418                 ok = handle_rib_tlv((RIBTLV*)tlv,pt);
00419                 break;
00420             case Prophet::BUNDLE_TLV:
00421                 ok = handle_bundle_tlv((BundleTLV*)tlv,pt);
00422                 break;
00423             case Prophet::UNKNOWN_TLV:
00424             case Prophet::ERROR_TLV:
00425             default:
00426                 PANIC("Unexpected TLV type received by ProphetEncounter: %d",
00427                       tlv->typecode());
00428         }
00429         delete tlv;
00430     }
00431     if (ok != true)
00432     {
00433         log_debug("killing off %zu unread TLVs",pt->list().size());
00434         // free up memory
00435         while ((tlv = pt->get_tlv()) != NULL)
00436             delete tlv;
00437     }
00438 }
00439 
00440 bool
00441 ProphetEncounter::handle_hello_tlv(HelloTLV* ht,ProphetTLV* pt)
00442 {
00443     ASSERT(ht != NULL);
00444     ASSERT(ht->typecode() == Prophet::HELLO_TLV);
00445     log_debug("handle_hello_tlv %s",Prophet::hf_to_str(ht->hf()));
00446 
00447     // Establish truth table based on section 5.2
00448     bool hello_a = (remote_instance_ == pt->sender_instance());
00449     bool hello_b = hello_a &&
00450                    (remote_addr_ == next_hop_->nexthop());
00451     bool hello_c = (local_instance_ == pt->receiver_instance());
00452 
00453     log_info("received HF %s in state %s",Prophet::hf_to_str(ht->hf()),
00454              state_to_str(get_state("handle_hello_tlv")));
00455 
00456     // negotiate to minimum timer (in 100ms units)
00457     u_int timeout = std::min(
00458                     (u_int)oracle_->params()->hello_interval_,
00459                     (u_int)ht->timer(),
00460                     std::less<u_int>());
00461     
00462     if ((timeout * 100) != timeout_) 
00463     {
00464         // timeout_ is in ms 
00465         timeout_ = timeout * 100;
00466     }
00467 
00468     if (ht->hf() == Prophet::RSTACK)
00469     {
00470         if (hello_a && hello_c && synsent_ == false)
00471         {
00472             handle_neighbor_gone();
00473             return false;
00474         }
00475         else
00476             return true; // discard, no further processing
00477     }
00478     
00479     prophet_state_t state = get_state("handle_hello_tlv");
00480     switch(state) {
00481 
00482         case WAIT_NB:
00483 
00484             if (ht->hf() == Prophet::SYN)
00485             {
00486                 update_peer_verifier(pt->sender_instance());
00487                 enqueue_hello(Prophet::SYNACK,
00488                               pt->transaction_id(),
00489                               Prophet::Success);
00490                 send_prophet_tlv();
00491                 set_state(SYNRCVD);
00492                 return true;
00493             }
00494             return false;
00495 
00496         case SYNSENT:
00497 
00498             if (ht->hf() == Prophet::SYNACK)
00499             {
00500                 if (hello_c)
00501                 {
00502                     update_peer_verifier(pt->sender_instance());
00503                     enqueue_hello(Prophet::ACK,
00504                                   pt->transaction_id(),
00505                                   Prophet::Success);
00506                     send_prophet_tlv();
00507                     set_state(ESTAB);
00508                 }
00509                 else
00510                 {
00511                     enqueue_hello(Prophet::RSTACK,
00512                                   pt->transaction_id(),
00513                                   Prophet::Failure);
00514                     send_prophet_tlv();
00515                     set_state(SYNSENT);
00516                 }
00517             }
00518             else
00519             if (ht->hf() == Prophet::SYN) 
00520             {
00521                 update_peer_verifier(pt->sender_instance());
00522                 enqueue_hello(Prophet::SYNACK,
00523                               pt->transaction_id(),
00524                               Prophet::Success);
00525                 send_prophet_tlv();
00526                 set_state(SYNRCVD);
00527             }
00528             else
00529             if (ht->hf() == Prophet::ACK)
00530             {
00531                 enqueue_hello(Prophet::RSTACK,
00532                               pt->transaction_id(),
00533                               Prophet::Failure);
00534                 send_prophet_tlv();
00535                 set_state(SYNSENT);
00536             }
00537 
00538             return true;
00539 
00540         case SYNRCVD:
00541 
00542             if (ht->hf() == Prophet::SYNACK)
00543             {
00544                 if (hello_c) 
00545                 {
00546                     update_peer_verifier(pt->sender_instance());
00547                     enqueue_hello(Prophet::ACK,
00548                                   pt->transaction_id(),
00549                                   Prophet::Success);
00550                     send_prophet_tlv();
00551                     set_state(ESTAB);
00552                 }
00553                 else
00554                 {
00555                     enqueue_hello(Prophet::RSTACK,
00556                                   pt->transaction_id(),
00557                                   Prophet::Failure);
00558                     send_prophet_tlv();
00559                     set_state(SYNRCVD);
00560                 }
00561             }
00562             else
00563             if (ht->hf() == Prophet::SYN)
00564             {
00565                 update_peer_verifier(pt->sender_instance());
00566                 enqueue_hello(Prophet::SYNACK,
00567                               pt->transaction_id(),
00568                               Prophet::Success);
00569                 send_prophet_tlv();
00570                 set_state(SYNRCVD);
00571             }
00572             else
00573             if (ht->hf() == Prophet::ACK)
00574             {
00575                 if (hello_b && hello_c) 
00576                 {
00577                     enqueue_hello(Prophet::ACK,
00578                                   pt->transaction_id(),
00579                                   Prophet::Success);
00580                     send_prophet_tlv();
00581                     set_state(ESTAB);
00582                 }
00583                 else
00584                 {
00585                     enqueue_hello(Prophet::RSTACK,
00586                                   pt->transaction_id(),
00587                                   Prophet::Failure);
00588                     send_prophet_tlv();
00589                     set_state(SYNRCVD);
00590                 }
00591             }
00592             
00593             return true;
00594 
00595         case ESTAB:
00596 
00597             if (ht->hf() == Prophet::SYN || ht->hf() == Prophet::SYNACK)
00598             {
00599                 // Section 5.2.1, Note 2: No more than two ACKs should be
00600                 // sent within any time period of length defined by the timer.
00601                 // Thus, one ACK MUST be sent every time the timer expires.
00602                 // In addition, one further ACK may be sent between timer
00603                 // expirations if the incoming message is a SYN or SYNACK.
00604                 // This additional ACK allows the Hello functions to reach
00605                 // synchronisation more quickly.
00606                 if (ack_count_ < 2)
00607                 {
00608                     ++ack_count_;
00609                     enqueue_hello(Prophet::ACK,
00610                                   pt->transaction_id(),
00611                                   Prophet::Success);
00612                     send_prophet_tlv();
00613                 }
00614             }
00615             else
00616             if (ht->hf() == Prophet::ACK)
00617             {
00618                 if (hello_b && hello_c)
00619                 {
00620                     // Section 5.2.1, Note 3: No more than one ACK should
00621                     // be sent within any time period of length defined by
00622                     // the timer.
00623                     if (ack_count_ < 1) 
00624                     {
00625                         ++ack_count_;
00626                         enqueue_hello(Prophet::ACK,
00627                                       pt->transaction_id(),
00628                                       Prophet::Success);
00629                         send_prophet_tlv();
00630                     }
00631                 }
00632                 else
00633                 {
00634                     enqueue_hello(Prophet::RSTACK,
00635                                   pt->transaction_id(),
00636                                   Prophet::Failure);
00637                     send_prophet_tlv();
00638                 }
00639             }
00640             return true;
00641 
00642         case WAIT_DICT:
00643         case WAIT_RIB:
00644         case OFFER:
00645 
00646             if (ht->hf() == Prophet::ACK)
00647             {
00648                 set_state(WAIT_DICT);
00649                 return true;
00650             }
00651             return false;
00652 
00653         case CREATE_DR:
00654         case SEND_DR:
00655 
00656             return false;
00657 
00658         case REQUEST:
00659 
00660             if (ht->hf() == Prophet::ACK)
00661             {
00662                 set_state(CREATE_DR);
00663                 return true;
00664             }
00665             return false;
00666 
00667         default:
00668             if (ht->hf() == Prophet::ACK)
00669             {
00670                 return true;
00671             }
00672             log_err("Unrecognized state %s and HF %d",
00673                     state_to_str(state),ht->hf());
00674             return false;
00675     }
00676 
00677     return false;
00678 }
00679 
00680 bool
00681 ProphetEncounter::handle_bad_protocol(u_int32_t tid)
00682 {
00683     log_debug("handle_bad_protocol");
00684 
00685     // Section 5.2, Note 1: No more than two SYN or SYNACK messages should
00686     // be sent within any time period of length defined by the timer.
00687     oasys::Time now;
00688     now.get_time();
00689     if ((now - data_sent_).in_milliseconds() < (timeout_ / 2))
00690     {
00691         log_debug("flow control engaged, skipping");
00692         return false;
00693     }
00694 
00695     prophet_state_t state = get_state("handle_bad_protocol");
00696     if (state == SYNSENT)
00697     {
00698         enqueue_hello(Prophet::SYN,
00699                       tid, Prophet::Failure);
00700         send_prophet_tlv();
00701     }
00702     else
00703     if (state == SYNRCVD)
00704     {
00705         enqueue_hello(Prophet::SYNACK,
00706                       tid, Prophet::Failure);
00707         send_prophet_tlv();
00708     }
00709     return false;
00710 }
00711 
00712 bool
00713 ProphetEncounter::handle_ribd_tlv(RIBDTLV* rt,ProphetTLV* pt)
00714 {
00715     log_debug("handle_ribd_tlv");
00716     (void)pt;
00717 
00718     prophet_state_t state = get_state("handle_ribd_tlv");
00719     if (state == WAIT_DICT ||
00720         state == WAIT_RIB)
00721     {
00722         ProphetDictionary remote = rt->ribd();
00723         log_debug("handle_ribd_tlv has %zu entries",remote.size());
00724         oasys::StringBuffer buf("handle_ribd_tlv\n");
00725         for(ProphetDictionary::const_iterator i = remote.begin();
00726             i != remote.end();
00727             i++)
00728         {
00729             u_int16_t sid = (*i).first;
00730             EndpointID eid = Prophet::eid_to_routeid((*i).second);
00731             ASSERTF(ribd_.assign(eid,sid) == true,
00732                     "failed to assign %d to %s",sid,eid.c_str());
00733         }
00734         ribd_.dump(&buf);
00735         log_debug("\n%s\n",buf.c_str());
00736         set_state(WAIT_RIB);
00737         return true;
00738     }
00739     else
00740     if (state == OFFER)
00741     {
00742         // resend bundle offer
00743         set_state(OFFER);
00744         return true;
00745     }
00746 
00747     return false;
00748 }
00749 
00750 bool
00751 ProphetEncounter::handle_rib_tlv(RIBTLV* rt,ProphetTLV* pt)
00752 {
00753     log_debug("handle_rib_tlv");
00754     (void)pt;
00755 
00756     EndpointID remote = Prophet::eid_to_routeid(next_hop_->remote_eid());
00757     prophet_state_t state = get_state("handle_rib_tlv");
00758     if (state == WAIT_RIB)
00759     {
00760         RIBTLV::List rib = rt->nodes();
00761         log_debug("handle_rib_tlv has %zu entries",rib.size());
00762 
00763         double peer_pvalue = 0.0;
00764 
00765         // look up previous information on peer
00766         ProphetNode* node = oracle_->nodes()->find(remote);
00767 
00768         // else create new node
00769         if (node == NULL)
00770         {
00771             node = new ProphetNode(oracle_->params());
00772             node->set_eid(remote);
00773         }
00774 
00775         node->set_relay(rt->relay_node());
00776         node->set_custody(rt->custody_node());
00777         node->set_internet_gw(rt->internet_gateway());
00778         
00779         // apply direct contact algorithm
00780         node->update_pvalue();
00781         peer_pvalue = node->p_value();
00782 
00783         // update node table
00784         oracle_->nodes()->update(node);
00785 
00786         // now iterate over the rest of the RIB
00787         for(RIBTLV::iterator i = rib.begin();
00788             i != rib.end();
00789             i++)
00790         {
00791             RIBNode* rib = (*i);
00792             u_int16_t sid = rib->sid_;
00793             EndpointID eid = Prophet::eid_to_routeid(ribd_.find(sid));
00794             ASSERT(eid.equals(EndpointID::NULL_EID()) == false);
00795             node = new ProphetNode(oracle_->params());
00796             node->set_eid(eid);
00797             node->set_relay(rib->relay());
00798             node->set_custody(rib->custody());
00799             node->set_internet_gw(rib->internet_gw());
00800 
00801             // apply transitive contact algorithm
00802             node->update_transitive(peer_pvalue,rib->p_value());
00803 
00804             // update node table
00805             oracle_->nodes()->update(node);
00806 
00807             // keep mirror copy of remote's RIB
00808             ProphetNode* rn = new ProphetNode(*node);
00809             rn->set_pvalue(rib->p_value());
00810             remote_nodes_.update(rn);
00811         }
00812 
00813         set_state(OFFER);
00814         
00815         return true;
00816     }
00817     
00818     return false;
00819 }
00820 
00821 bool
00822 ProphetEncounter::handle_bundle_tlv(BundleTLV* bt,ProphetTLV* pt)
00823 {
00824     log_debug("handle_bundle_tlv");
00825 
00826     prophet_state_t state = get_state("handle_bundle_tlv");
00827     BundleList list("handle_bundle_tlv");
00828     if (state == WAIT_RIB)
00829     {
00830         enqueue_hello(Prophet::ACK,
00831                       pt->transaction_id(),
00832                       Prophet::Failure);
00833         send_prophet_tlv();
00834         set_state(WAIT_DICT);
00835         return false;
00836     }
00837     else
00838     if (state == OFFER)
00839     {
00840         // grab a list of Bundles from main Prophet store
00841         oasys::ScopeLock l(oracle_->bundles()->lock(),
00842                            "ProphetEncounter::handle_bundle_tlv");
00843         oracle_->bundles()->bundle_list(list);
00844 
00845         // pull in the bundle request from this TLV
00846         size_t num = bt->list().size();
00847         log_debug("handle_bundle_tlv(OFFER) received list of %zu elements",
00848                   num);
00849 
00850         // list size 0 has special meaning
00851         if (num == 0)
00852         {
00853             set_state(WAIT_INFO);
00854         }
00855         else
00856         if (list.size() > 0)
00857         {
00858             oasys::ScopeLock l(requests_.lock(),"handle_bundle_tlv");
00859             requests_ = bt->list();
00860             ASSERT(requests_.type() == BundleOffer::RESPONSE);
00861 
00862             // track which items to delete but delay until after loop
00863             std::vector<std::pair<u_int32_t,u_int16_t> > to_delete;
00864             for (BundleOfferList::iterator i = requests_.begin();
00865                 i != requests_.end(); i++)
00866             {
00867                 BundleOffer* bo = (*i);
00868                 ASSERT (bo != NULL);
00869                 u_int32_t cts = (*i)->creation_ts();
00870                 u_int16_t sid = (*i)->sid();
00871                 EndpointID eid = ribd_.find(sid);
00872 
00873                 ASSERTF(eid.equals(EndpointID::NULL_EID()) == false,
00874                         "null eid found for sid %d",sid);
00875 
00876                 // find any Bundles with destination that matches
00877                 // this routeid and with creation ts that matches cts
00878                 BundleRef b("handle_bundle_tlv");
00879                 b = ProphetBundleList::find(list,eid,cts);
00880 
00881                 if (b.object() != NULL &&
00882                     // don't send ACK'd bundles
00883                     !oracle_->acks()->is_ackd(b.object()))
00884                 {
00885 
00886                     if(should_fwd(b.object()))
00887                     {
00888                         // enqueue to send over the link to peer
00889                         fwd_to_nexthop(b.object());
00890 
00891                         // remove from list so as only to forward once
00892                         to_delete.push_back(
00893                                 std::pair<u_int32_t,u_int16_t>(cts,sid));
00894                     }
00895                 }
00896             }
00897 
00898             for(std::vector<std::pair<u_int32_t,u_int16_t> >::iterator
00899                     i = to_delete.begin(); i != to_delete.end(); i++)
00900             {
00901                 requests_.remove_bundle((*i).first,(*i).second);
00902             }
00903 
00904             set_state(OFFER);
00905         }
00906         return true;
00907     }
00908     else
00909     if (state == SEND_DR)
00910     {
00911         // grab list of bundles
00912         oasys::ScopeLock l(oracle_->bundles()->lock(),
00913                            "ProphetEncounter::handle_bundle_tlv");
00914         oracle_->bundles()->bundle_list(list);
00915 
00916         // read out the Bundle offer from the TLV
00917         ASSERT(offers_.type() == BundleOffer::OFFER);
00918         offers_ = bt->list();
00919         log_debug("handle_bundle_tlv(SEND_DR) received list of %zu elements",
00920                    requests_.size());
00921 
00922         // prepare a new request
00923         requests_.clear();
00924         requests_.set_type(BundleOffer::RESPONSE);
00925 
00926         oasys::ScopeLock l2(offers_.lock(),"handle_bundle_tlv");
00927         for (BundleOfferList::iterator i = offers_.begin();
00928              i != offers_.end();
00929              i++)
00930         {
00931             u_int32_t cts = (*i)->creation_ts();
00932             u_int16_t sid = (*i)->sid();
00933             EndpointID eid = ribd_.find(sid);
00934 
00935             ASSERTF(eid.equals(EndpointID::NULL_EID()) == false,
00936                     "failed on eid lookup for sid %d",sid);
00937 
00938             if ((*i)->ack())
00939             {
00940                 u_int32_t ets = 0;
00941                 // must delete any ACK'd bundles
00942                 BundleRef b("handle_bundle_tlv");
00943                 b = ProphetBundleList::find(list,eid,cts);
00944                 if (b.object() != NULL)
00945                 {
00946                     ets = b->expiration_;
00947                     oracle_->bundles()->drop_bundle(b.object());
00948 
00949                     // list is now invalid, reload!
00950                     list.clear();
00951                     oracle_->bundles()->bundle_list(list);
00952                 }
00953                 
00954                 // preserve this ACK for future encounters
00955                 oracle_->acks()->insert(eid,cts,ets);
00956             }
00957             else
00958             // don't request if I already have it!
00959             if (ProphetBundleList::find(list,eid,cts) == NULL)
00960             {
00961                 bool accept = true;
00962                 //XXX/wilson
00963                 // need to to something intelligent here w.r.t. custody
00964                 bool custody = false;
00965                 requests_.add_offer(cts,sid,custody,accept,false);
00966                 log_debug("requesting bundle, cts %d, sid %d",cts,sid);
00967             }
00968         }
00969         // request in order of most likely to deliver
00970         requests_.sort(&ribd_,oracle_->nodes(),synsender_ ? 0 : 1);
00971         ASSERT(requests_.type() == BundleOffer::RESPONSE);
00972         enqueue_bundle_tlv(requests_,
00973                            pt->transaction_id(),
00974                            Prophet::Success);
00975         send_prophet_tlv();
00976         set_state(REQUEST);
00977         if(requests_.size() == 0)
00978         {
00979             set_state(WAIT_INFO);
00980         }
00981     }
00982     return false;
00983 }
00984 
00985 void
00986 ProphetEncounter::handle_neighbor_gone()
00987 {
00988     log_debug("handle_neighbor_gone");
00989     neighbor_gone_ = true;
00990     log_info("*%p - %u received NEIGHBOR_GONE signal",
00991              next_hop_,local_instance_);
00992 }
00993 
00994 void
00995 ProphetEncounter::handle_poll_timeout()
00996 {
00997     log_debug("handle_poll_timeout");
00998 
00999     prophet_state_t state = get_state("handle_poll_timeout");
01000     switch(state) {
01001         case SYNSENT:
01002         case WAIT_NB:
01003             if (synsender_ == true)
01004             {
01005                 enqueue_hello(Prophet::SYN);
01006                 send_prophet_tlv();
01007             }
01008             break;
01009         case SYNRCVD:
01010             enqueue_hello(Prophet::SYNACK,
01011                           tid_,
01012                           Prophet::Success);
01013             send_prophet_tlv();
01014             break;
01015         case ESTAB:
01016             // Section 5.2.1, Note 2: No more than two ACKs should be
01017             // sent within any time period of length defined by the timer.
01018             // Thus, one ACK MUST be sent every time the timer expires.
01019             // In addition, one further ACK may be sent between timer
01020             // expirations if the incoming message is a SYN or SYNACK.
01021             // This additional ACK allows the Hello functions to reach
01022             // synchronisation more quickly.
01023             if (ack_count_ >= 2)
01024             {
01025                 ack_count_ = 0;
01026             }
01027             else 
01028             {
01029                 ++ack_count_;
01030                 enqueue_hello(Prophet::ACK);
01031                 send_prophet_tlv();
01032             }
01033             break;
01034         case WAIT_DICT:
01035             enqueue_hello(Prophet::ACK,
01036                           tid_,
01037                           Prophet::Success);
01038             send_prophet_tlv();
01039             break;
01040         case WAIT_RIB:
01041             enqueue_hello(Prophet::ACK,
01042                           tid_,
01043                           Prophet::Success);
01044             send_prophet_tlv();
01045             set_state(WAIT_DICT);
01046             break;
01047         case CREATE_DR:
01048             send_dictionary();
01049             set_state(SEND_DR);
01050             break;
01051         case SEND_DR:
01052             send_dictionary();
01053             break;
01054         case REQUEST:
01055             if(requests_.size() == 0)
01056             {
01057                 set_state(WAIT_INFO);
01058             }
01059             else
01060             {
01061                 enqueue_bundle_tlv(requests_,
01062                                    tid_,
01063                                    Prophet::Success);
01064                 send_prophet_tlv();
01065             }
01066             break;
01067         case OFFER:
01068             send_bundle_offer();
01069             break;
01070         case WAIT_INFO:
01071             // After switching from 1st phase (see set_state), wait for
01072             // 1/2 hello_dead before switching phases
01073             {
01074                 oasys::Time now;
01075                 now.get_time();
01076 
01077                 u_int timeout = oracle_->params()->hello_dead_ * timeout_ / 2;
01078                 u_int diff = (now - data_sent_).in_milliseconds();
01079                 if ( diff <= timeout )
01080                 {
01081                     log_debug("wait_info: timediff %u timeout %u",
01082                               diff,timeout);
01083                 }
01084                 else
01085                 {
01086                     switch_info_role();
01087                 }
01088             }
01089             break;
01090         default:
01091             break;
01092     }
01093 }
01094 
01095 void
01096 ProphetEncounter::reset_ribd()
01097 {
01098     EndpointID local(BundleDaemon::instance()->local_eid()),
01099                remote(next_hop_->remote_eid());
01100     ribd_.clear();
01101     if (synsender_ == true)
01102     { 
01103         ASSERT(ribd_.assign(local,0));
01104         ASSERT(ribd_.assign(remote,1));
01105     }
01106     else
01107     {
01108         ASSERT(ribd_.assign(local,1));
01109         ASSERT(ribd_.assign(remote,0));
01110     }
01111 }
01112 
01113 void
01114 ProphetEncounter::switch_info_role()
01115 {
01116     ASSERT(get_state("switch_info_role") == WAIT_INFO);
01117     if (synsender_ == true)
01118     {
01119         if (initiator_ == true)
01120         {
01121             initiator_ = false;
01122             set_state(WAIT_DICT);
01123         }
01124         else // initiator_ == false
01125         {
01126             initiator_ = true;
01127             set_state(CREATE_DR);
01128         }
01129     }
01130     else // synsender_ == false
01131     {
01132         if (initiator_ == false)
01133         {
01134             initiator_ = true;
01135             set_state(CREATE_DR);
01136         }
01137         else // initiator_ == true
01138         {
01139             initiator_ = false;
01140             set_state(WAIT_DICT);
01141         }
01142     }
01143 }
01144 
01145 void
01146 ProphetEncounter::dump_state(oasys::StringBuffer* buf)
01147 {
01148     buf->appendf("%05d  %10s  %s\n",remote_instance_,
01149                  state_to_str(state_),
01150                  next_hop_->remote_eid().c_str());
01151 }
01152 
01153 void
01154 ProphetEncounter::send_bundle_offer()
01155 {
01156     log_debug("send_bundle_offer");
01157     ASSERT(oracle_ != NULL);
01158     ASSERT(oracle_->bundles() != NULL);
01159 
01160     prophet_state_t state = get_state("send_bundle_offer");
01161     ASSERT( state == WAIT_RIB ||
01162             state == OFFER );
01163 
01164     bool update_dictionary = false;
01165 
01166     // reset to sanity
01167     offers_.set_type(BundleOffer::OFFER);
01168 
01169     // Grab a list of Bundles from queueing policy enforcer
01170     oasys::ScopeLock l(oracle_->bundles()->lock(),
01171                        "ProphetEncounter::send_bundle_offer");
01172     BundleList list("send_bundle_offer");
01173     oracle_->bundles()->bundle_list(list);
01174 
01175     if (list.size() > 0)
01176     {
01177         // Create ordering based on priority_queue using forwarding strategy 
01178         FwdStrategy* fs = FwdStrategy::strategy(
01179                                 oracle_->params()->fs_,
01180                                 oracle_->nodes(),
01181                                 &remote_nodes_);
01182 
01183         // Create strategy-based decider for whether to forward a bundle
01184         ProphetDecider* d = ProphetDecider::decider(
01185                                 oracle_->params()->fs_,
01186                                 oracle_->nodes(),
01187                                 &remote_nodes_,
01188                                 next_hop_,
01189                                 oracle_->params()->max_forward_,
01190                                 oracle_->stats());
01191 
01192         // Combine into priority_queue for bundle offer ordering
01193         ProphetBundleOffer offer(list,fs,d);
01194 
01195         while(!offer.empty())
01196         {
01197             BundleRef b("send_bundle_offer");
01198             b = offer.top();
01199             offer.pop();
01200             if (b.object() == NULL)
01201                 continue;
01202 
01203             EndpointID eid = Prophet::eid_to_routeid(b->dest_);
01204             u_int16_t sid = ribd_.find(eid);
01205 
01206             // either not found or initiator's EID
01207             if (sid == 0) 
01208             {
01209                 if (ribd_.is_assigned(eid) == true &&
01210                     synsender_ == true)
01211                 {
01212                     // local destination, don't offer
01213                     continue;
01214                 }
01215 
01216                 sid = ribd_.insert(eid);
01217                 update_dictionary = true;
01218             }
01219             else
01220             if (sid == 1 && synsender_ == false)
01221             {
01222                 // local destination, don't offer
01223                 continue;
01224             }
01225 
01226             // add bundle listing to the offer
01227             offers_.add_offer(b.object(),sid);
01228             log_debug("offering bundle *%p (sid %d)",b.object(),sid);
01229         }
01230 
01231         delete fs;
01232         delete d;
01233     }
01234 
01235     // Now append all known ACKs
01236     PointerList<ProphetAck> acklist;
01237     oracle_->acks()->fetch(EndpointIDPattern("dtn://*"),acklist);
01238 
01239     for (PointerList<ProphetAck>::iterator i = acklist.begin();
01240          i != acklist.end();
01241          i++)
01242     {
01243         ProphetAck* pa = *i;
01244 
01245         EndpointID eid = Prophet::eid_to_routeid(pa->dest_id_);
01246         u_int16_t sid = ribd_.find(eid);
01247 
01248         if (sid == 0)
01249         {
01250             if (ribd_.is_assigned(eid) == false)
01251             {
01252                 sid = ribd_.insert(eid);
01253             }
01254 
01255             if (sid != 0) 
01256             {
01257                 update_dictionary = true;
01258             }
01259         }
01260         
01261         // add ACK to the list
01262         offers_.add_offer(pa->cts_,sid,false,false,true);
01263         log_debug("appending ACK to bundle offer: cts %d, sid %d",
01264                   pa->cts_,sid);
01265     }
01266 
01267     if (update_dictionary)
01268         send_dictionary();
01269 
01270     enqueue_bundle_tlv(offers_);
01271     send_prophet_tlv();
01272 }
01273 
01274 void
01275 ProphetEncounter::send_dictionary()
01276 {
01277     log_debug("send_dictionary");
01278     ASSERT(oracle_ != NULL);
01279     ASSERT(oracle_->nodes() != NULL);
01280     ASSERT(next_hop_ != NULL);
01281 
01282     // list of all known ProphetNodes, indexed by peer endpoint id
01283     ProphetNodeList nodes;
01284     // list of predictability values for each node, indexed by string id
01285     RIBTLV::List rib;
01286     // EIDs for peer endpoints in this exchange
01287     EndpointID local(BundleDaemon::instance()->local_eid()),
01288                remote(next_hop_->remote_eid());
01289 
01290     // ASSERT protocol invariants
01291     if (synsender_ == true)
01292     {
01293         ASSERT(ribd_.find(local) == 0);
01294         ASSERT(ribd_.find(remote) == 1);
01295     }
01296     else
01297     {
01298         ASSERT(ribd_.find(remote) == 0);
01299         ASSERT(ribd_.find(local) == 1);
01300     }
01301 
01302     // ask ProphetController for snapshot of master node list
01303     oracle_->nodes()->dump_table(nodes);
01304     
01305     // walk over the master node list and create SIDs for each
01306     for(ProphetNodeList::iterator i = nodes.begin();
01307         i != nodes.end();
01308         i++)
01309     {
01310         ProphetNode* node = *i;
01311 
01312         u_int16_t sid;
01313 
01314         if (ribd_.is_assigned(node->remote_eid()))
01315         {
01316             sid = ribd_.find(node->remote_eid());
01317         }
01318         else
01319         {
01320             sid = ribd_.insert(node->remote_eid());
01321         }
01322 
01323         ASSERT(ribd_.is_assigned(node->remote_eid()));
01324 
01325         if (sid == 0 || sid == 1)
01326         {
01327             // implicit peer endpoints
01328             continue;
01329         }
01330 
01331         ASSERT(sid != 0);
01332         rib.push_back(new RIBNode(node,sid));
01333     }
01334 
01335     // ask ProphetController for list of active bundles
01336     oasys::ScopeLock l(oracle_->bundles()->lock(),
01337                        "ProphetEncounter::send_dictionary");
01338     BundleList list("send_dictionary");
01339     oracle_->bundles()->bundle_list(list);
01340     oasys::ScopeLock bl(list.lock(),"send_dictionary");
01341     for (BundleList::iterator i = list.begin();
01342          i != list.end();
01343          i++)
01344     {
01345         u_int16_t sid;
01346         EndpointID routeid = Prophet::eid_to_routeid((*i)->dest_);
01347         if (ribd_.is_assigned(routeid))
01348             continue;
01349 
01350         // shouldn't get here unless node got truncated
01351         // (p-value less than epsilon)
01352         ASSERT((sid = ribd_.insert(routeid)) != 0);
01353         ProphetNode* node = new ProphetNode(oracle_->params());
01354         node->set_eid(routeid);
01355         oracle_->nodes()->update(node);
01356         rib.push_back(new RIBNode(node,sid));
01357     }
01358 
01359     u_int32_t tid = Prophet::UniqueID::tid();
01360     enqueue_ribd(ribd_,tid,Prophet::NoSuccessAck);
01361     enqueue_rib(rib,tid,Prophet::NoSuccessAck);
01362     send_prophet_tlv();
01363 }
01364 
01365 ProphetTLV*
01366 ProphetEncounter::outbound_tlv(u_int32_t tid,
01367                                Prophet::header_result_t result)
01368 {
01369     oasys::ScopeLock l(otlv_lock_,"outbound_tlv");
01370     if (outbound_tlv_ == NULL)
01371     {
01372         outbound_tlv_ = new ProphetTLV(result,
01373                               local_instance_,
01374                               remote_instance_,
01375                               tid);
01376     }
01377     else
01378     {
01379         if (outbound_tlv_->transaction_id() != tid)
01380         {
01381             log_err("mismatched tid: TLV %u tid %u",
01382                     outbound_tlv_->transaction_id(),
01383                     tid);
01384             return NULL;
01385         }
01386 
01387         if (outbound_tlv_->result() != result)
01388         {
01389             log_err("mismatched result field: TLV %s result %s",
01390                     Prophet::result_to_str(outbound_tlv_->result()),
01391                     Prophet::result_to_str(result));
01392             return NULL;
01393         }
01394     }
01395     return outbound_tlv_;
01396 }
01397 
01398 bool
01399 ProphetEncounter::send_prophet_tlv()
01400 {
01401     oasys::ScopeLock l(otlv_lock_,"send_prophet_tlv");
01402     if (neighbor_gone_ == true) return false;
01403     if (outbound_tlv_ == NULL) return false;
01404 
01405     ASSERT(outbound_tlv_->num_tlv() > 0);
01406 
01407     bool retval = false;
01408     // encapsulate ProphetTLV into Bundle and queue up
01409     BundleRef b("ProphetEncounter send_prophet_tlv");
01410     b = new Bundle();
01411     if (outbound_tlv_->create_bundle(b.object(),
01412                           BundleDaemon::instance()->local_eid(),
01413                           next_hop_->remote_eid()))
01414     {
01415         oasys::StringBuffer buf;
01416         outbound_tlv_->dump(&buf);
01417         log_debug("Outbound TLV\n"
01418                   "\n%s\n",buf.c_str());
01419 
01420         // enqueue before non-control bundles
01421         fwd_to_nexthop(b.object(),true);
01422 
01423         // update timestamp
01424         data_sent_.get_time();
01425 
01426         retval = true;
01427     }
01428     else
01429     {
01430         log_err("Failed to write out ProphetTLV");
01431         retval = false;
01432     }
01433 
01434     delete outbound_tlv_;
01435     outbound_tlv_ = NULL;
01436     return retval;
01437 }
01438 
01439 void
01440 ProphetEncounter::enqueue_hello(Prophet::hello_hf_t hf,
01441                                 u_int32_t tid,
01442                                 Prophet::header_result_t result)
01443 {
01444     log_debug("enqueue_hello %s %u %s",
01445                Prophet::hf_to_str(hf), tid,
01446                Prophet::result_to_str(result));
01447 
01448     oasys::ScopeLock l(otlv_lock_,"enqueue_hello");
01449     ProphetTLV* tlv = outbound_tlv(tid,result);
01450     HelloTLV *ht = new HelloTLV(hf,
01451                                 oracle_->params()->hello_interval_,
01452                                 BundleDaemon::instance()->local_eid(),
01453                                 logpath_);
01454 
01455     ASSERT(tlv != NULL);
01456     tlv->add_tlv(ht);
01457 }
01458 
01459 void
01460 ProphetEncounter::enqueue_ribd(const ProphetDictionary& ribd,
01461                                u_int32_t tid,
01462                                Prophet::header_result_t result)
01463 {
01464     log_debug("enqueue_ribd (%zu entries) %u %s",
01465                ribd.size(),tid,
01466                Prophet::result_to_str(result));
01467 
01468     oasys::ScopeLock l(otlv_lock_,"enqueue_ribd");
01469     ProphetTLV* tlv = outbound_tlv(tid,result);
01470     RIBDTLV *rt = new RIBDTLV(ribd,logpath_);
01471 
01472     ASSERT(tlv != NULL);
01473     tlv->add_tlv(rt);
01474 }
01475 
01476 void
01477 ProphetEncounter::enqueue_rib(const RIBTLV::List& nodes,
01478                               u_int32_t tid,
01479                               Prophet::header_result_t result)
01480 {
01481     log_debug("enqueue_rib (%zu entries)",nodes.size());
01482 
01483     oasys::ScopeLock l(otlv_lock_,"enqueue_rib");
01484     ProphetTLV* tlv = outbound_tlv(tid,result);
01485     RIBTLV *rt = new RIBTLV(nodes,
01486                             oracle_->params()->relay_node_,
01487                             oracle_->params()->custody_node_,
01488                             oracle_->params()->internet_gw_,
01489                             logpath_);
01490 
01491     ASSERT(tlv != NULL);
01492     tlv->add_tlv(rt);
01493 }
01494 
01495 void
01496 ProphetEncounter::enqueue_bundle_tlv(const BundleOfferList& bundles,
01497                                      u_int32_t tid,
01498                                      Prophet::header_result_t result)
01499 {
01500     log_debug("enqueue_bundle_tlv (%zu entries)",bundles.size());
01501 
01502     oasys::ScopeLock l(otlv_lock_,"enqueue_bundle_tlv");
01503     ProphetTLV* tlv = outbound_tlv(tid,result);
01504     BundleTLV* bt = new BundleTLV(bundles,logpath_);
01505 
01506     ASSERT(tlv != NULL);
01507     tlv->add_tlv(bt);
01508 }
01509 
01510 void
01511 ProphetEncounter::process_command()
01512 {
01513     PEMsg msg;
01514     bool ok = cmdqueue_.try_pop(&msg);
01515 
01516     // shouldn't get here unless a message is waiting
01517     ASSERT( ok == true );
01518 
01519     // dispatch command
01520     switch(msg.type_) {
01521         case PEMSG_PROPHET_TLV_RECEIVED:
01522             log_debug("processing PEMSG_PROPHET_TLV_RECEIVED");
01523             handle_prophet_tlv(msg.tlv_);
01524             delete msg.tlv_;
01525             break;
01526         case PEMSG_NEIGHBOR_GONE:
01527             log_debug("processing PEMSG_NEIGHBOR_GONE");
01528             handle_neighbor_gone();
01529             break;
01530         case PEMSG_HELLO_INTERVAL_CHANGED:
01531             log_debug("processing PEMSG_HELLO_INTERVAL_CHANGED");
01532             handle_hello_interval_changed();
01533             break;
01534         default:
01535             PANIC("invalid PEMsg typecode %d",msg.type_);
01536     }
01537 }
01538 
01539 void
01540 ProphetEncounter::handle_hello_interval_changed() {
01541     timeout_ = oracle_->params()->hello_interval_ * 100;
01542 }
01543 
01544 void
01545 ProphetEncounter::run() {
01546 
01547     ASSERT(oracle_ != NULL); 
01548     EndpointID local(BundleDaemon::instance()->local_eid()),
01549                remote(next_hop_->remote_eid());
01550     
01551     //XXX protocol seems broken, need a tie-breaker 
01552     // tie-breaker for Hello sequence
01553     synsender_ = (local.str() < remote.str());
01554 
01555     // who starts the oscillation for Information Exchange
01556     // (this toggles back and forth for the duration of the peering)
01557     initiator_ = synsender_;
01558 
01559     log_debug("synsender_ == %s", synsender_ ? "true" : "false");
01560 
01561     remote_nodes_.clear();
01562     if (synsender_ == true)
01563     { 
01564         enqueue_hello(Prophet::SYN);
01565         send_prophet_tlv();
01566         set_state(SYNSENT);
01567     }
01568 
01569     /*
01570        ProphetEncounter lives for the duration of an encounter between
01571        two Prophet nodes.  First the Hello sequence is completed, then
01572        the Information Exchange phase.  As soon as inactivity lasts 
01573        beyond HELLO_DEAD*HELLO_INTERVAL seconds, this thread goes away
01574        and all its state with it.  The RIB exchange persists in that 
01575        ProphetController keeps a master list of Prophet nodes. 
01576     */
01577 
01578     u_int timeout = timeout_;
01579     while (neighbor_gone_ == false) {
01580 
01581         if (cmdqueue_.size() != 0)
01582         {
01583             process_command();
01584             continue;
01585         }
01586 
01587         // mix up jitter on timeout to +/- 5%
01588         // so much for limiting FP math eh?
01589         int r = oasys::Random::rand(10);
01590         double ratio = (double) (10 - r) / (double) 100;
01591         timeout = (int) ((double) timeout_ * (1.05 - ratio));
01592         log_debug("poll timeout = %d", timeout);
01593 
01594         short revents = 0;
01595         int cc = oasys::IO::poll_single(cmdqueue_.read_fd(),
01596                                         POLL_IN,&revents,timeout);
01597 
01598         if (neighbor_gone_ == true) break;
01599         if (cc == oasys::IOTIMEOUT)
01600         {
01601             handle_poll_timeout();
01602         }
01603         else if (cc > 0)
01604         {
01605             // flip back 'round to process_command();
01606             continue;
01607         }
01608         else
01609         {
01610             log_err("unexpected return on poll: %d",cc);
01611             handle_neighbor_gone();
01612             break;
01613         } 
01614 
01615         oasys::Time now;
01616         now.get_time();
01617         if ((now-data_rcvd_).in_milliseconds() >= (oracle_->params()->hello_dead_ * timeout_)) {
01618             log_err("%d silent Hello intervals, giving up",oracle_->params()->hello_dead_);
01619             break;
01620         }
01621     }
01622 
01623     ProphetController::instance()->unreg(this);
01624 }
01625 
01626 } // namespace dtn

Generated on Sat Sep 8 08:36:17 2007 for DTN Reference Implementation by  doxygen 1.5.3