00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include <netinet/in.h>
00018 #include "bundling/Bundle.h"
00019 #include "bundling/BundleRef.h"
00020 #include "bundling/BundleList.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "ProphetEncounter.h"
00023 #include "ProphetController.h"
00024 #include <oasys/thread/Lock.h>
00025 #include <oasys/util/Random.h>
00026 #include <oasys/util/ScratchBuffer.h>
00027
00028 namespace dtn {
00029
00030 ProphetEncounter::ProphetEncounter(Link* nexthop,
00031 ProphetOracle* oracle)
00032 : oasys::Thread("ProphetEncounter",oasys::Thread::DELETE_ON_EXIT),
00033 oasys::Logger("ProphetEncounter","/dtn/route"),
00034 oracle_(oracle),
00035 remote_instance_(0),
00036 local_instance_(Prophet::UniqueID::instance()->instance_id()),
00037 tid_(0),
00038 timeout_(0),
00039 next_hop_(nexthop),
00040 synsender_(false),initiator_(false),
00041 synsent_(false),synrcvd_(false),
00042 estab_(false),
00043 neighbor_gone_(false),
00044 state_(WAIT_NB),
00045 cmdqueue_("/dtn/route"),
00046 to_be_fwd_("ProphetEncounter to be forwarded"),
00047 outbound_tlv_(NULL),
00048 state_lock_(new oasys::SpinLock()),
00049 otlv_lock_(new oasys::SpinLock())
00050 {
00051
00052 ASSERT(oracle != NULL);
00053 ASSERT(local_instance_ != 0);
00054 ASSERT(nexthop != NULL);
00055 ASSERT(nexthop->local() != "");
00056 ASSERT(!nexthop->remote_eid().equals(EndpointID::NULL_EID()));
00057
00058 logpath_appendf("/encounter-%d",local_instance_);
00059
00060
00061 ASSERT(oracle_->params() != NULL);
00062 ASSERT(oracle_->bundles() != NULL);
00063 ASSERT(oracle_->nodes() != NULL);
00064 ASSERT(oracle_->actions() != NULL);
00065 ASSERT(oracle_->acks() != NULL);
00066 ASSERT(oracle_->stats() != NULL);
00067
00068
00069
00070 timeout_ = oracle->params()->hello_interval_ * 100;
00071
00072
00073 data_sent_.get_time();
00074 data_rcvd_.get_time();
00075
00076
00077 offers_.set_type(BundleOffer::OFFER);
00078 requests_.set_type(BundleOffer::RESPONSE);
00079
00080 ack_count_ = 0;
00081 }
00082
00083 ProphetEncounter::~ProphetEncounter() {
00084
00085 while(cmdqueue_.size() != 0)
00086 {
00087 ProphetEncounter::PEMsg pm;
00088 ASSERT(cmdqueue_.try_pop(&pm));
00089 if(pm.tlv_ != NULL)
00090 delete pm.tlv_;
00091 }
00092 oasys::ScopeLockIf l(otlv_lock_,"destructor",outbound_tlv_ != NULL);
00093 if (outbound_tlv_ != NULL)
00094 {
00095 delete outbound_tlv_;
00096 outbound_tlv_ = NULL;
00097 }
00098 l.unlock();
00099 delete state_lock_;
00100 delete otlv_lock_;
00101 }
00102
00103 bool
00104 ProphetEncounter::operator< (const ProphetEncounter& p) const
00105 {
00106 return local_instance_ < p.local_instance_;
00107 }
00108
00109 bool
00110 ProphetEncounter::operator< (u_int16_t inst) const
00111 {
00112 return local_instance_ < inst;
00113 }
00114
00115 ProphetEncounter::prophet_state_t
00116 ProphetEncounter::get_state(const char *where)
00117 {
00118 oasys::ScopeLock l(state_lock_, where);
00119 log_debug("get_state(%s) == %s",where,state_to_str(state_));
00120 return state_;
00121 }
00122
00123 void
00124 ProphetEncounter::set_state(prophet_state_t new_state)
00125 {
00126 oasys::ScopeLock l(state_lock_,"ProphetEncounter::set_state");
00127 log_debug("set_state from %s to %s",
00128 state_to_str(state_),
00129 state_to_str(new_state));
00130
00131 prophet_state_t oldstate = state_;
00132 state_ = new_state;
00133
00134 switch(new_state) {
00135 case WAIT_NB:
00136
00137 synsent_ = false;
00138 synrcvd_ = false;
00139 estab_ = false;
00140 ack_count_ = 0;
00141 timeout_ = oracle_->params()->hello_interval_ * 100;
00142 break;
00143 case SYNSENT:
00144 ASSERT(oldstate == WAIT_NB ||
00145 oldstate == SYNSENT);
00146 synsent_ = true;
00147 break;
00148 case SYNRCVD:
00149 ASSERT(oldstate == WAIT_NB ||
00150 oldstate == SYNSENT ||
00151 oldstate == SYNRCVD);
00152 synrcvd_ = true;
00153 break;
00154 case ESTAB:
00155 ASSERT(oldstate == SYNRCVD ||
00156 oldstate == ESTAB ||
00157 oldstate == SYNSENT);
00158 estab_ = true;
00159 if (initiator_ == true)
00160 {
00161 set_state(CREATE_DR);
00162 }
00163 else
00164 {
00165 set_state(WAIT_DICT);
00166 }
00167 break;
00168 case WAIT_DICT:
00169 ASSERT(oldstate == ESTAB ||
00170 oldstate == WAIT_DICT ||
00171 oldstate == WAIT_RIB ||
00172 oldstate == OFFER ||
00173 oldstate == WAIT_INFO);
00174 reset_ribd();
00175 break;
00176 case WAIT_RIB:
00177 ASSERT(oldstate == WAIT_DICT ||
00178 oldstate == WAIT_RIB);
00179 break;
00180 case OFFER:
00181 ASSERT(oldstate == OFFER ||
00182 oldstate == WAIT_RIB);
00183 send_bundle_offer();
00184 break;
00185 case CREATE_DR:
00186 ASSERT(oldstate == ESTAB ||
00187 oldstate == WAIT_INFO ||
00188 oldstate == WAIT_DICT ||
00189 oldstate == SEND_DR ||
00190 oldstate == REQUEST);
00191 reset_ribd();
00192 break;
00193 case SEND_DR:
00194 ASSERT(oldstate == CREATE_DR ||
00195 oldstate == SEND_DR);
00196 break;
00197 case REQUEST:
00198 ASSERT(oldstate == REQUEST ||
00199 oldstate == SEND_DR);
00200 break;
00201 case WAIT_INFO:
00202 ASSERT(oldstate == REQUEST ||
00203 oldstate == OFFER);
00204
00205
00206
00207
00208 if ((synsender_ && initiator_) ||
00209 (!synsender_ && !initiator_))
00210 {
00211 switch_info_role();
00212 }
00213 break;
00214 default:
00215 PANIC("Unknown Prophet state: %d", (int)new_state);
00216 break;
00217 }
00218 }
00219
00220 void
00221 ProphetEncounter::reset_link()
00222 {
00223 log_debug("reset_link");
00224 neighbor_gone();
00225 }
00226
00227 void
00228 ProphetEncounter::handle_bundle_received(Bundle* bundle)
00229 {
00230 BundleRef b("handle_bundle_received");
00231 b = bundle;
00232 log_debug("handle_bundle_received *%p",bundle);
00233
00234
00235 EndpointIDPattern route = Prophet::eid_to_route(next_hop_->remote_eid());
00236 if (route.match(b->dest_))
00237 {
00238
00239 if (should_fwd(b.object()))
00240 {
00241 fwd_to_nexthop(b.object());
00242 }
00243 }
00244
00245 if (get_state("handle_bundle_received") == REQUEST)
00246 {
00247 u_int32_t cts = b->creation_ts_.seconds_;
00248 u_int16_t sid = ribd_.find(Prophet::eid_to_routeid(b->dest_));
00249 requests_.remove_bundle(cts,sid);
00250
00251
00252
00253 if (requests_.size() == 0)
00254 {
00255 enqueue_bundle_tlv(requests_);
00256 send_prophet_tlv();
00257 set_state(WAIT_INFO);
00258 }
00259 }
00260 }
00261
00262 void
00263 ProphetEncounter::receive_tlv(ProphetTLV* pt)
00264 {
00265 log_debug("receive_tlv");
00266
00267 cmdqueue_.push_back(PEMsg(PEMSG_PROPHET_TLV_RECEIVED,pt));
00268 }
00269
00270 void
00271 ProphetEncounter::neighbor_gone()
00272 {
00273 log_debug("neighbor_gone");
00274
00275 cmdqueue_.push_back(PEMsg(PEMSG_NEIGHBOR_GONE,NULL));
00276 }
00277
00278 bool
00279 ProphetEncounter::should_fwd(Bundle* b)
00280 {
00281 BundleRef bundle("should_fwd");
00282 ForwardingInfo info;
00283
00284 bundle = b;
00285 bool found = bundle->fwdlog_.get_latest_entry(next_hop_,&info);
00286
00287 if (found) {
00288 ASSERT(info.state_ != ForwardingInfo::NONE);
00289 } else {
00290 ASSERT(info.state_ == ForwardingInfo::NONE);
00291 }
00292
00293 if (info.state_ == ForwardingInfo::TRANSMITTED ||
00294 info.state_ == ForwardingInfo::IN_FLIGHT)
00295 {
00296 log_debug("should_fwd bundle %d: "
00297 "skip %s due to forwarding log entry %s",
00298 bundle->bundleid_, next_hop_->name(),
00299 ForwardingInfo::state_to_str(
00300 static_cast<ForwardingInfo::state_t>(info.state_)));
00301 return false;
00302 }
00303
00304 if (info.state_ == ForwardingInfo::TRANSMIT_FAILED) {
00305 log_debug("should_fwd bundle %d: "
00306 "match %s: forwarding log entry %s TRANSMIT_FAILED %d",
00307 bundle->bundleid_, next_hop_->name(),
00308 ForwardingInfo::state_to_str(
00309 static_cast<ForwardingInfo::state_t>(info.state_)),
00310 bundle->bundleid_);
00311 } else {
00312 log_debug("should_fwd bundle %d: "
00313 "match %s: forwarding log entry %s",
00314 bundle->bundleid_, next_hop_->name(),
00315 ForwardingInfo::state_to_str(
00316 static_cast<ForwardingInfo::state_t>(info.state_)));
00317 }
00318
00319 return true;
00320 }
00321
00322 void
00323 ProphetEncounter::fwd_to_nexthop(Bundle* b,bool add_front)
00324 {
00325 BundleRef bundle("fwd_to_nexthop");
00326 bundle = b;
00327 log_debug("fwd_to_nexthop *%p at %s",b,add_front?"front":"back");
00328
00329
00330 ASSERT(next_hop_->isopen());
00331
00332 if(bundle!=NULL)
00333 {
00334 if(add_front)
00335
00336
00337
00338
00339 to_be_fwd_.push_front(b);
00340 else
00341 to_be_fwd_.push_back(b);
00342 }
00343
00344
00345 while (next_hop_->isbusy() == false &&
00346 to_be_fwd_.size() > 0)
00347 {
00348 BundleRef ref("ProphetEncounter fwd_to_nexthop");
00349 ref = to_be_fwd_.pop_front();
00350
00351 if (ref.object() == NULL)
00352 {
00353 log_err("Unexpected NULL pointer in to_be_fwd_ list");
00354 continue;
00355 }
00356
00357 Bundle* b = ref.object();
00358 log_debug("sending *%p to *%p", b, next_hop_);
00359
00360 bool ok = oracle_->actions()->send_bundle(b,next_hop_,
00361 ForwardingInfo::COPY_ACTION,
00362 CustodyTimerSpec());
00363 ASSERTF(ok == true,"failed to send bundle");
00364
00365
00366 oracle_->stats()->update_stats(b,remote_nodes_.p_value(b));
00367 }
00368
00369 oasys::ScopeLock l(to_be_fwd_.lock(),"fwd_to_nexthop");
00370 for(BundleList::iterator i = to_be_fwd_.begin();
00371 i != to_be_fwd_.end(); i++)
00372 {
00373 log_debug("can't forward *%p to *%p because link is busy",
00374 *i, link);
00375 }
00376 }
00377
00378 void
00379 ProphetEncounter::handle_prophet_tlv(ProphetTLV* pt)
00380 {
00381 ASSERT(pt != NULL);
00382
00383 data_rcvd_.get_time();
00384
00385 oasys::StringBuffer buf;
00386 pt->dump(&buf);
00387 log_debug("handle_prophet_tlv (tid %u,%s,%zu entries)\n"
00388 "Inbound TLV\n\n%s\n",
00389 pt->transaction_id(),
00390 Prophet::result_to_str(pt->result()),
00391 pt->num_tlv(),buf.c_str());
00392
00393 tid_ = pt->transaction_id();
00394
00395 BaseTLV* tlv = NULL;
00396 bool ok = true;
00397
00398 while (neighbor_gone_ == false &&
00399 (tlv = pt->get_tlv()) != NULL &&
00400 ok != false)
00401 {
00402 if (estab_ == false && tlv->typecode() != Prophet::HELLO_TLV)
00403 {
00404 delete tlv;
00405 handle_bad_protocol(pt->transaction_id());
00406 ok = false;
00407 break;
00408 }
00409
00410 switch(tlv->typecode()) {
00411 case Prophet::HELLO_TLV:
00412 ok = handle_hello_tlv((HelloTLV*)tlv,pt);
00413 break;
00414 case Prophet::RIBD_TLV:
00415 ok = handle_ribd_tlv((RIBDTLV*)tlv,pt);
00416 break;
00417 case Prophet::RIB_TLV:
00418 ok = handle_rib_tlv((RIBTLV*)tlv,pt);
00419 break;
00420 case Prophet::BUNDLE_TLV:
00421 ok = handle_bundle_tlv((BundleTLV*)tlv,pt);
00422 break;
00423 case Prophet::UNKNOWN_TLV:
00424 case Prophet::ERROR_TLV:
00425 default:
00426 PANIC("Unexpected TLV type received by ProphetEncounter: %d",
00427 tlv->typecode());
00428 }
00429 delete tlv;
00430 }
00431 if (ok != true)
00432 {
00433 log_debug("killing off %zu unread TLVs",pt->list().size());
00434
00435 while ((tlv = pt->get_tlv()) != NULL)
00436 delete tlv;
00437 }
00438 }
00439
00440 bool
00441 ProphetEncounter::handle_hello_tlv(HelloTLV* ht,ProphetTLV* pt)
00442 {
00443 ASSERT(ht != NULL);
00444 ASSERT(ht->typecode() == Prophet::HELLO_TLV);
00445 log_debug("handle_hello_tlv %s",Prophet::hf_to_str(ht->hf()));
00446
00447
00448 bool hello_a = (remote_instance_ == pt->sender_instance());
00449 bool hello_b = hello_a &&
00450 (remote_addr_ == next_hop_->nexthop());
00451 bool hello_c = (local_instance_ == pt->receiver_instance());
00452
00453 log_info("received HF %s in state %s",Prophet::hf_to_str(ht->hf()),
00454 state_to_str(get_state("handle_hello_tlv")));
00455
00456
00457 u_int timeout = std::min(
00458 (u_int)oracle_->params()->hello_interval_,
00459 (u_int)ht->timer(),
00460 std::less<u_int>());
00461
00462 if ((timeout * 100) != timeout_)
00463 {
00464
00465 timeout_ = timeout * 100;
00466 }
00467
00468 if (ht->hf() == Prophet::RSTACK)
00469 {
00470 if (hello_a && hello_c && synsent_ == false)
00471 {
00472 handle_neighbor_gone();
00473 return false;
00474 }
00475 else
00476 return true;
00477 }
00478
00479 prophet_state_t state = get_state("handle_hello_tlv");
00480 switch(state) {
00481
00482 case WAIT_NB:
00483
00484 if (ht->hf() == Prophet::SYN)
00485 {
00486 update_peer_verifier(pt->sender_instance());
00487 enqueue_hello(Prophet::SYNACK,
00488 pt->transaction_id(),
00489 Prophet::Success);
00490 send_prophet_tlv();
00491 set_state(SYNRCVD);
00492 return true;
00493 }
00494 return false;
00495
00496 case SYNSENT:
00497
00498 if (ht->hf() == Prophet::SYNACK)
00499 {
00500 if (hello_c)
00501 {
00502 update_peer_verifier(pt->sender_instance());
00503 enqueue_hello(Prophet::ACK,
00504 pt->transaction_id(),
00505 Prophet::Success);
00506 send_prophet_tlv();
00507 set_state(ESTAB);
00508 }
00509 else
00510 {
00511 enqueue_hello(Prophet::RSTACK,
00512 pt->transaction_id(),
00513 Prophet::Failure);
00514 send_prophet_tlv();
00515 set_state(SYNSENT);
00516 }
00517 }
00518 else
00519 if (ht->hf() == Prophet::SYN)
00520 {
00521 update_peer_verifier(pt->sender_instance());
00522 enqueue_hello(Prophet::SYNACK,
00523 pt->transaction_id(),
00524 Prophet::Success);
00525 send_prophet_tlv();
00526 set_state(SYNRCVD);
00527 }
00528 else
00529 if (ht->hf() == Prophet::ACK)
00530 {
00531 enqueue_hello(Prophet::RSTACK,
00532 pt->transaction_id(),
00533 Prophet::Failure);
00534 send_prophet_tlv();
00535 set_state(SYNSENT);
00536 }
00537
00538 return true;
00539
00540 case SYNRCVD:
00541
00542 if (ht->hf() == Prophet::SYNACK)
00543 {
00544 if (hello_c)
00545 {
00546 update_peer_verifier(pt->sender_instance());
00547 enqueue_hello(Prophet::ACK,
00548 pt->transaction_id(),
00549 Prophet::Success);
00550 send_prophet_tlv();
00551 set_state(ESTAB);
00552 }
00553 else
00554 {
00555 enqueue_hello(Prophet::RSTACK,
00556 pt->transaction_id(),
00557 Prophet::Failure);
00558 send_prophet_tlv();
00559 set_state(SYNRCVD);
00560 }
00561 }
00562 else
00563 if (ht->hf() == Prophet::SYN)
00564 {
00565 update_peer_verifier(pt->sender_instance());
00566 enqueue_hello(Prophet::SYNACK,
00567 pt->transaction_id(),
00568 Prophet::Success);
00569 send_prophet_tlv();
00570 set_state(SYNRCVD);
00571 }
00572 else
00573 if (ht->hf() == Prophet::ACK)
00574 {
00575 if (hello_b && hello_c)
00576 {
00577 enqueue_hello(Prophet::ACK,
00578 pt->transaction_id(),
00579 Prophet::Success);
00580 send_prophet_tlv();
00581 set_state(ESTAB);
00582 }
00583 else
00584 {
00585 enqueue_hello(Prophet::RSTACK,
00586 pt->transaction_id(),
00587 Prophet::Failure);
00588 send_prophet_tlv();
00589 set_state(SYNRCVD);
00590 }
00591 }
00592
00593 return true;
00594
00595 case ESTAB:
00596
00597 if (ht->hf() == Prophet::SYN || ht->hf() == Prophet::SYNACK)
00598 {
00599
00600
00601
00602
00603
00604
00605
00606 if (ack_count_ < 2)
00607 {
00608 ++ack_count_;
00609 enqueue_hello(Prophet::ACK,
00610 pt->transaction_id(),
00611 Prophet::Success);
00612 send_prophet_tlv();
00613 }
00614 }
00615 else
00616 if (ht->hf() == Prophet::ACK)
00617 {
00618 if (hello_b && hello_c)
00619 {
00620
00621
00622
00623 if (ack_count_ < 1)
00624 {
00625 ++ack_count_;
00626 enqueue_hello(Prophet::ACK,
00627 pt->transaction_id(),
00628 Prophet::Success);
00629 send_prophet_tlv();
00630 }
00631 }
00632 else
00633 {
00634 enqueue_hello(Prophet::RSTACK,
00635 pt->transaction_id(),
00636 Prophet::Failure);
00637 send_prophet_tlv();
00638 }
00639 }
00640 return true;
00641
00642 case WAIT_DICT:
00643 case WAIT_RIB:
00644 case OFFER:
00645
00646 if (ht->hf() == Prophet::ACK)
00647 {
00648 set_state(WAIT_DICT);
00649 return true;
00650 }
00651 return false;
00652
00653 case CREATE_DR:
00654 case SEND_DR:
00655
00656 return false;
00657
00658 case REQUEST:
00659
00660 if (ht->hf() == Prophet::ACK)
00661 {
00662 set_state(CREATE_DR);
00663 return true;
00664 }
00665 return false;
00666
00667 default:
00668 if (ht->hf() == Prophet::ACK)
00669 {
00670 return true;
00671 }
00672 log_err("Unrecognized state %s and HF %d",
00673 state_to_str(state),ht->hf());
00674 return false;
00675 }
00676
00677 return false;
00678 }
00679
00680 bool
00681 ProphetEncounter::handle_bad_protocol(u_int32_t tid)
00682 {
00683 log_debug("handle_bad_protocol");
00684
00685
00686
00687 oasys::Time now;
00688 now.get_time();
00689 if ((now - data_sent_).in_milliseconds() < (timeout_ / 2))
00690 {
00691 log_debug("flow control engaged, skipping");
00692 return false;
00693 }
00694
00695 prophet_state_t state = get_state("handle_bad_protocol");
00696 if (state == SYNSENT)
00697 {
00698 enqueue_hello(Prophet::SYN,
00699 tid, Prophet::Failure);
00700 send_prophet_tlv();
00701 }
00702 else
00703 if (state == SYNRCVD)
00704 {
00705 enqueue_hello(Prophet::SYNACK,
00706 tid, Prophet::Failure);
00707 send_prophet_tlv();
00708 }
00709 return false;
00710 }
00711
00712 bool
00713 ProphetEncounter::handle_ribd_tlv(RIBDTLV* rt,ProphetTLV* pt)
00714 {
00715 log_debug("handle_ribd_tlv");
00716 (void)pt;
00717
00718 prophet_state_t state = get_state("handle_ribd_tlv");
00719 if (state == WAIT_DICT ||
00720 state == WAIT_RIB)
00721 {
00722 ProphetDictionary remote = rt->ribd();
00723 log_debug("handle_ribd_tlv has %zu entries",remote.size());
00724 oasys::StringBuffer buf("handle_ribd_tlv\n");
00725 for(ProphetDictionary::const_iterator i = remote.begin();
00726 i != remote.end();
00727 i++)
00728 {
00729 u_int16_t sid = (*i).first;
00730 EndpointID eid = Prophet::eid_to_routeid((*i).second);
00731 ASSERTF(ribd_.assign(eid,sid) == true,
00732 "failed to assign %d to %s",sid,eid.c_str());
00733 }
00734 ribd_.dump(&buf);
00735 log_debug("\n%s\n",buf.c_str());
00736 set_state(WAIT_RIB);
00737 return true;
00738 }
00739 else
00740 if (state == OFFER)
00741 {
00742
00743 set_state(OFFER);
00744 return true;
00745 }
00746
00747 return false;
00748 }
00749
00750 bool
00751 ProphetEncounter::handle_rib_tlv(RIBTLV* rt,ProphetTLV* pt)
00752 {
00753 log_debug("handle_rib_tlv");
00754 (void)pt;
00755
00756 EndpointID remote = Prophet::eid_to_routeid(next_hop_->remote_eid());
00757 prophet_state_t state = get_state("handle_rib_tlv");
00758 if (state == WAIT_RIB)
00759 {
00760 RIBTLV::List rib = rt->nodes();
00761 log_debug("handle_rib_tlv has %zu entries",rib.size());
00762
00763 double peer_pvalue = 0.0;
00764
00765
00766 ProphetNode* node = oracle_->nodes()->find(remote);
00767
00768
00769 if (node == NULL)
00770 {
00771 node = new ProphetNode(oracle_->params());
00772 node->set_eid(remote);
00773 }
00774
00775 node->set_relay(rt->relay_node());
00776 node->set_custody(rt->custody_node());
00777 node->set_internet_gw(rt->internet_gateway());
00778
00779
00780 node->update_pvalue();
00781 peer_pvalue = node->p_value();
00782
00783
00784 oracle_->nodes()->update(node);
00785
00786
00787 for(RIBTLV::iterator i = rib.begin();
00788 i != rib.end();
00789 i++)
00790 {
00791 RIBNode* rib = (*i);
00792 u_int16_t sid = rib->sid_;
00793 EndpointID eid = Prophet::eid_to_routeid(ribd_.find(sid));
00794 ASSERT(eid.equals(EndpointID::NULL_EID()) == false);
00795 node = new ProphetNode(oracle_->params());
00796 node->set_eid(eid);
00797 node->set_relay(rib->relay());
00798 node->set_custody(rib->custody());
00799 node->set_internet_gw(rib->internet_gw());
00800
00801
00802 node->update_transitive(peer_pvalue,rib->p_value());
00803
00804
00805 oracle_->nodes()->update(node);
00806
00807
00808 ProphetNode* rn = new ProphetNode(*node);
00809 rn->set_pvalue(rib->p_value());
00810 remote_nodes_.update(rn);
00811 }
00812
00813 set_state(OFFER);
00814
00815 return true;
00816 }
00817
00818 return false;
00819 }
00820
00821 bool
00822 ProphetEncounter::handle_bundle_tlv(BundleTLV* bt,ProphetTLV* pt)
00823 {
00824 log_debug("handle_bundle_tlv");
00825
00826 prophet_state_t state = get_state("handle_bundle_tlv");
00827 BundleList list("handle_bundle_tlv");
00828 if (state == WAIT_RIB)
00829 {
00830 enqueue_hello(Prophet::ACK,
00831 pt->transaction_id(),
00832 Prophet::Failure);
00833 send_prophet_tlv();
00834 set_state(WAIT_DICT);
00835 return false;
00836 }
00837 else
00838 if (state == OFFER)
00839 {
00840
00841 oasys::ScopeLock l(oracle_->bundles()->lock(),
00842 "ProphetEncounter::handle_bundle_tlv");
00843 oracle_->bundles()->bundle_list(list);
00844
00845
00846 size_t num = bt->list().size();
00847 log_debug("handle_bundle_tlv(OFFER) received list of %zu elements",
00848 num);
00849
00850
00851 if (num == 0)
00852 {
00853 set_state(WAIT_INFO);
00854 }
00855 else
00856 if (list.size() > 0)
00857 {
00858 oasys::ScopeLock l(requests_.lock(),"handle_bundle_tlv");
00859 requests_ = bt->list();
00860 ASSERT(requests_.type() == BundleOffer::RESPONSE);
00861
00862
00863 std::vector<std::pair<u_int32_t,u_int16_t> > to_delete;
00864 for (BundleOfferList::iterator i = requests_.begin();
00865 i != requests_.end(); i++)
00866 {
00867 BundleOffer* bo = (*i);
00868 ASSERT (bo != NULL);
00869 u_int32_t cts = (*i)->creation_ts();
00870 u_int16_t sid = (*i)->sid();
00871 EndpointID eid = ribd_.find(sid);
00872
00873 ASSERTF(eid.equals(EndpointID::NULL_EID()) == false,
00874 "null eid found for sid %d",sid);
00875
00876
00877
00878 BundleRef b("handle_bundle_tlv");
00879 b = ProphetBundleList::find(list,eid,cts);
00880
00881 if (b.object() != NULL &&
00882
00883 !oracle_->acks()->is_ackd(b.object()))
00884 {
00885
00886 if(should_fwd(b.object()))
00887 {
00888
00889 fwd_to_nexthop(b.object());
00890
00891
00892 to_delete.push_back(
00893 std::pair<u_int32_t,u_int16_t>(cts,sid));
00894 }
00895 }
00896 }
00897
00898 for(std::vector<std::pair<u_int32_t,u_int16_t> >::iterator
00899 i = to_delete.begin(); i != to_delete.end(); i++)
00900 {
00901 requests_.remove_bundle((*i).first,(*i).second);
00902 }
00903
00904 set_state(OFFER);
00905 }
00906 return true;
00907 }
00908 else
00909 if (state == SEND_DR)
00910 {
00911
00912 oasys::ScopeLock l(oracle_->bundles()->lock(),
00913 "ProphetEncounter::handle_bundle_tlv");
00914 oracle_->bundles()->bundle_list(list);
00915
00916
00917 ASSERT(offers_.type() == BundleOffer::OFFER);
00918 offers_ = bt->list();
00919 log_debug("handle_bundle_tlv(SEND_DR) received list of %zu elements",
00920 requests_.size());
00921
00922
00923 requests_.clear();
00924 requests_.set_type(BundleOffer::RESPONSE);
00925
00926 oasys::ScopeLock l2(offers_.lock(),"handle_bundle_tlv");
00927 for (BundleOfferList::iterator i = offers_.begin();
00928 i != offers_.end();
00929 i++)
00930 {
00931 u_int32_t cts = (*i)->creation_ts();
00932 u_int16_t sid = (*i)->sid();
00933 EndpointID eid = ribd_.find(sid);
00934
00935 ASSERTF(eid.equals(EndpointID::NULL_EID()) == false,
00936 "failed on eid lookup for sid %d",sid);
00937
00938 if ((*i)->ack())
00939 {
00940 u_int32_t ets = 0;
00941
00942 BundleRef b("handle_bundle_tlv");
00943 b = ProphetBundleList::find(list,eid,cts);
00944 if (b.object() != NULL)
00945 {
00946 ets = b->expiration_;
00947 oracle_->bundles()->drop_bundle(b.object());
00948
00949
00950 list.clear();
00951 oracle_->bundles()->bundle_list(list);
00952 }
00953
00954
00955 oracle_->acks()->insert(eid,cts,ets);
00956 }
00957 else
00958
00959 if (ProphetBundleList::find(list,eid,cts) == NULL)
00960 {
00961 bool accept = true;
00962
00963
00964 bool custody = false;
00965 requests_.add_offer(cts,sid,custody,accept,false);
00966 log_debug("requesting bundle, cts %d, sid %d",cts,sid);
00967 }
00968 }
00969
00970 requests_.sort(&ribd_,oracle_->nodes(),synsender_ ? 0 : 1);
00971 ASSERT(requests_.type() == BundleOffer::RESPONSE);
00972 enqueue_bundle_tlv(requests_,
00973 pt->transaction_id(),
00974 Prophet::Success);
00975 send_prophet_tlv();
00976 set_state(REQUEST);
00977 if(requests_.size() == 0)
00978 {
00979 set_state(WAIT_INFO);
00980 }
00981 }
00982 return false;
00983 }
00984
00985 void
00986 ProphetEncounter::handle_neighbor_gone()
00987 {
00988 log_debug("handle_neighbor_gone");
00989 neighbor_gone_ = true;
00990 log_info("*%p - %u received NEIGHBOR_GONE signal",
00991 next_hop_,local_instance_);
00992 }
00993
00994 void
00995 ProphetEncounter::handle_poll_timeout()
00996 {
00997 log_debug("handle_poll_timeout");
00998
00999 prophet_state_t state = get_state("handle_poll_timeout");
01000 switch(state) {
01001 case SYNSENT:
01002 case WAIT_NB:
01003 if (synsender_ == true)
01004 {
01005 enqueue_hello(Prophet::SYN);
01006 send_prophet_tlv();
01007 }
01008 break;
01009 case SYNRCVD:
01010 enqueue_hello(Prophet::SYNACK,
01011 tid_,
01012 Prophet::Success);
01013 send_prophet_tlv();
01014 break;
01015 case ESTAB:
01016
01017
01018
01019
01020
01021
01022
01023 if (ack_count_ >= 2)
01024 {
01025 ack_count_ = 0;
01026 }
01027 else
01028 {
01029 ++ack_count_;
01030 enqueue_hello(Prophet::ACK);
01031 send_prophet_tlv();
01032 }
01033 break;
01034 case WAIT_DICT:
01035 enqueue_hello(Prophet::ACK,
01036 tid_,
01037 Prophet::Success);
01038 send_prophet_tlv();
01039 break;
01040 case WAIT_RIB:
01041 enqueue_hello(Prophet::ACK,
01042 tid_,
01043 Prophet::Success);
01044 send_prophet_tlv();
01045 set_state(WAIT_DICT);
01046 break;
01047 case CREATE_DR:
01048 send_dictionary();
01049 set_state(SEND_DR);
01050 break;
01051 case SEND_DR:
01052 send_dictionary();
01053 break;
01054 case REQUEST:
01055 if(requests_.size() == 0)
01056 {
01057 set_state(WAIT_INFO);
01058 }
01059 else
01060 {
01061 enqueue_bundle_tlv(requests_,
01062 tid_,
01063 Prophet::Success);
01064 send_prophet_tlv();
01065 }
01066 break;
01067 case OFFER:
01068 send_bundle_offer();
01069 break;
01070 case WAIT_INFO:
01071
01072
01073 {
01074 oasys::Time now;
01075 now.get_time();
01076
01077 u_int timeout = oracle_->params()->hello_dead_ * timeout_ / 2;
01078 u_int diff = (now - data_sent_).in_milliseconds();
01079 if ( diff <= timeout )
01080 {
01081 log_debug("wait_info: timediff %u timeout %u",
01082 diff,timeout);
01083 }
01084 else
01085 {
01086 switch_info_role();
01087 }
01088 }
01089 break;
01090 default:
01091 break;
01092 }
01093 }
01094
01095 void
01096 ProphetEncounter::reset_ribd()
01097 {
01098 EndpointID local(BundleDaemon::instance()->local_eid()),
01099 remote(next_hop_->remote_eid());
01100 ribd_.clear();
01101 if (synsender_ == true)
01102 {
01103 ASSERT(ribd_.assign(local,0));
01104 ASSERT(ribd_.assign(remote,1));
01105 }
01106 else
01107 {
01108 ASSERT(ribd_.assign(local,1));
01109 ASSERT(ribd_.assign(remote,0));
01110 }
01111 }
01112
01113 void
01114 ProphetEncounter::switch_info_role()
01115 {
01116 ASSERT(get_state("switch_info_role") == WAIT_INFO);
01117 if (synsender_ == true)
01118 {
01119 if (initiator_ == true)
01120 {
01121 initiator_ = false;
01122 set_state(WAIT_DICT);
01123 }
01124 else
01125 {
01126 initiator_ = true;
01127 set_state(CREATE_DR);
01128 }
01129 }
01130 else
01131 {
01132 if (initiator_ == false)
01133 {
01134 initiator_ = true;
01135 set_state(CREATE_DR);
01136 }
01137 else
01138 {
01139 initiator_ = false;
01140 set_state(WAIT_DICT);
01141 }
01142 }
01143 }
01144
01145 void
01146 ProphetEncounter::dump_state(oasys::StringBuffer* buf)
01147 {
01148 buf->appendf("%05d %10s %s\n",remote_instance_,
01149 state_to_str(state_),
01150 next_hop_->remote_eid().c_str());
01151 }
01152
01153 void
01154 ProphetEncounter::send_bundle_offer()
01155 {
01156 log_debug("send_bundle_offer");
01157 ASSERT(oracle_ != NULL);
01158 ASSERT(oracle_->bundles() != NULL);
01159
01160 prophet_state_t state = get_state("send_bundle_offer");
01161 ASSERT( state == WAIT_RIB ||
01162 state == OFFER );
01163
01164 bool update_dictionary = false;
01165
01166
01167 offers_.set_type(BundleOffer::OFFER);
01168
01169
01170 oasys::ScopeLock l(oracle_->bundles()->lock(),
01171 "ProphetEncounter::send_bundle_offer");
01172 BundleList list("send_bundle_offer");
01173 oracle_->bundles()->bundle_list(list);
01174
01175 if (list.size() > 0)
01176 {
01177
01178 FwdStrategy* fs = FwdStrategy::strategy(
01179 oracle_->params()->fs_,
01180 oracle_->nodes(),
01181 &remote_nodes_);
01182
01183
01184 ProphetDecider* d = ProphetDecider::decider(
01185 oracle_->params()->fs_,
01186 oracle_->nodes(),
01187 &remote_nodes_,
01188 next_hop_,
01189 oracle_->params()->max_forward_,
01190 oracle_->stats());
01191
01192
01193 ProphetBundleOffer offer(list,fs,d);
01194
01195 while(!offer.empty())
01196 {
01197 BundleRef b("send_bundle_offer");
01198 b = offer.top();
01199 offer.pop();
01200 if (b.object() == NULL)
01201 continue;
01202
01203 EndpointID eid = Prophet::eid_to_routeid(b->dest_);
01204 u_int16_t sid = ribd_.find(eid);
01205
01206
01207 if (sid == 0)
01208 {
01209 if (ribd_.is_assigned(eid) == true &&
01210 synsender_ == true)
01211 {
01212
01213 continue;
01214 }
01215
01216 sid = ribd_.insert(eid);
01217 update_dictionary = true;
01218 }
01219 else
01220 if (sid == 1 && synsender_ == false)
01221 {
01222
01223 continue;
01224 }
01225
01226
01227 offers_.add_offer(b.object(),sid);
01228 log_debug("offering bundle *%p (sid %d)",b.object(),sid);
01229 }
01230
01231 delete fs;
01232 delete d;
01233 }
01234
01235
01236 PointerList<ProphetAck> acklist;
01237 oracle_->acks()->fetch(EndpointIDPattern("dtn://*"),acklist);
01238
01239 for (PointerList<ProphetAck>::iterator i = acklist.begin();
01240 i != acklist.end();
01241 i++)
01242 {
01243 ProphetAck* pa = *i;
01244
01245 EndpointID eid = Prophet::eid_to_routeid(pa->dest_id_);
01246 u_int16_t sid = ribd_.find(eid);
01247
01248 if (sid == 0)
01249 {
01250 if (ribd_.is_assigned(eid) == false)
01251 {
01252 sid = ribd_.insert(eid);
01253 }
01254
01255 if (sid != 0)
01256 {
01257 update_dictionary = true;
01258 }
01259 }
01260
01261
01262 offers_.add_offer(pa->cts_,sid,false,false,true);
01263 log_debug("appending ACK to bundle offer: cts %d, sid %d",
01264 pa->cts_,sid);
01265 }
01266
01267 if (update_dictionary)
01268 send_dictionary();
01269
01270 enqueue_bundle_tlv(offers_);
01271 send_prophet_tlv();
01272 }
01273
01274 void
01275 ProphetEncounter::send_dictionary()
01276 {
01277 log_debug("send_dictionary");
01278 ASSERT(oracle_ != NULL);
01279 ASSERT(oracle_->nodes() != NULL);
01280 ASSERT(next_hop_ != NULL);
01281
01282
01283 ProphetNodeList nodes;
01284
01285 RIBTLV::List rib;
01286
01287 EndpointID local(BundleDaemon::instance()->local_eid()),
01288 remote(next_hop_->remote_eid());
01289
01290
01291 if (synsender_ == true)
01292 {
01293 ASSERT(ribd_.find(local) == 0);
01294 ASSERT(ribd_.find(remote) == 1);
01295 }
01296 else
01297 {
01298 ASSERT(ribd_.find(remote) == 0);
01299 ASSERT(ribd_.find(local) == 1);
01300 }
01301
01302
01303 oracle_->nodes()->dump_table(nodes);
01304
01305
01306 for(ProphetNodeList::iterator i = nodes.begin();
01307 i != nodes.end();
01308 i++)
01309 {
01310 ProphetNode* node = *i;
01311
01312 u_int16_t sid;
01313
01314 if (ribd_.is_assigned(node->remote_eid()))
01315 {
01316 sid = ribd_.find(node->remote_eid());
01317 }
01318 else
01319 {
01320 sid = ribd_.insert(node->remote_eid());
01321 }
01322
01323 ASSERT(ribd_.is_assigned(node->remote_eid()));
01324
01325 if (sid == 0 || sid == 1)
01326 {
01327
01328 continue;
01329 }
01330
01331 ASSERT(sid != 0);
01332 rib.push_back(new RIBNode(node,sid));
01333 }
01334
01335
01336 oasys::ScopeLock l(oracle_->bundles()->lock(),
01337 "ProphetEncounter::send_dictionary");
01338 BundleList list("send_dictionary");
01339 oracle_->bundles()->bundle_list(list);
01340 oasys::ScopeLock bl(list.lock(),"send_dictionary");
01341 for (BundleList::iterator i = list.begin();
01342 i != list.end();
01343 i++)
01344 {
01345 u_int16_t sid;
01346 EndpointID routeid = Prophet::eid_to_routeid((*i)->dest_);
01347 if (ribd_.is_assigned(routeid))
01348 continue;
01349
01350
01351
01352 ASSERT((sid = ribd_.insert(routeid)) != 0);
01353 ProphetNode* node = new ProphetNode(oracle_->params());
01354 node->set_eid(routeid);
01355 oracle_->nodes()->update(node);
01356 rib.push_back(new RIBNode(node,sid));
01357 }
01358
01359 u_int32_t tid = Prophet::UniqueID::tid();
01360 enqueue_ribd(ribd_,tid,Prophet::NoSuccessAck);
01361 enqueue_rib(rib,tid,Prophet::NoSuccessAck);
01362 send_prophet_tlv();
01363 }
01364
01365 ProphetTLV*
01366 ProphetEncounter::outbound_tlv(u_int32_t tid,
01367 Prophet::header_result_t result)
01368 {
01369 oasys::ScopeLock l(otlv_lock_,"outbound_tlv");
01370 if (outbound_tlv_ == NULL)
01371 {
01372 outbound_tlv_ = new ProphetTLV(result,
01373 local_instance_,
01374 remote_instance_,
01375 tid);
01376 }
01377 else
01378 {
01379 if (outbound_tlv_->transaction_id() != tid)
01380 {
01381 log_err("mismatched tid: TLV %u tid %u",
01382 outbound_tlv_->transaction_id(),
01383 tid);
01384 return NULL;
01385 }
01386
01387 if (outbound_tlv_->result() != result)
01388 {
01389 log_err("mismatched result field: TLV %s result %s",
01390 Prophet::result_to_str(outbound_tlv_->result()),
01391 Prophet::result_to_str(result));
01392 return NULL;
01393 }
01394 }
01395 return outbound_tlv_;
01396 }
01397
01398 bool
01399 ProphetEncounter::send_prophet_tlv()
01400 {
01401 oasys::ScopeLock l(otlv_lock_,"send_prophet_tlv");
01402 if (neighbor_gone_ == true) return false;
01403 if (outbound_tlv_ == NULL) return false;
01404
01405 ASSERT(outbound_tlv_->num_tlv() > 0);
01406
01407 bool retval = false;
01408
01409 BundleRef b("ProphetEncounter send_prophet_tlv");
01410 b = new Bundle();
01411 if (outbound_tlv_->create_bundle(b.object(),
01412 BundleDaemon::instance()->local_eid(),
01413 next_hop_->remote_eid()))
01414 {
01415 oasys::StringBuffer buf;
01416 outbound_tlv_->dump(&buf);
01417 log_debug("Outbound TLV\n"
01418 "\n%s\n",buf.c_str());
01419
01420
01421 fwd_to_nexthop(b.object(),true);
01422
01423
01424 data_sent_.get_time();
01425
01426 retval = true;
01427 }
01428 else
01429 {
01430 log_err("Failed to write out ProphetTLV");
01431 retval = false;
01432 }
01433
01434 delete outbound_tlv_;
01435 outbound_tlv_ = NULL;
01436 return retval;
01437 }
01438
01439 void
01440 ProphetEncounter::enqueue_hello(Prophet::hello_hf_t hf,
01441 u_int32_t tid,
01442 Prophet::header_result_t result)
01443 {
01444 log_debug("enqueue_hello %s %u %s",
01445 Prophet::hf_to_str(hf), tid,
01446 Prophet::result_to_str(result));
01447
01448 oasys::ScopeLock l(otlv_lock_,"enqueue_hello");
01449 ProphetTLV* tlv = outbound_tlv(tid,result);
01450 HelloTLV *ht = new HelloTLV(hf,
01451 oracle_->params()->hello_interval_,
01452 BundleDaemon::instance()->local_eid(),
01453 logpath_);
01454
01455 ASSERT(tlv != NULL);
01456 tlv->add_tlv(ht);
01457 }
01458
01459 void
01460 ProphetEncounter::enqueue_ribd(const ProphetDictionary& ribd,
01461 u_int32_t tid,
01462 Prophet::header_result_t result)
01463 {
01464 log_debug("enqueue_ribd (%zu entries) %u %s",
01465 ribd.size(),tid,
01466 Prophet::result_to_str(result));
01467
01468 oasys::ScopeLock l(otlv_lock_,"enqueue_ribd");
01469 ProphetTLV* tlv = outbound_tlv(tid,result);
01470 RIBDTLV *rt = new RIBDTLV(ribd,logpath_);
01471
01472 ASSERT(tlv != NULL);
01473 tlv->add_tlv(rt);
01474 }
01475
01476 void
01477 ProphetEncounter::enqueue_rib(const RIBTLV::List& nodes,
01478 u_int32_t tid,
01479 Prophet::header_result_t result)
01480 {
01481 log_debug("enqueue_rib (%zu entries)",nodes.size());
01482
01483 oasys::ScopeLock l(otlv_lock_,"enqueue_rib");
01484 ProphetTLV* tlv = outbound_tlv(tid,result);
01485 RIBTLV *rt = new RIBTLV(nodes,
01486 oracle_->params()->relay_node_,
01487 oracle_->params()->custody_node_,
01488 oracle_->params()->internet_gw_,
01489 logpath_);
01490
01491 ASSERT(tlv != NULL);
01492 tlv->add_tlv(rt);
01493 }
01494
01495 void
01496 ProphetEncounter::enqueue_bundle_tlv(const BundleOfferList& bundles,
01497 u_int32_t tid,
01498 Prophet::header_result_t result)
01499 {
01500 log_debug("enqueue_bundle_tlv (%zu entries)",bundles.size());
01501
01502 oasys::ScopeLock l(otlv_lock_,"enqueue_bundle_tlv");
01503 ProphetTLV* tlv = outbound_tlv(tid,result);
01504 BundleTLV* bt = new BundleTLV(bundles,logpath_);
01505
01506 ASSERT(tlv != NULL);
01507 tlv->add_tlv(bt);
01508 }
01509
01510 void
01511 ProphetEncounter::process_command()
01512 {
01513 PEMsg msg;
01514 bool ok = cmdqueue_.try_pop(&msg);
01515
01516
01517 ASSERT( ok == true );
01518
01519
01520 switch(msg.type_) {
01521 case PEMSG_PROPHET_TLV_RECEIVED:
01522 log_debug("processing PEMSG_PROPHET_TLV_RECEIVED");
01523 handle_prophet_tlv(msg.tlv_);
01524 delete msg.tlv_;
01525 break;
01526 case PEMSG_NEIGHBOR_GONE:
01527 log_debug("processing PEMSG_NEIGHBOR_GONE");
01528 handle_neighbor_gone();
01529 break;
01530 case PEMSG_HELLO_INTERVAL_CHANGED:
01531 log_debug("processing PEMSG_HELLO_INTERVAL_CHANGED");
01532 handle_hello_interval_changed();
01533 break;
01534 default:
01535 PANIC("invalid PEMsg typecode %d",msg.type_);
01536 }
01537 }
01538
01539 void
01540 ProphetEncounter::handle_hello_interval_changed() {
01541 timeout_ = oracle_->params()->hello_interval_ * 100;
01542 }
01543
01544 void
01545 ProphetEncounter::run() {
01546
01547 ASSERT(oracle_ != NULL);
01548 EndpointID local(BundleDaemon::instance()->local_eid()),
01549 remote(next_hop_->remote_eid());
01550
01551
01552
01553 synsender_ = (local.str() < remote.str());
01554
01555
01556
01557 initiator_ = synsender_;
01558
01559 log_debug("synsender_ == %s", synsender_ ? "true" : "false");
01560
01561 remote_nodes_.clear();
01562 if (synsender_ == true)
01563 {
01564 enqueue_hello(Prophet::SYN);
01565 send_prophet_tlv();
01566 set_state(SYNSENT);
01567 }
01568
01569
01570
01571
01572
01573
01574
01575
01576
01577
01578 u_int timeout = timeout_;
01579 while (neighbor_gone_ == false) {
01580
01581 if (cmdqueue_.size() != 0)
01582 {
01583 process_command();
01584 continue;
01585 }
01586
01587
01588
01589 int r = oasys::Random::rand(10);
01590 double ratio = (double) (10 - r) / (double) 100;
01591 timeout = (int) ((double) timeout_ * (1.05 - ratio));
01592 log_debug("poll timeout = %d", timeout);
01593
01594 short revents = 0;
01595 int cc = oasys::IO::poll_single(cmdqueue_.read_fd(),
01596 POLL_IN,&revents,timeout);
01597
01598 if (neighbor_gone_ == true) break;
01599 if (cc == oasys::IOTIMEOUT)
01600 {
01601 handle_poll_timeout();
01602 }
01603 else if (cc > 0)
01604 {
01605
01606 continue;
01607 }
01608 else
01609 {
01610 log_err("unexpected return on poll: %d",cc);
01611 handle_neighbor_gone();
01612 break;
01613 }
01614
01615 oasys::Time now;
01616 now.get_time();
01617 if ((now-data_rcvd_).in_milliseconds() >= (oracle_->params()->hello_dead_ * timeout_)) {
01618 log_err("%d silent Hello intervals, giving up",oracle_->params()->hello_dead_);
01619 break;
01620 }
01621 }
01622
01623 ProphetController::instance()->unreg(this);
01624 }
01625
01626 }