00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #ifdef HAVE_CONFIG_H
00023 # include <dtn-config.h>
00024 #endif
00025
00026 #if defined(XERCES_C_ENABLED) && defined(EXTERNAL_DP_ENABLED)
00027
00028 #include <memory>
00029 #include <iostream>
00030 #include <map>
00031 #include <vector>
00032 #include <sys/ioctl.h>
00033 #include <string.h>
00034 #include <time.h>
00035 #include <netinet/in.h>
00036 #include <sstream>
00037 #include <xercesc/framework/MemBufFormatTarget.hpp>
00038
00039 #include "ExternalRouter.h"
00040 #include "bundling/GbofId.h"
00041 #include "bundling/BundleDaemon.h"
00042 #include "bundling/BundleActions.h"
00043 #include "bundling/MetadataBlockProcessor.h"
00044 #include "contacts/ContactManager.h"
00045 #include "contacts/NamedAttribute.h"
00046 #include "reg/RegistrationTable.h"
00047 #include "conv_layers/ConvergenceLayer.h"
00048 #include <oasys/io/UDPClient.h>
00049 #include <oasys/tclcmd/TclCommand.h>
00050 #include <oasys/io/IO.h>
00051
/*
 * SEND(event, data): build a bpa message, set its `event` element to
 * `data`, and hand it to send().
 *
 * The statements are wrapped in a brace block so that the temporary
 * `message` does not leak into the caller's scope and so that all three
 * statements stay together when the macro is used as the body of an
 * unbraced if/for. A brace block (rather than do { } while (0)) is used
 * because existing call sites invoke SEND without a trailing semicolon.
 * Avoid `if (c) SEND(a, b); else ...` -- the extra semicolon would
 * detach the else.
 */
#define SEND(event, data) \
    { \
        rtrmessage::bpa message; \
        message.event(data); \
        send(message); \
    }
00056
/*
 * CATCH(exception): catch handler that logs the exception text as a
 * warning and swallows the exception.
 *
 * The text is passed as an argument to a fixed "%s" format rather than
 * being used as the format string itself, so '%' characters inside
 * e.what() are logged literally instead of being interpreted as
 * conversion specifications.
 */
#define CATCH(exception) \
    catch (const exception &e) { log_warn("%s", e.what()); }
00059
00060 namespace dtn {
00061
00062 using namespace rtrmessage;
00063
00064 ExternalRouter::ExternalRouter()
00065 : BundleRouter("ExternalRouter", "external")
00066 {
00067 log_notice("Initializing ExternalRouter");
00068 }
00069
00070 ExternalRouter::~ExternalRouter()
00071 {
00072 delete srv_;
00073 delete hello_;
00074 delete reg_;
00075 delete route_table_;
00076 }
00077
00078
00079 void
00080 ExternalRouter::initialize()
00081 {
00082
00083 route_table_ = new RouteTable("external");
00084
00085
00086 reg_ = new ERRegistration(this);
00087
00088
00089 hello_ = new HelloTimer(this);
00090
00091
00092 BundleDaemon::instance()->set_rtr_shutdown(
00093 external_rtr_shutdown, (void *) 0);
00094
00095
00096 srv_ = new ModuleServer();
00097 srv_->start();
00098
00099 bpa message;
00100 message.alert(dtnStatusType(std::string("justBooted")));
00101 message.hello_interval(ExternalRouter::hello_interval);
00102 send(message);
00103 hello_->schedule_in(ExternalRouter::hello_interval * 1000);
00104 }
00105
00106 void
00107 ExternalRouter::shutdown()
00108 {
00109 dtnStatusType e(std::string("shuttingDown"));
00110 SEND(alert, e)
00111 }
00112
00113
00114 void
00115 ExternalRouter::get_routing_state(oasys::StringBuffer* buf)
00116 {
00117 buf->appendf("Static route table for %s router(s):\n", name_.c_str());
00118 route_table_->dump(buf);
00119 }
00120
00121
00122 void
00123 ExternalRouter::handle_event(BundleEvent *event)
00124 {
00125 dispatch_event(event);
00126 }
00127
00128 void
00129 ExternalRouter::handle_bundle_received(BundleReceivedEvent *event)
00130 {
00131 bpa::bundle_received_event::type e(
00132 event->bundleref_.object(),
00133 event->bundleref_->dest(),
00134 event->bundleref_->custodian(),
00135 event->bundleref_->replyto(),
00136 bundle_ts_to_long(event->bundleref_->extended_id()),
00137 event->bundleref_->expiration(),
00138 event->bytes_received_);
00139
00140
00141 e.prevhop(event->bundleref_->prevhop());
00142
00143 LinkRef null_link("ExternalRouter::handle_bundle_received");
00144 MetadataVec * gen_meta = event->bundleref_->generated_metadata().
00145 find_blocks(null_link);
00146 unsigned int num_meta_blocks = event->bundleref_->recv_metadata().size() +
00147 ((gen_meta == NULL)? 0 : gen_meta->size());
00148 e.num_meta_blocks(num_meta_blocks);
00149
00150 SEND(bundle_received_event, e)
00151 }
00152
00153 void
00154 ExternalRouter::handle_bundle_transmitted(BundleTransmittedEvent* event)
00155 {
00156 if (event->contact_ == NULL) return;
00157
00158 bpa::data_transmitted_event::type e(
00159 event->bundleref_.object(),
00160 bundle_ts_to_long(event->bundleref_->extended_id()),
00161 event->link_.object()->name_str(),
00162 event->bytes_sent_,
00163 event->reliably_sent_);
00164 SEND(data_transmitted_event, e)
00165 }
00166
00167 void
00168 ExternalRouter::handle_bundle_delivered(BundleDeliveredEvent* event)
00169 {
00170 bpa::bundle_delivered_event::type e(
00171 event->bundleref_.object(),
00172 bundle_ts_to_long(event->bundleref_->extended_id()));
00173 SEND(bundle_delivered_event, e)
00174 }
00175
00176 void
00177 ExternalRouter::handle_bundle_expired(BundleExpiredEvent* event)
00178 {
00179 bpa::bundle_expired_event::type e(
00180 event->bundleref_.object(),
00181 bundle_ts_to_long(event->bundleref_->extended_id()));
00182 SEND(bundle_expired_event, e)
00183 }
00184
00185 void
00186 ExternalRouter::handle_bundle_cancelled(BundleSendCancelledEvent* event)
00187 {
00188 bpa::bundle_send_cancelled_event::type e(
00189 event->bundleref_.object(),
00190 event->link_.object()->name_str(),
00191 bundle_ts_to_long(event->bundleref_->extended_id()));
00192 SEND(bundle_send_cancelled_event, e)
00193 }
00194
00195 void
00196 ExternalRouter::handle_bundle_injected(BundleInjectedEvent* event)
00197 {
00198 bpa::bundle_injected_event::type e(
00199 event->request_id_,
00200 event->bundleref_.object(),
00201 bundle_ts_to_long(event->bundleref_->extended_id()));
00202 SEND(bundle_injected_event, e)
00203 }
00204
00205 void
00206 ExternalRouter::handle_contact_up(ContactUpEvent* event)
00207 {
00208 ASSERT(event->contact_->link() != NULL);
00209 ASSERT(!event->contact_->link()->isdeleted());
00210
00211 bpa::link_opened_event::type e(
00212 event->contact_.object(),
00213 event->contact_->link().object()->name_str());
00214 SEND(link_opened_event, e)
00215 }
00216
00217 void
00218 ExternalRouter::handle_contact_down(ContactDownEvent* event)
00219 {
00220 bpa::link_closed_event::type e(
00221 event->contact_.object(),
00222 event->contact_->link().object()->name_str(),
00223 contactReasonType(reason_to_str(event->reason_)));
00224 SEND(link_closed_event, e)
00225 }
00226
00227 void
00228 ExternalRouter::handle_link_created(LinkCreatedEvent *event)
00229 {
00230 ASSERT(event->link_ != NULL);
00231 ASSERT(!event->link_->isdeleted());
00232
00233 bpa::link_created_event::type e(
00234 event->link_.object(),
00235 event->link_.object()->name_str(),
00236 contactReasonType(reason_to_str(event->reason_)));
00237 SEND(link_created_event, e)
00238 }
00239
00240 void
00241 ExternalRouter::handle_link_deleted(LinkDeletedEvent *event)
00242 {
00243 ASSERT(event->link_ != NULL);
00244
00245 bpa::link_deleted_event::type e(
00246 event->link_.object()->name_str(),
00247 contactReasonType(reason_to_str(event->reason_)));
00248 SEND(link_deleted_event, e)
00249 }
00250
00251 void
00252 ExternalRouter::handle_link_available(LinkAvailableEvent *event)
00253 {
00254 ASSERT(event->link_ != NULL);
00255 ASSERT(!event->link_->isdeleted());
00256
00257 bpa::link_available_event::type e(
00258 event->link_.object()->name_str(),
00259 contactReasonType(reason_to_str(event->reason_)));
00260 SEND(link_available_event, e)
00261 }
00262
00263 void
00264 ExternalRouter::handle_link_unavailable(LinkUnavailableEvent *event)
00265 {
00266 bpa::link_unavailable_event::type e(
00267 event->link_.object()->name_str(),
00268 contactReasonType(reason_to_str(event->reason_)));
00269 SEND(link_unavailable_event, e)
00270 }
00271
00272 void
00273 ExternalRouter::handle_link_attribute_changed(LinkAttributeChangedEvent *event)
00274 {
00275 ASSERT(event->link_ != NULL);
00276 ASSERT(!event->link_->isdeleted());
00277
00278 bpa::link_attribute_changed_event::type e(
00279 event->link_.object(),
00280 event->link_.object()->name_str(),
00281 contactReasonType(reason_to_str(event->reason_)));
00282 SEND(link_attribute_changed_event, e)
00283 }
00284
00285 void
00286 ExternalRouter::handle_new_eid_reachable(NewEIDReachableEvent* event)
00287 {
00288 bpa::eid_reachable_event::type e(
00289 event->endpoint_,
00290 event->iface_->name());
00291 SEND(eid_reachable_event, e)
00292 }
00293
00294 void
00295 ExternalRouter::handle_contact_attribute_changed(ContactAttributeChangedEvent *event)
00296 {
00297 ASSERT(event->contact_->link() != NULL);
00298 ASSERT(!event->contact_->link()->isdeleted());
00299
00300
00301 bpa::contact_attribute_changed_event::type e(
00302 eidType(event->contact_->link()->remote_eid().str()),
00303 event->contact_.object(),
00304 contactReasonType(reason_to_str(event->reason_)));
00305 SEND(contact_attribute_changed_event, e)
00306 }
00307
00308
00309
00310
00311
00312
00313
00314
00315
00316 void
00317 ExternalRouter::handle_registration_added(RegistrationAddedEvent* event)
00318 {
00319 bpa::registration_added_event::type e(
00320 event->registration_,
00321 (xml_schema::string)source_to_str((event_source_t)event->source_));
00322 SEND(registration_added_event, e)
00323 }
00324
00325 void
00326 ExternalRouter::handle_registration_removed(RegistrationRemovedEvent* event)
00327 {
00328 bpa::registration_removed_event::type e(
00329 event->registration_);
00330 SEND(registration_removed_event, e)
00331 }
00332
00333 void
00334 ExternalRouter::handle_registration_expired(RegistrationExpiredEvent* event)
00335 {
00336 bpa::registration_expired_event::type e(
00337 event->registration_->regid());
00338 SEND(registration_expired_event, e)
00339 }
00340
00341 void
00342 ExternalRouter::handle_route_add(RouteAddEvent* event)
00343 {
00344
00345 route_table_->add_entry(event->entry_);
00346
00347 bpa::route_add_event::type e(
00348 event->entry_);
00349 SEND(route_add_event, e)
00350 }
00351
00352 void
00353 ExternalRouter::handle_route_del(RouteDelEvent* event)
00354 {
00355
00356 route_table_->del_entries(event->dest_);
00357
00358 bpa::route_delete_event::type e(
00359 eidType(event->dest_.str()));
00360 SEND(route_delete_event, e)
00361 }
00362
00363 void
00364 ExternalRouter::handle_custody_signal(CustodySignalEvent* event)
00365 {
00366 custodySignalType attr(
00367 event->data_.admin_type_,
00368 event->data_.admin_flags_,
00369 event->data_.succeeded_,
00370 event->data_.reason_,
00371 event->data_.orig_frag_offset_,
00372 event->data_.orig_frag_length_,
00373 event->data_.custody_signal_tv_.seconds_,
00374 event->data_.custody_signal_tv_.seqno_,
00375 event->data_.orig_creation_tv_.seconds_,
00376 event->data_.orig_creation_tv_.seqno_);
00377
00378
00379
00380
00381
00382 GbofId gbof_id;
00383 gbof_id.source_ = event->data_.orig_source_eid_;
00384 gbof_id.creation_ts_ = event->data_.orig_creation_tv_;
00385 gbof_id.is_fragment_
00386 = event->data_.admin_flags_ & BundleProtocol::ADMIN_IS_FRAGMENT;
00387 gbof_id.frag_length_
00388 = gbof_id.is_fragment_ ? event->data_.orig_frag_length_ : 0;
00389 gbof_id.frag_offset_
00390 = gbof_id.is_fragment_ ? event->data_.orig_frag_offset_ : 0;
00391
00392 BundleDaemon *bd = BundleDaemon::instance();
00393 BundleRef br = bd->custody_bundles()->find(gbof_id);
00394 if (!br.object()) {
00395
00396 return;
00397 }
00398
00399 bpa::custody_signal_event::type e(
00400 event->data_,
00401 attr,
00402 bundle_ts_to_long(br->extended_id()));
00403
00404 SEND(custody_signal_event, e)
00405 }
00406
00407 void
00408 ExternalRouter::handle_custody_timeout(CustodyTimeoutEvent* event)
00409 {
00410 bpa::custody_timeout_event::type e(
00411 event->bundle_.object(),
00412 bundle_ts_to_long(event->bundle_->extended_id()));
00413 SEND(custody_timeout_event, e)
00414 }
00415
00416 void
00417 ExternalRouter::handle_link_report(LinkReportEvent *event)
00418 {
00419 BundleDaemon *bd = BundleDaemon::instance();
00420 oasys::ScopeLock l(bd->contactmgr()->lock(),
00421 "ExternalRouter::handle_event");
00422
00423 (void) event;
00424
00425 const LinkSet *links = bd->contactmgr()->links();
00426 LinkSet::const_iterator i = links->begin();
00427 LinkSet::const_iterator end = links->end();
00428
00429 link_report report;
00430 link_report::link::container c;
00431
00432 for(; i != end; ++i)
00433 c.push_back(link_report::link::type((*i).object()));
00434
00435 report.link(c);
00436 SEND(link_report, report)
00437 }
00438
00439 void
00440 ExternalRouter::handle_link_attributes_report(LinkAttributesReportEvent *event)
00441 {
00442 AttributeVector::const_iterator iter = event->attributes_.begin();
00443 AttributeVector::const_iterator end = event->attributes_.end();
00444
00445 link_attributes_report::report_params::container c;
00446
00447 for(; iter != end; ++iter) {
00448 c.push_back( key_value_pair(*iter) );
00449 }
00450
00451 link_attributes_report e(event->query_id_);
00452 e.report_params(c);
00453 SEND(link_attributes_report, e)
00454 }
00455
00456 void
00457 ExternalRouter::handle_contact_report(ContactReportEvent* event)
00458 {
00459 BundleDaemon *bd = BundleDaemon::instance();
00460 oasys::ScopeLock l(bd->contactmgr()->lock(),
00461 "ExternalRouter::handle_event");
00462
00463 (void) event;
00464
00465 const LinkSet *links = bd->contactmgr()->links();
00466 LinkSet::const_iterator i = links->begin();
00467 LinkSet::const_iterator end = links->end();
00468
00469 contact_report report;
00470 contact_report::contact::container c;
00471
00472 for(; i != end; ++i) {
00473 if ((*i)->contact() != NULL) {
00474 c.push_back((*i)->contact().object());
00475 }
00476 }
00477
00478 report.contact(c);
00479 SEND(contact_report, report)
00480 }
00481
00482 void
00483 ExternalRouter::handle_bundle_report(BundleReportEvent *event)
00484 {
00485 BundleDaemon *bd = BundleDaemon::instance();
00486 oasys::ScopeLock l(bd->pending_bundles()->lock(),
00487 "ExternalRouter::handle_event");
00488
00489 (void) event;
00490
00491 log_debug("pending_bundles size %zu", bd->pending_bundles()->size());
00492 const BundleList *bundles = bd->pending_bundles();
00493 BundleList::iterator i = bundles->begin();
00494 BundleList::iterator end = bundles->end();
00495
00496 bundle_report report;
00497 bundle_report::bundle::container c;
00498
00499 for(; i != end; ++i)
00500 c.push_back(bundle_report::bundle::type(*i));
00501
00502 report.bundle(c);
00503 SEND(bundle_report, report)
00504 }
00505
00506 void
00507 ExternalRouter::handle_bundle_attributes_report(BundleAttributesReportEvent *event)
00508 {
00509 BundleRef br = event->bundle_;
00510
00511 bundleAttributesReportType response;
00512
00513 AttributeNameVector::iterator i = event->attribute_names_.begin();
00514 AttributeNameVector::iterator end = event->attribute_names_.end();
00515
00516 for (; i != end; ++i) {
00517 const std::string& name = i->name();
00518
00519 if (name == "bundleid")
00520 response.bundleid( br->bundleid() );
00521 else if (name == "is_admin")
00522 response.is_admin( br->is_admin() );
00523 else if (name == "do_not_fragment")
00524 response.do_not_fragment( br->do_not_fragment() );
00525 else if (name == "priority")
00526 response.priority(
00527 bundlePriorityType(lowercase(br->prioritytoa(br->priority()))) );
00528 else if (name == "custody_requested")
00529 response.custody_requested( br->custody_requested() );
00530 else if (name == "local_custody")
00531 response.local_custody( br->local_custody() );
00532 else if (name == "singleton_dest")
00533 response.singleton_dest( br->singleton_dest() );
00534 else if (name == "custody_rcpt")
00535 response.custody_rcpt( br->custody_rcpt() );
00536 else if (name == "receive_rcpt")
00537 response.receive_rcpt( br->receive_rcpt() );
00538 else if (name == "forward_rcpt")
00539 response.forward_rcpt( br->forward_rcpt() );
00540 else if (name == "delivery_rcpt")
00541 response.delivery_rcpt( br->delivery_rcpt() );
00542 else if (name == "deletion_rcpt")
00543 response.deletion_rcpt( br->deletion_rcpt() );
00544 else if (name == "app_acked_rcpt")
00545 response.app_acked_rcpt( br->app_acked_rcpt() );
00546 else if (name == "expiration")
00547 response.expiration( br->expiration() );
00548 else if (name == "orig_length")
00549 response.orig_length( br->orig_length() );
00550 else if (name == "owner")
00551 response.owner( br->owner() );
00552 else if (name == "location")
00553 response.location(
00554 bundleLocationType(
00555 bundleType::location_to_str(br->payload().location())) );
00556 else if (name == "dest")
00557 response.dest( br->dest() );
00558 else if (name == "custodian")
00559 response.custodian( br->custodian() );
00560 else if (name == "replyto")
00561 response.replyto( br->replyto() );
00562 else if (name == "prevhop")
00563 response.prevhop( br->prevhop() );
00564 else if (name == "payload_file") {
00565 response.payload_file( br->payload().filename() );
00566 }
00567 }
00568
00569 if (event->metadata_blocks_.size() > 0) {
00570
00571
00572
00573 std::vector<unsigned int> added_blocks;
00574
00575
00576 MetaBlockRequestVector::iterator requested;
00577 for (requested = event->metadata_blocks_.begin();
00578 requested != event->metadata_blocks_.end();
00579 ++requested) {
00580
00581 for (unsigned int i = 0; i < 2; ++i) {
00582 MetadataVec * block_vec = NULL;
00583 if (i == 0) {
00584 block_vec = br->mutable_recv_metadata();
00585 ASSERT(block_vec != NULL);
00586 } else if (i == 1) {
00587 LinkRef null_link("ExternalRouter::"
00588 "handle_bundle_attributes_report");
00589 block_vec = br->generated_metadata().find_blocks(null_link);
00590 if (block_vec == NULL) {
00591 continue;
00592 }
00593 } else {
00594 ASSERT(block_vec != NULL);
00595 }
00596
00597 MetadataVec::iterator block_i;
00598 for (block_i = block_vec->begin();
00599 block_i != block_vec->end();
00600 ++block_i) {
00601
00602 MetadataBlockRef block = *block_i;
00603
00604
00605 bool added = false;
00606 for (unsigned int i = 0; i < added_blocks.size(); ++i) {
00607 if (added_blocks[i] == block->id()) {
00608 added = true;
00609 break;
00610 }
00611 }
00612 if (added) {
00613 continue;
00614 }
00615
00616 switch (requested->query_type()) {
00617
00618 case MetadataBlockRequest::QueryByIdentifier:
00619 if (requested->query_value() != block->id())
00620 continue;
00621 break;
00622
00623
00624 case MetadataBlockRequest::QueryByType:
00625 if (block->ontology() != requested->query_value())
00626 continue;
00627 break;
00628
00629
00630 case MetadataBlockRequest::QueryAll:
00631 break;
00632 }
00633
00634
00635 added_blocks.push_back(block->id());
00636
00637
00638 response.meta_blocks().push_back(
00639 rtrmessage::metadataBlockType(
00640 block->id(), false, block->ontology(),
00641 xml_schema::base64_binary(
00642 block->metadata(),
00643 block->metadata_len())));
00644 }
00645 }
00646 }
00647 }
00648
00649 bundle_attributes_report e(event->query_id_, response);
00650 SEND(bundle_attributes_report, e)
00651 }
00652
00653 void
00654 ExternalRouter::handle_route_report(RouteReportEvent* event)
00655 {
00656 oasys::ScopeLock l(route_table_->lock(),
00657 "ExternalRouter::handle_event");
00658
00659 (void) event;
00660
00661 const RouteEntryVec *re = route_table_->route_table();
00662 RouteEntryVec::const_iterator i = re->begin();
00663 RouteEntryVec::const_iterator end = re->end();
00664
00665 route_report report;
00666 route_report::route_entry::container c;
00667
00668 for(; i != end; ++i)
00669 c.push_back(route_report::route_entry::type(*i));
00670
00671 report.route_entry(c);
00672 SEND(route_report, report)
00673 }
00674
00675 void
00676 ExternalRouter::send(bpa &message)
00677 {
00678 xercesc::MemBufFormatTarget buf;
00679 xml_schema::namespace_infomap map;
00680
00681 message.eid(BundleDaemon::instance()->local_eid().c_str());
00682
00683 if (ExternalRouter::client_validation)
00684 map[""].schema = ExternalRouter::schema.c_str();
00685
00686 try {
00687 bpa_(buf, message, map, "UTF-8",
00688 xml_schema::flags::dont_initialize);
00689 srv_->eventq->push_back(new std::string((char *)buf.getRawBuffer()));
00690 }
00691 catch (xml_schema::serialization &e) {
00692 const xml_schema::errors &elist = e.errors();
00693 xml_schema::errors::const_iterator i = elist.begin();
00694 xml_schema::errors::const_iterator end = elist.end();
00695
00696 for (; i < end; ++i) {
00697 std::cout << (*i).message() << std::endl;
00698 }
00699 }
00700 CATCH(xml_schema::unexpected_element)
00701 CATCH(xml_schema::no_namespace_mapping)
00702 CATCH(xml_schema::no_prefix_mapping)
00703 CATCH(xml_schema::xsi_already_in_use)
00704 }
00705
00706 const char *
00707 ExternalRouter::reason_to_str(int reason)
00708 {
00709 switch(reason) {
00710 case ContactEvent::NO_INFO: return "no_info";
00711 case ContactEvent::USER: return "user";
00712 case ContactEvent::SHUTDOWN: return "shutdown";
00713 case ContactEvent::BROKEN: return "broken";
00714 case ContactEvent::CL_ERROR: return "cl_error";
00715 case ContactEvent::CL_VERSION: return "cl_version";
00716 case ContactEvent::RECONNECT: return "reconnect";
00717 case ContactEvent::IDLE: return "idle";
00718 case ContactEvent::TIMEOUT: return "timeout";
00719 default: return "";
00720 }
00721 }
00722
00723 ExternalRouter::ModuleServer::ModuleServer()
00724 : IOHandlerBase(new oasys::Notifier("/router/external/moduleserver")),
00725 Thread("/router/external/moduleserver"),
00726 parser_(new oasys::XercesXMLUnmarshal(
00727 ExternalRouter::server_validation,
00728 ExternalRouter::schema.c_str())),
00729 lock_(new oasys::SpinLock())
00730 {
00731 set_logpath("/router/external/moduleserver");
00732
00733
00734
00735 if (fd() == -1) {
00736 init_socket();
00737 }
00738 const int on = 1;
00739 if (setsockopt(fd(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
00740 log_err("ExternalRouter::ModuleServer::ModuleServer(): "
00741 "Failed to set SO_REUSEADDR: %s", strerror(errno));
00742 bind(htonl(INADDR_ALLRTRS_GROUP), ExternalRouter::server_port);
00743
00744
00745 ip_mreq mreq;
00746 mreq.imr_multiaddr.s_addr = htonl(INADDR_ALLRTRS_GROUP);
00747 mreq.imr_interface.s_addr = htonl(INADDR_LOOPBACK);
00748 if (setsockopt(fd(), IPPROTO_IP, IP_ADD_MEMBERSHIP,
00749 &mreq, sizeof(mreq)) < 0)
00750 log_err("ExternalRouter::ModuleServer::ModuleServer(): "
00751 "Failed to join multicast group: %s", strerror(errno));
00752
00753
00754 in_addr src_if;
00755 src_if.s_addr = htonl(INADDR_LOOPBACK);
00756 if (setsockopt(fd(), IPPROTO_IP, IP_MULTICAST_IF,
00757 &src_if, sizeof(src_if)) < 0)
00758 log_err("ExternalRouter::ModuleServer::ModuleServer(): "
00759 "Failed to set IP_MULTICAST_IF: %s", strerror(errno));
00760
00761
00762 Thread::set_flag(Thread::DELETE_ON_EXIT);
00763
00764 set_logfd(false);
00765
00766 eventq = new oasys::MsgQueue< std::string * >(logpath_, lock_);
00767 }
00768
00769 ExternalRouter::ModuleServer::~ModuleServer()
00770 {
00771
00772 std::string *event;
00773 while (eventq->try_pop(&event))
00774 delete event;
00775
00776 delete eventq;
00777 }
00778
00779
00780 void
00781 ExternalRouter::ModuleServer::run()
00782 {
00783
00784
00785 struct pollfd pollfds[2];
00786
00787 struct pollfd* event_poll = &pollfds[0];
00788 event_poll->fd = eventq->read_fd();
00789 event_poll->events = POLLIN;
00790 event_poll->revents = 0;
00791
00792 struct pollfd* sock_poll = &pollfds[1];
00793 sock_poll->fd = fd();
00794 sock_poll->events = POLLIN;
00795 sock_poll->revents = 0;
00796
00797 while (1) {
00798 if (should_stop()) return;
00799
00800
00801 int ret = oasys::IO::poll_multiple(pollfds, 2, -1,
00802 get_notifier());
00803
00804 if (ret == oasys::IOINTR) {
00805 log_debug("module server interrupted");
00806 set_should_stop();
00807 continue;
00808 }
00809
00810 if (ret == oasys::IOERROR) {
00811 log_debug("module server error");
00812 set_should_stop();
00813 continue;
00814 }
00815
00816
00817 if (event_poll->revents & POLLIN) {
00818 std::string *event;
00819 if (eventq->try_pop(&event)) {
00820 ASSERT(event != NULL)
00821 sendto(const_cast< char * >(event->c_str()),
00822 event->size(), 0,
00823 htonl(INADDR_ALLRTRS_GROUP),
00824 ExternalRouter::server_port);
00825 delete event;
00826 }
00827 }
00828
00829 if (sock_poll->revents & POLLIN) {
00830 char buf[MAX_UDP_PACKET];
00831 in_addr_t raddr;
00832 u_int16_t rport;
00833 int bytes;
00834
00835 bytes = recvfrom(buf, MAX_UDP_PACKET, 0, &raddr, &rport);
00836 buf[bytes] = '\0';
00837
00838 process_action(buf);
00839 }
00840 }
00841 }
00842
00843
00844 void
00845 ExternalRouter::ModuleServer::process_action(const char *payload)
00846 {
00847
00848 parser_->reset_error();
00849
00850
00851 const xercesc::DOMDocument *doc = parser_->doc(payload);
00852
00853
00854 if (parser_->error()) {
00855 log_debug("received invalid message");
00856 return;
00857 }
00858
00859 std::auto_ptr<bpa> instance;
00860
00861 try {
00862 instance = bpa_(*doc);
00863 }
00864 CATCH(xml_schema::expected_element)
00865 CATCH(xml_schema::unexpected_element)
00866 CATCH(xml_schema::expected_attribute)
00867 CATCH(xml_schema::unexpected_enumerator)
00868 CATCH(xml_schema::no_type_info)
00869 CATCH(xml_schema::not_derived)
00870
00871
00872 if (instance.get() == 0)
00873 return;
00874
00875
00876
00877
00878
00879 if (instance->send_bundle_request().present()) {
00880 log_debug("posting BundleSendRequest");
00881 send_bundle_request& in_request = instance->send_bundle_request().get();
00882
00883 gbofIdType id = in_request.gbof_id();
00884 BundleTimestamp local_id;
00885 local_id.seconds_ = in_request.local_id() >> 32;
00886 local_id.seqno_ = in_request.local_id() & 0xffffffff;
00887 std::string link = in_request.link_id();
00888 int action = convert_fwd_action(in_request.fwd_action());
00889
00890 GbofId gbof_id;
00891 gbof_id.source_ = EndpointID( id.source().uri() );
00892 gbof_id.creation_ts_.seconds_ = id.creation_ts() >> 32;
00893 gbof_id.creation_ts_.seqno_ = id.creation_ts() & 0xffffffff;
00894 gbof_id.is_fragment_ = id.is_fragment();
00895 gbof_id.frag_length_ = id.frag_length();
00896 gbof_id.frag_offset_ = id.frag_offset();
00897
00898 BundleDaemon *bd = BundleDaemon::instance();
00899 log_debug("pending_bundles size %zu", bd->pending_bundles()->size());
00900 BundleRef br = bd->pending_bundles()->find(gbof_id, local_id);
00901 if (br.object()) {
00902 BundleSendRequest *request = new BundleSendRequest(br, link, action);
00903
00904
00905
00906 LinkRef link_ref = bd->contactmgr()->find_link(link.c_str());
00907 if (link_ref == NULL) {
00908 if (in_request.metadata_block().size() > 0) {
00909 log_err("link %s does not exist; failed to "
00910 "modify/generate metadata for send bundle request",
00911 link.c_str());
00912 }
00913
00914 } else if (in_request.metadata_block().size() > 0) {
00915
00916 typedef ::xsd::cxx::tree::sequence<rtrmessage::metadataBlockType>
00917 MetaBlockSequence;
00918
00919 MetadataBlockProcessor* meta_processor =
00920 dynamic_cast<MetadataBlockProcessor*>(
00921 BundleProtocol::find_processor(
00922 BundleProtocol::METADATA_BLOCK));
00923 ASSERT(meta_processor != NULL);
00924
00925 oasys::ScopeLock bundle_lock(br->lock(), "ExternalRouter");
00926
00927 MetaBlockSequence::const_iterator block_i;
00928 for (block_i = in_request.metadata_block().begin();
00929 block_i != in_request.metadata_block().end();
00930 ++block_i) {
00931
00932 if (!block_i->generated()) {
00933
00934 MetadataBlockRef existing("ExternalRouter metadata block search");
00935 for (unsigned int i = 0;
00936 i < br->recv_metadata().size(); ++i) {
00937 if (br->recv_metadata()[i]->id() ==
00938 block_i->identifier()) {
00939 existing = br->recv_metadata()[i];
00940 break;
00941 }
00942 }
00943
00944 if (existing != NULL) {
00945
00946
00947 oasys::ScopeLock metadata_lock(existing->lock(),
00948 "ExternalRouter");
00949
00950
00951
00952 if (block_i->contents().size() == 0) {
00953 existing->remove_outgoing_metadata(link_ref);
00954 log_info("Removing metadata block %u from bundle "
00955 "%u on link %s", block_i->identifier(),
00956 br->bundleid(), link.c_str());
00957 }
00958
00959
00960
00961 else {
00962 log_info("Modifying metadata block %u on bundle "
00963 "%u on link %s", block_i->identifier(),
00964 br->bundleid(), link.c_str());
00965 existing->modify_outgoing_metadata(
00966 link_ref,
00967 (u_char*)block_i->contents().data(),
00968 block_i->contents().size());
00969 }
00970 continue;
00971 }
00972
00973 ASSERT(existing == NULL);
00974
00975 LinkRef null_link("ExternalRouter::process_action");
00976 MetadataVec * nulldata = br->generated_metadata().
00977 find_blocks(null_link);
00978 if (nulldata != NULL) {
00979 for (unsigned int i = 0; i < nulldata->size(); ++i) {
00980 if ((*nulldata)[i]->id() == block_i->identifier()) {
00981 existing = (*nulldata)[i];
00982 break;
00983 }
00984 }
00985 }
00986
00987 if (existing != NULL) {
00988 MetadataVec * link_vec = br->generated_metadata().
00989 find_blocks(link_ref);
00990 if (link_vec == NULL) {
00991 link_vec = br->mutable_generated_metadata()->
00992 create_blocks(link_ref);
00993 }
00994 ASSERT(link_vec != NULL);
00995
00996 ASSERT(existing->ontology() == block_i->type());
00997
00998 MetadataBlock * meta_block =
00999 new MetadataBlock(
01000 existing->id(),
01001 block_i->type(),
01002 (u_char *)block_i->contents().data(),
01003 block_i->contents().size());
01004 meta_block->set_flags(existing->flags());
01005
01006 link_vec->push_back(meta_block);
01007
01008 log_info("Adding a metadata block to bundle %u on "
01009 "link %s", br->bundleid(), link.c_str());
01010 continue;
01011 }
01012
01013 log_err("bundle %u does not have a block %u",
01014 br->bundleid(), block_i->identifier());
01015
01016 } else {
01017 ASSERT(block_i->generated());
01018
01019 MetadataVec* vec =
01020 br->generated_metadata().find_blocks(link_ref);
01021 if (vec == NULL)
01022 vec = br->mutable_generated_metadata()->create_blocks(link_ref);
01023
01024 MetadataBlock* meta_block = new MetadataBlock(
01025 block_i->type(),
01026 (u_char*)block_i->contents().data(),
01027 block_i->contents().size());
01028
01029 vec->push_back(meta_block);
01030 log_info("Adding an metadata block to bundle %u on "
01031 "link %s", br->bundleid(), link.c_str());
01032 }
01033 }
01034 }
01035
01036 BundleDaemon::post(request);
01037 }
01038 else {
01039 log_warn("attempt to send nonexistent bundle %s",
01040 gbof_id.str().c_str());
01041 }
01042 }
01043
01044 if (instance->open_link_request().present()) {
01045 BundleDaemon *bd = BundleDaemon::instance();
01046 log_debug("posting LinkStateChangeRequest");
01047
01048 std::string lstr =
01049 instance->open_link_request().get().link_id();
01050 LinkRef link = bd->contactmgr()->find_link(lstr.c_str());
01051
01052 if (link.object() != 0) {
01053 BundleDaemon::post(
01054 new LinkStateChangeRequest(link, Link::OPENING,
01055 ContactEvent::NO_INFO));
01056 } else {
01057 log_warn("attempt to open link %s that doesn't exist!",
01058 lstr.c_str());
01059 }
01060 }
01061
01062 if (instance->close_link_request().present()) {
01063 BundleDaemon *bd = BundleDaemon::instance();
01064 log_debug("posting LinkStateChangeRequest");
01065
01066 std::string lstr =
01067 instance->close_link_request().get().link_id();
01068 LinkRef link = bd->contactmgr()->find_link(lstr.c_str());
01069
01070 if (link.object() != 0) {
01071 BundleDaemon::post(
01072 new LinkStateChangeRequest(link, Link::CLOSED,
01073 ContactEvent::NO_INFO));
01074 } else {
01075 log_warn("attempt to close link %s that doesn't exist!",
01076 lstr.c_str());
01077 }
01078 }
01079
01080 if (instance->add_link_request().present()) {
01081 log_debug("posting LinkCreateRequest");
01082
01083 rtrmessage::add_link_request request
01084 = instance->add_link_request().get();
01085 std::string clayer = request.clayer();
01086 ConvergenceLayer *cl = ConvergenceLayer::find_clayer(clayer.c_str());
01087 if (!cl) {
01088 log_warn("attempt to create link using non-existent CLA %s",
01089 clayer.c_str());
01090 }
01091 else {
01092 std::string name = request.link_id();
01093 Link::link_type_t type = convert_link_type( request.link_type() );
01094 std::string eid = request.remote_eid().uri();
01095
01096 AttributeVector params;
01097
01098 if (request.link_config_params().present()) {
01099 linkConfigType::cl_params::container
01100 c = request.link_config_params().get().cl_params();
01101 linkConfigType::cl_params::container::iterator iter;
01102
01103 for (iter = c.begin(); iter < c.end(); iter++) {
01104 if (iter->bool_value().present())
01105 params.push_back(NamedAttribute(iter->name(),
01106 iter->bool_value()));
01107 else if (iter->u_int_value().present())
01108 params.push_back(NamedAttribute(iter->name(),
01109 iter->u_int_value()));
01110 else if (iter->int_value().present())
01111 params.push_back(NamedAttribute(iter->name(),
01112 iter->int_value()));
01113 else if (iter->str_value().present())
01114 params.push_back(NamedAttribute(iter->name(),
01115 iter->str_value()));
01116 else
01117 log_warn("unknown value type in key-value pair");
01118 }
01119 }
01120
01121 BundleDaemon::post(
01122 new LinkCreateRequest(name, type, eid, cl, params));
01123 }
01124 }
01125
01126 if (instance->delete_link_request().present()) {
01127 BundleDaemon *bd = BundleDaemon::instance();
01128 log_debug("posting LinkDeleteRequest");
01129
01130 std::string lstr = instance->delete_link_request().get().link_id();
01131 LinkRef link = bd->contactmgr()->find_link(lstr.c_str());
01132
01133 if (link.object() != 0) {
01134 BundleDaemon::post(new LinkDeleteRequest(link));
01135 } else {
01136 log_warn("attempt to delete link %s that doesn't exist!",
01137 lstr.c_str());
01138 }
01139 }
01140
01141 if (instance->reconfigure_link_request().present()) {
01142 BundleDaemon *bd = BundleDaemon::instance();
01143 log_debug("posting LinkReconfigureRequest");
01144
01145 rtrmessage::reconfigure_link_request request
01146 = instance->reconfigure_link_request().get();
01147 std::string lstr = request.link_id();
01148 LinkRef link = bd->contactmgr()->find_link(lstr.c_str());
01149
01150 rtrmessage::linkConfigType request_params=request.link_config_params();
01151
01152 if (link.object() != 0) {
01153 AttributeVector params;
01154 if (request_params.is_usable().present()) {
01155 params.push_back(NamedAttribute("is_usable", request_params.is_usable().get()));
01156 }
01157 if (request_params.reactive_frag_enabled().present()) {
01158
01159 params.push_back(NamedAttribute("reactive_fragment", request_params.reactive_frag_enabled().get()));
01160 }
01161 if (request_params.nexthop().present()) {
01162 params.push_back(NamedAttribute("nexthop", request_params.nexthop().get()));
01163 }
01164
01165 if (request_params.min_retry_interval().present()) {
01166 params.push_back(
01167 NamedAttribute("min_retry_interval",
01168 request_params.min_retry_interval().get()));
01169 }
01170 if (request_params.max_retry_interval().present()) {
01171 params.push_back(
01172 NamedAttribute("max_retry_interval",
01173 request_params.max_retry_interval().get()));
01174 }
01175 if (request_params.idle_close_time().present()) {
01176 params.push_back(
01177 NamedAttribute("idle_close_time",
01178 request_params.idle_close_time().get()));
01179 }
01180
01181 linkConfigType::cl_params::container
01182 c = request_params.cl_params();
01183 linkConfigType::cl_params::container::iterator iter;
01184
01185 for (iter = c.begin(); iter < c.end(); iter++) {
01186 if (iter->bool_value().present())
01187 params.push_back(NamedAttribute(iter->name(),
01188 iter->bool_value()));
01189 else if (iter->u_int_value().present())
01190 params.push_back(NamedAttribute(iter->name(),
01191 iter->u_int_value()));
01192 else if (iter->int_value().present())
01193 params.push_back(NamedAttribute(iter->name(),
01194 iter->int_value()));
01195 else if (iter->str_value().present())
01196 params.push_back(NamedAttribute(iter->name(),
01197 iter->str_value()));
01198 else
01199 log_warn("unknown value type in key-value pair");
01200 }
01201
01202 BundleDaemon::post(new LinkReconfigureRequest(link, params));
01203 } else {
01204 log_warn("attempt to reconfigure link %s that doesn't exist!",
01205 lstr.c_str());
01206 }
01207 }
01208
01209 if (instance->inject_bundle_request().present()) {
01210 log_debug("posting BundleInjectRequest");
01211
01212 inject_bundle_request fields = instance->inject_bundle_request().get();
01213
01214
01215 BundleInjectRequest *request = new BundleInjectRequest();
01216
01217 request->src_ = fields.source().uri();
01218 request->dest_ = fields.dest().uri();
01219 request->link_ = fields.link_id();
01220 request->request_id_ = fields.request_id();
01221
01222 request->payload_file_ = fields.payload_file();
01223
01224 if(fields.replyto().present())
01225 request->replyto_ = fields.replyto().get().uri();
01226 else
01227 request->replyto_ = "";
01228
01229 if(fields.custodian().present())
01230 request->custodian_ = fields.custodian().get().uri();
01231 else
01232 request->custodian_ = "";
01233
01234 if(fields.priority().present())
01235 request->priority_ = convert_priority( fields.priority().get() );
01236 else
01237 request->priority_ = Bundle::COS_BULK;
01238
01239 if(fields.expiration().present())
01240 request->expiration_ = fields.expiration().get();
01241 else
01242 request->expiration_ = 0;
01243
01244 BundleDaemon::post(request);
01245 }
01246
01247 if (instance->cancel_bundle_request().present()) {
01248 log_debug("posting BundleCancelRequest");
01249
01250 gbofIdType id =
01251 instance->cancel_bundle_request().get().gbof_id();
01252 BundleTimestamp local_id;
01253 local_id.seconds_ =
01254 instance->cancel_bundle_request().get().local_id() >> 32;
01255 local_id.seqno_ =
01256 instance->cancel_bundle_request().get().local_id() & 0xffffffff;
01257 std::string link =
01258 instance->cancel_bundle_request().get().link_id();
01259
01260 GbofId gbof_id;
01261 gbof_id.source_ = EndpointID( id.source().uri() );
01262 gbof_id.creation_ts_.seconds_ = id.creation_ts() >> 32;
01263 gbof_id.creation_ts_.seqno_ = id.creation_ts() & 0xffffffff;
01264 gbof_id.is_fragment_ = id.is_fragment();
01265 gbof_id.frag_length_ = id.frag_length();
01266 gbof_id.frag_offset_ = id.frag_offset();
01267
01268 BundleDaemon *bd = BundleDaemon::instance();
01269 log_debug("pending_bundles size %zu", bd->pending_bundles()->size());
01270 BundleRef br = bd->pending_bundles()->find(gbof_id, local_id);
01271 if (br.object()) {
01272 BundleCancelRequest *request = new BundleCancelRequest(br, link);
01273 BundleDaemon::post(request);
01274 }
01275 else {
01276 log_warn("attempt to cancel send of nonexistent bundle %s",
01277 gbof_id.str().c_str());
01278 }
01279 }
01280
01281 if (instance->delete_bundle_request().present()) {
01282 log_debug("posting BundleDeleteRequest");
01283
01284 gbofIdType id =
01285 instance->delete_bundle_request().get().gbof_id();
01286
01287 GbofId gbof_id;
01288 gbof_id.source_ = EndpointID( id.source().uri() );
01289 gbof_id.creation_ts_.seconds_ = id.creation_ts() >> 32;
01290 gbof_id.creation_ts_.seqno_ = id.creation_ts() & 0xffffffff;
01291 gbof_id.is_fragment_ = id.is_fragment();
01292 gbof_id.frag_length_ = id.frag_length();
01293 gbof_id.frag_offset_ = id.frag_offset();
01294 BundleTimestamp local_id;
01295 local_id.seconds_ =
01296 instance->delete_bundle_request().get().local_id() >> 32;
01297 local_id.seqno_ =
01298 instance->delete_bundle_request().get().local_id() & 0xffffffff;
01299
01300 BundleDaemon *bd = BundleDaemon::instance();
01301 log_debug("pending_bundles size %zu", bd->pending_bundles()->size());
01302 BundleRef br = bd->pending_bundles()->find(gbof_id, local_id);
01303 if (br.object()) {
01304 BundleDeleteRequest *request =
01305 new BundleDeleteRequest(br,
01306 BundleProtocol::REASON_NO_ADDTL_INFO);
01307 BundleDaemon::post(request);
01308 }
01309 else {
01310 log_warn("attempt to delete nonexistent bundle %s",
01311 gbof_id.str().c_str());
01312 }
01313 }
01314
01315 if (instance->set_cl_params_request().present()) {
01316 log_debug("posting CLASetParamsRequest");
01317
01318 std::string clayer = instance->set_cl_params_request().get().clayer();
01319 ConvergenceLayer *cl = ConvergenceLayer::find_clayer(clayer.c_str());
01320 if (!cl) {
01321 log_warn("attempt to set parameters for non-existent CLA %s",
01322 clayer.c_str());
01323 }
01324 else {
01325 AttributeVector params;
01326
01327 set_cl_params_request::cl_params::container
01328 c = instance->set_cl_params_request().get().cl_params();
01329 set_cl_params_request::cl_params::container::iterator iter;
01330
01331 for (iter = c.begin(); iter < c.end(); iter++) {
01332 if (iter->bool_value().present())
01333 params.push_back(NamedAttribute(iter->name(),
01334 iter->bool_value()));
01335 else if (iter->u_int_value().present())
01336 params.push_back(NamedAttribute(iter->name(),
01337 iter->u_int_value()));
01338 else if (iter->int_value().present())
01339 params.push_back(NamedAttribute(iter->name(),
01340 iter->int_value()));
01341 else if (iter->str_value().present())
01342 params.push_back(NamedAttribute(iter->name(),
01343 iter->str_value()));
01344 else
01345 log_warn("unknown value type in key-value pair");
01346 }
01347
01348 BundleDaemon::post(new CLASetParamsRequest(cl, params));
01349 }
01350 }
01351
01352 if (instance->bundle_attributes_query().present()) {
01353 log_debug("posting BundleAttributesQueryRequest");
01354 bundle_attributes_query query =
01355 instance->bundle_attributes_query().get();
01356 std::string query_id = query.query_id();
01357 gbofIdType id = query.gbof_id();
01358
01359 GbofId gbof_id;
01360 gbof_id.source_ = EndpointID( id.source().uri() );
01361 gbof_id.creation_ts_.seconds_ = id.creation_ts() >> 32;
01362 gbof_id.creation_ts_.seqno_ = id.creation_ts() & 0xffffffff;
01363 gbof_id.is_fragment_ = id.is_fragment();
01364 gbof_id.frag_length_ = id.frag_length();
01365 gbof_id.frag_offset_ = id.frag_offset();
01366 BundleTimestamp local_id;
01367 local_id.seconds_ = query.local_id() >> 32;
01368 local_id.seqno_ = query.local_id() & 0xffffffff;
01369
01370 BundleDaemon *bd = BundleDaemon::instance();
01371 log_debug("pending_bundles size %zu", bd->pending_bundles()->size());
01372 BundleRef br = bd->pending_bundles()->find(gbof_id, local_id);
01373
01374
01375
01376
01377
01378 if (br.object()) {
01379 AttributeNameVector attribute_names;
01380 MetaBlockRequestVector metadata_blocks;
01381 bundle_attributes_query::query_params::container
01382 c = query.query_params();
01383 bundle_attributes_query::query_params::container::iterator iter;
01384
01385 for (iter = c.begin(); iter != c.end(); iter++) {
01386 bundleAttributesQueryType& q = *iter;
01387
01388 if (q.query().present()) {
01389 attribute_names.push_back( AttributeName(q.query().get()) );
01390 }
01391 if (q.meta_blocks().present()) {
01392 bundleMetaBlockQueryType& block = q.meta_blocks().get();
01393 int query_value = -1;
01394 MetadataBlockRequest::QueryType query_type;
01395
01396 if (block.identifier().present()) {
01397 query_value = block.identifier().get();
01398 query_type = MetadataBlockRequest::QueryByIdentifier;
01399 }
01400
01401 else if (block.type().present()) {
01402 query_value = block.type().get();
01403 query_type = MetadataBlockRequest::QueryByType;
01404 }
01405
01406 if (query_value < 0)
01407 query_type = MetadataBlockRequest::QueryAll;
01408
01409 metadata_blocks.push_back(
01410 MetadataBlockRequest(query_type, query_value) );
01411 }
01412 }
01413
01414 BundleAttributesQueryRequest* request
01415 = new BundleAttributesQueryRequest(query_id, br, attribute_names);
01416
01417 if (metadata_blocks.size() > 0) {
01418 request->metadata_blocks_ = metadata_blocks;
01419 }
01420
01421 BundleDaemon::post(request);
01422 }
01423 else {
01424 log_warn("attempt to query nonexistent bundle %s",
01425 gbof_id.str().c_str());
01426 }
01427 }
01428
01429 if (instance->link_query().present()) {
01430 log_debug("posting LinkQueryRequest");
01431 BundleDaemon::post(new LinkQueryRequest());
01432 }
01433
01434 if (instance->link_attributes_query().present()) {
01435 BundleDaemon *bd = BundleDaemon::instance();
01436 log_debug("posting LinkAttributesQueryRequest");
01437
01438 link_attributes_query query = instance->link_attributes_query().get();
01439 std::string query_id = query.query_id();
01440 std::string lstr = query.link_id();
01441
01442 LinkRef link = bd->contactmgr()->find_link(lstr.c_str());
01443
01444 if (link.object() != 0) {
01445 AttributeNameVector attribute_names;
01446
01447 link_attributes_query::query_params::container
01448 c = query.query_params();
01449 link_attributes_query::query_params::container::iterator iter;
01450
01451 for (iter = c.begin(); iter < c.end(); iter++) {
01452 attribute_names.push_back( AttributeName(*iter) );
01453 }
01454
01455 BundleDaemon::post(new LinkAttributesQueryRequest(query_id,
01456 link,
01457 attribute_names));
01458 } else {
01459 log_warn("attempt to query attributes of link %s that doesn't exist!",
01460 lstr.c_str());
01461 }
01462 }
01463
01464 if (instance->bundle_query().present()) {
01465 log_debug("posting BundleQueryRequest");
01466 BundleDaemon::post(new BundleQueryRequest());
01467 }
01468
01469 if (instance->contact_query().present()) {
01470 log_debug("posting ContactQueryRequest");
01471 BundleDaemon::post(new ContactQueryRequest());
01472 }
01473
01474 if (instance->route_query().present()) {
01475 log_debug("posting RouteQueryRequest");
01476 BundleDaemon::post(new RouteQueryRequest());
01477 }
01478
01479
01480 if (instance->deliver_bundle_to_app_request().present()) {
01481 log_debug("posting DeliverBundleToAppRequest");
01482 deliver_bundle_to_app_request& in_request =
01483 instance->deliver_bundle_to_app_request().get();
01484
01485 eidType reg = in_request.endpoint();
01486 EndpointID reg_eid;
01487 reg_eid.assign(reg.uri());
01488
01489 gbofIdType id = in_request.gbof_id();
01490 GbofId gbof_id;
01491 gbof_id.source_ = EndpointID( id.source().uri() );
01492 gbof_id.creation_ts_.seconds_ = id.creation_ts() >> 32;
01493 gbof_id.creation_ts_.seqno_ = id.creation_ts() & 0xffffffff;
01494 gbof_id.is_fragment_ = id.is_fragment();
01495 gbof_id.frag_length_ = id.frag_length();
01496 gbof_id.frag_offset_ = id.frag_offset();
01497 BundleTimestamp local_id;
01498 local_id.seconds_ = in_request.local_id() >> 32;
01499 local_id.seqno_ = in_request.local_id() & 0xffffffff;
01500
01501 BundleDaemon *bd = BundleDaemon::instance();
01502 log_debug("pending_bundles size %zu", bd->pending_bundles()->size());
01503 BundleRef br = bd->pending_bundles()->find(gbof_id, local_id);
01504
01505 if (br.object()) {
01506 bd->check_and_deliver_to_registrations(br.object(), reg_eid);
01507 }
01508 else {
01509 log_warn("attempt to deliver nonexistent bundle %s to app %s",
01510 gbof_id.str().c_str(), reg_eid.c_str());
01511 }
01512 }
01513 }
01514
01515 Link::link_type_t
01516 ExternalRouter::ModuleServer::convert_link_type(rtrmessage::linkTypeType type)
01517 {
01518 if(type==linkTypeType(linkTypeType::alwayson)){
01519 return Link::ALWAYSON;
01520 }
01521 else if(type==linkTypeType(linkTypeType::ondemand)){
01522 return Link::ONDEMAND;
01523 }
01524 else if(type==linkTypeType(linkTypeType::scheduled)){
01525 return Link::SCHEDULED;
01526 }
01527 else if(type==linkTypeType(linkTypeType::opportunistic)){
01528 return Link::OPPORTUNISTIC;
01529 }
01530
01531 return Link::LINK_INVALID;
01532 }
01533
01534 ForwardingInfo::action_t
01535 ExternalRouter::ModuleServer::convert_fwd_action(rtrmessage::bundleForwardActionType action)
01536 {
01537 if(action==bundleForwardActionType(bundleForwardActionType::forward)){
01538 return ForwardingInfo::FORWARD_ACTION;
01539 }
01540 else if(action== bundleForwardActionType(bundleForwardActionType::copy)){
01541 return ForwardingInfo::COPY_ACTION;
01542 }
01543
01544 return ForwardingInfo::INVALID_ACTION;
01545 }
01546
01547 Bundle::priority_values_t
01548 ExternalRouter::ModuleServer::convert_priority(rtrmessage::bundlePriorityType priority)
01549 {
01550 if(priority == bundlePriorityType(bundlePriorityType::bulk)){
01551 return Bundle::COS_BULK;
01552 }
01553 else if(priority == bundlePriorityType(bundlePriorityType::normal)){
01554 return Bundle::COS_NORMAL;
01555 }
01556 else if(priority == bundlePriorityType(bundlePriorityType::expedited)){
01557 return Bundle::COS_EXPEDITED;
01558 }
01559
01560 return Bundle::COS_INVALID;
01561 }
01562
01563 ExternalRouter::HelloTimer::HelloTimer(ExternalRouter *router)
01564 : router_(router)
01565 {
01566 }
01567
01568 ExternalRouter::HelloTimer::~HelloTimer()
01569 {
01570 cancel();
01571 }
01572
01573
01574 void
01575 ExternalRouter::HelloTimer::timeout(const struct timeval &)
01576 {
01577 bpa message;
01578 message.hello_interval(ExternalRouter::hello_interval);
01579 router_->send(message);
01580 schedule_in(ExternalRouter::hello_interval * 1000);
01581 }
01582
01583 ExternalRouter::ERRegistration::ERRegistration(ExternalRouter *router)
01584 : Registration(Registration::EXTERNALROUTER_REGID,
01585 EndpointID(BundleDaemon::instance()->local_eid().str() +
01586 EXTERNAL_ROUTER_SERVICE_TAG),
01587 Registration::DEFER, 0, 0),
01588 router_(router)
01589 {
01590 logpathf("/reg/admin");
01591
01592 BundleDaemon::post(new RegistrationAddedEvent(this, EVENTSRC_ADMIN));
01593 }
01594
01595
01596 void
01597 ExternalRouter::ERRegistration::deliver_bundle(Bundle *bundle)
01598 {
01599 bundle_delivery_event e(bundle, bundle,
01600 bundle_ts_to_long(bundle->extended_id()));
01601
01602 e.bundle().payload_file( bundle->payload().filename() );
01603
01604 bpa message;
01605 message.bundle_delivery_event(e);
01606 router_->send(message);
01607
01608 BundleDaemon::post(new BundleDeliveredEvent(bundle, this));
01609 }
01610
01611
01612 void external_rtr_shutdown(void *)
01613 {
01614 BundleDaemon::instance()->router()->shutdown();
01615 }
01616
01617
01618 u_int16_t ExternalRouter::server_port = 8001;
01619 u_int16_t ExternalRouter::hello_interval = 30;
01620 std::string ExternalRouter::schema = INSTALL_SYSCONFDIR "/router.xsd";
01621 bool ExternalRouter::server_validation = true;
01622 bool ExternalRouter::client_validation = false;
01623
01624 }
01625 #endif // XERCES_C_ENABLED && EXTERNAL_DP_ENABLED