00001 #include <config.h>
00002 #ifdef OASYS_BLUETOOTH_ENABLED
00003
00004 #ifndef MIN
00005 # define MIN(x, y) ((x)<(y) ? (x) : (y))
00006 #endif
00007
00008 #include <sys/poll.h>
00009 #include <stdlib.h>
00010
00011 #include <netinet/in.h>
00012 #include <bluetooth/bluetooth.h>
00013 #include <errno.h>
00014 extern int errno;
00015
00016 #include <oasys/thread/Notifier.h>
00017 #include <oasys/thread/Timer.h>
00018 #include <oasys/util/StringBuffer.h>
00019 #include <oasys/bluez/Bluetooth.h>
00020 #include <oasys/bluez/BluetoothSocket.h>
00021 #include <oasys/bluez/RFCOMMClient.h>
00022 #include <oasys/util/OptParser.h>
00023 #include <oasys/util/ScratchBuffer.h>
00024 #include <oasys/util/Random.h>
00025
00026 #include "BluetoothConvergenceLayer.h"
00027 #include "bundling/Bundle.h"
00028 #include "bundling/AnnounceBundle.h"
00029 #include "bundling/BundleEvent.h"
00030 #include "bundling/BundleDaemon.h"
00031 #include "bundling/BundleList.h"
00032 #include "bundling/BundleProtocol.h"
00033 #include "bundling/SDNV.h"
00034 #include "contacts/ContactManager.h"
00035 #include "routing/BundleRouter.h"
00036
00037 namespace dtn {
00038
00039 struct BluetoothConvergenceLayer::Params BluetoothConvergenceLayer::defaults_;
00040 struct BluetoothConvergenceLayer::ConnectionManager BluetoothConvergenceLayer::connections_;
00041
00042
00043
00044
00045
00046
00047
00048
00049 BluetoothConvergenceLayer::BluetoothConvergenceLayer() :
00050 ConvergenceLayer("BluetoothConvergenceLayer", "bt")
00051 {
00052
00053
00054 bacpy(&defaults_.local_addr_,BDADDR_ANY);
00055 bacpy(&defaults_.remote_addr_,BDADDR_ANY);
00056 defaults_.hcidev_ = "hci0";
00057 defaults_.bundle_ack_enabled_ = true;
00058 defaults_.partial_ack_len_ = 1024;
00059 defaults_.writebuf_len_ = 32768;
00060 defaults_.readbuf_len_ = 32768;
00061 defaults_.keepalive_interval_ = 30;
00062 defaults_.retry_interval_ = 5;
00063 defaults_.min_retry_interval_ = 5;
00064 defaults_.max_retry_interval_ = 10 * 60;
00065 defaults_.rtt_timeout_ = 30000;
00066 defaults_.neighbor_poll_interval_ = 0;
00067 }
00068
00072 bool
00073 BluetoothConvergenceLayer::parse_params(Params* params, int argc, const char** argv,
00074 const char** invalidp)
00075 {
00076 oasys::OptParser p;
00077
00078 p.addopt(new oasys::BdAddrOpt("local_addr", ¶ms->local_addr_));
00079 p.addopt(new oasys::BdAddrOpt("remote_addr", ¶ms->remote_addr_));
00080 p.addopt(new oasys::StringOpt("hcidev", ¶ms->hcidev_));
00081 p.addopt(new oasys::BoolOpt("bundle_ack_enabled",
00082 ¶ms->bundle_ack_enabled_));
00083 p.addopt(new oasys::UIntOpt("partial_ack_len", ¶ms->partial_ack_len_));
00084 p.addopt(new oasys::UIntOpt("writebuf_len", ¶ms->writebuf_len_));
00085 p.addopt(new oasys::UIntOpt("readbuf_len", ¶ms->readbuf_len_));
00086 p.addopt(new oasys::UIntOpt("keepalive_interval",
00087 ¶ms->keepalive_interval_));
00088 p.addopt(new oasys::UIntOpt("min_retry_interval",
00089 ¶ms->min_retry_interval_));
00090 p.addopt(new oasys::UIntOpt("max_retry_interval",
00091 ¶ms->max_retry_interval_));
00092 p.addopt(new oasys::UIntOpt("rtt_timeout", ¶ms->rtt_timeout_));
00093 p.addopt(new oasys::UIntOpt("neighbor_poll_interval",
00094 ¶ms->neighbor_poll_interval_));
00095
00096 if (! p.parse(argc, argv, invalidp)) {
00097 return false;
00098 }
00099
00100 return true;
00101 }
00102
00106 bool
00107 BluetoothConvergenceLayer::interface_up(Interface* iface,
00108 int argc, const char* argv[])
00109 {
00110 log_debug("adding interface %s",iface->name().c_str());
00111
00112 Params *params = new Params(defaults_);
00113 const char* invalid;
00114 if (!parse_params(params, argc, argv, &invalid)) {
00115 log_err("error parsing interface options: invalid option '%s'",
00116 invalid);
00117 return false;
00118 }
00119
00120
00121 if (bacmp(¶ms->local_addr_,BDADDR_ANY) == 0 ) {
00122
00123 oasys::Bluetooth::hci_get_bdaddr(params->hcidev_.c_str(),
00124 ¶ms->local_addr_);
00125 if (bacmp(¶ms->local_addr_,BDADDR_ANY) == 0 ) {
00126
00127 log_err("invalid local address setting of BDADDR_ANY");
00128 return false;
00129 }
00130 }
00131
00132
00133
00134
00135
00136 Listener* receiver = connections_.listener(this,params);
00137 receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00138
00139
00140 if (receiver->rc_bind() != 0)
00141 return false;
00142
00143 if (receiver->listen() != 0)
00144 return false;
00145
00146 receiver->start();
00147
00148
00149 if (params->neighbor_poll_interval_ > 0) {
00150 log_debug("Starting up neighbor polling with interval=%d",
00151 params->neighbor_poll_interval_);
00152 receiver->nd_ = new NeighborDiscovery(this,params);
00153 receiver->nd_->logpathf("%s/neighbordiscovery",logpath_);
00154 receiver->nd_->start();
00155 }
00156
00157
00158
00159 iface->set_cl_info(receiver);
00160
00161 return true;
00162 }
00163
00167 bool
00168 BluetoothConvergenceLayer::interface_down(Interface* iface)
00169 {
00170
00171
00172
00173
00174 Listener* receiver = (Listener*)iface->cl_info();
00175 receiver->set_should_stop();
00176 receiver->interrupt_from_io();
00177
00178 while (! receiver->is_stopped()) {
00179 oasys::Thread::yield();
00180 }
00181 receiver->close();
00182
00183 if (receiver->nd_ != NULL) {
00184 receiver->nd_->set_should_stop();
00185 receiver->nd_->interrupt();
00186 while (! receiver->nd_->is_stopped()) {
00187 oasys::Thread::yield();
00188 }
00189 delete receiver->nd_;
00190 receiver->nd_ = NULL;
00191 }
00192
00193 bool res = connections_.del_listener(receiver);
00194 delete receiver;
00195 return res;
00196 }
00197
00201 void
00202 BluetoothConvergenceLayer::dump_interface(Interface* iface,
00203 oasys::StringBuffer* buf)
00204 {
00205 Params* params = &((Listener*)iface->cl_info())->params_;
00206
00207 char buff[18];
00208 buf->appendf("\tlocal_addr: %s device: %s\n",
00209 oasys::Bluetooth::batostr(¶ms->local_addr_,buff),
00210 params->hcidev_.c_str());
00211
00212 if (bacmp(¶ms->remote_addr_,BDADDR_ANY) != 0)
00213 buf->appendf("\tremote_addr: %s\n",
00214 oasys::Bluetooth::batostr(¶ms->remote_addr_,buff));
00215
00216 if (params->neighbor_poll_interval_ > 0)
00217 buf->appendf("\tneighbor_poll_interval: %d\n",
00218 params->neighbor_poll_interval_);
00219 }
00220
00227 bool
00228 BluetoothConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00229 {
00230 bdaddr_t addr;
00231
00232 log_debug("adding %s link %s", link->type_str(), link->nexthop());
00233
00234
00235 if (! parse_nexthop(link->nexthop(), &addr)) {
00236 log_err("invalid next hop address '%s'", link->nexthop());
00237 return false;
00238 }
00239
00240
00241 if (bacmp(&addr,BDADDR_ANY) == 0 ) {
00242 log_err("invalid host in next hop address '%s'", link->nexthop());
00243 return false;
00244 }
00245
00246
00247
00248 Params* params = new Params(defaults_);
00249 const char* invalid;
00250 if (! parse_params(params, argc, argv, &invalid)) {
00251 log_err("error parsing link options: invalid option '%s'", invalid);
00252 delete params;
00253 return false;
00254 }
00255
00256
00257 if (bacmp(¶ms->local_addr_,BDADDR_ANY) == 0) {
00258
00259 oasys::Bluetooth::hci_get_bdaddr(params->hcidev_.c_str(),
00260 ¶ms->local_addr_);
00261 if (bacmp(¶ms->local_addr_,BDADDR_ANY) == 0) {
00262 log_err("invalid local address setting of BDADDR_ANY");
00263 return false;
00264 }
00265 }
00266
00267 char buff[18];
00268 link->set_local(oasys::Bluetooth::batostr(¶ms->local_addr_,buff));
00269
00270
00271 if (link->type() == Link::OPPORTUNISTIC) {
00272 delete params;
00273 return true;
00274 }
00275
00276
00277 params->retry_interval_ = link->retry_interval_;
00278 params->min_retry_interval_ = link->params().min_retry_interval_;
00279 params->max_retry_interval_ = link->params().max_retry_interval_;
00280
00281 link->set_cl_info(params);
00282
00283 return true;
00284 }
00285
00289 void
00290 BluetoothConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00291 {
00292 Params* params = (Params*) link->cl_info();
00293
00294 char buff[18];
00295 buf->appendf("\tlocal_addr: %s\n",
00296 oasys::Bluetooth::batostr(¶ms->local_addr_,buff));
00297 buf->appendf("\tremote_addr: %s\n",
00298 oasys::Bluetooth::batostr(¶ms->remote_addr_,buff));
00299 }
00300
00305 bool
00306 BluetoothConvergenceLayer::open_contact(const ContactRef& contact)
00307 {
00308 bdaddr_t addr;
00309
00310 Link* link = contact->link();
00311 log_debug("opening contact on link *%p", link);
00312
00313
00314
00315
00316 bool valid = parse_nexthop(link->nexthop(),&addr);
00317 ASSERT(valid == true);
00318 ASSERT(bacmp(&addr,BDADDR_ANY) != 0);
00319
00320 Params* params = (Params*)link->cl_info();
00321
00322
00323
00324 Connection* sender = connections_.connection(this,addr,params);
00325
00326 if (sender == NULL) return false;
00327
00328
00329 sender->set_contact(contact);
00330 sender->start();
00331
00332 return true;
00333 }
00334
00338 bool
00339 BluetoothConvergenceLayer::close_contact(const ContactRef& contact)
00340 {
00341 Connection* sender = (Connection*)contact->cl_info();
00342 log_info("close_contact *%p", contact.object());
00343
00344 if (sender != NULL) {
00345
00346 if (!sender->is_stopped() && !sender->should_stop()) {
00347 log_debug("interrupting connection thread");
00348 sender->set_should_stop();
00349 sender->interrupt_from_io();
00350 oasys::Thread::yield();
00351 }
00352
00353
00354
00355
00356
00357
00358
00359 while (contact->cl_info() != NULL) {
00360 log_debug("waiting for connection thread to stop...");
00361 usleep(100000);
00362 oasys::Thread::yield();
00363 }
00364
00365 log_debug("connection thread stopped...");
00366 }
00367
00368 return true;
00369 }
00370
00375 void BluetoothConvergenceLayer::send_bundle(const ContactRef& contact,Bundle* bundle)
00376 {
00377 log_info("send_bundle *%p to *%p",bundle,contact.object());
00378 Connection* sender = (Connection*)contact->cl_info();
00379
00380 ASSERT(sender != NULL);
00381
00382
00383 if ((sender->queue_->size() + 1) >= 3)
00384 contact->link()->set_state(Link::BUSY);
00385
00386 sender->queue_->push_back(bundle);
00387 }
00388
00389 bool
00390 BluetoothConvergenceLayer::parse_nexthop(const char* nexthop,
00391 bdaddr_t* addr)
00392 {
00393 std::string tmp;
00394 bdaddr_t ba;
00395 const char* p = nexthop;
00396 int numcolons = 5;
00397
00398 while (numcolons > 0) {
00399 p = strchr(p+1, ':');
00400 if (p != NULL) {
00401 numcolons--;
00402 } else {
00403 log_warn("bad format for remote Bluetooth address: '%s'",
00404 nexthop);
00405 return false;
00406 }
00407 }
00408 tmp.assign(nexthop, p - nexthop + 3);
00409 oasys::Bluetooth::strtoba(tmp.c_str(),&ba);
00410
00411 bacpy(addr,&ba);
00412 return true;
00413 }
00414
00415
00416
00417
00418
00419
00420
00421 BluetoothConvergenceLayer::Listener*
00422 BluetoothConvergenceLayer::ConnectionManager::listener(
00423 BluetoothConvergenceLayer *cl,
00424 Params* params)
00425 {
00426 ASSERT(params);
00427 ASSERT(bacmp(¶ms->local_addr_,BDADDR_ANY) != 0);
00428
00429 Listener *l = listener(params->local_addr_);
00430 if (l != NULL) return l;
00431
00432
00433 l = new Listener(cl,params);
00434 addListener(l);
00435 return l;
00436 }
00437
00438 BluetoothConvergenceLayer::Listener*
00439 BluetoothConvergenceLayer::ConnectionManager::listener(bdaddr_t& addr)
00440 {
00441 if (bacmp(&addr,BDADDR_ANY) == 0) {
00442 log_info("ConnectionManager::listener called with BDADDR_ANY");
00443 return NULL;
00444 }
00445
00446 char buff[18];
00447 (void)buff;
00448
00449
00450 it_ = l_map_.find(addr);
00451 if( it_ != l_map_.end() ) {
00452 log_debug("Retrieving Listener(%p) for bdaddr %s",
00453 (*it_).second,oasys::Bluetooth::batostr(&addr,buff));
00454 return (*it_).second;
00455 }
00456
00457
00458 log_debug("Nothing found in ConnectionManager for bdaddr %s",
00459 oasys::Bluetooth::batostr(&addr,buff));
00460
00461 return NULL;
00462 }
00463
00464 bool
00465 BluetoothConvergenceLayer::ConnectionManager::delListener(Listener* l)
00466 {
00467 ASSERT(l != NULL);
00468
00469 bdaddr_t addr = l->local_addr();
00470 return (l_map_.erase(addr) > 0);
00471 }
00472
00473 void
00474 BluetoothConvergenceLayer::ConnectionManager::addListener(Listener* l)
00475 {
00476 ASSERT(l != NULL);
00477
00478 char buff[18];
00479 (void)buff;
00480
00481 bdaddr_t addr;
00482 l->local_addr(addr);
00483
00484 ASSERT(bacmp(&addr,BDADDR_ANY) != 0);
00485
00486 log_debug("Adding Listener(%p) for bdaddr %s",
00487 l,oasys::Bluetooth::batostr(&addr,buff));
00488 l_map_[addr]=l;
00489 }
00490
00491 bool
00492 BluetoothConvergenceLayer::ConnectionManager::del_listener(Listener* l)
00493 {
00494 ASSERT(l != NULL);
00495
00496 return delListener(l);
00497 }
00498
00499 BluetoothConvergenceLayer::Connection*
00500 BluetoothConvergenceLayer::ConnectionManager::connection(
00501 BluetoothConvergenceLayer* cl,
00502 bdaddr_t& addr,
00503 Params* params)
00504 {
00505 ASSERT(bacmp(&addr,BDADDR_ANY) != 0);
00506 ASSERT(params != NULL);
00507 ASSERT(bacmp(¶ms->local_addr_,BDADDR_ANY) != 0);
00508
00509
00510 Listener* prev = listener(params->local_addr_);
00511
00512
00513
00514
00515 if (prev != NULL ) {
00516
00517 log_debug("Instantiating new connection");
00518 Connection *c = new Connection(cl,addr,params);
00519 c->listener_ = prev;
00520 return c;
00521
00522 } else {
00523
00524
00525
00526 char buff[18];
00527
00528 for(it_ = l_map_.begin(); it_ != l_map_.end(); it_++) {
00529 Listener *l = (*it_).second;
00530 bdaddr_t ba = (*it_).first;
00531 (void)l;
00532 (void)ba;
00533 log_debug("Listener\tListener %p bdaddr %s",
00534 l,oasys::Bluetooth::batostr(&ba,buff));
00535 }
00536
00537 PANIC("ConnectionManager: new connection requested for %s "
00538 "where no previous listener existed",
00539 oasys::Bluetooth::batostr(&addr,buff));
00540 }
00541
00542
00543 return NULL;
00544 }
00545
00546
00547
00548
00549
00550
00551
00552 BluetoothConvergenceLayer::Listener::Listener(BluetoothConvergenceLayer* cl,
00553 BluetoothConvergenceLayer::Params* params)
00554 : RFCOMMServerThread("/cl/bt/listener",oasys::Thread::INTERRUPTABLE),
00555 params_(*params),
00556 cl_(cl)
00557 {
00558 set_notifier(new oasys::Notifier("/cl/bt/listener"));
00559
00560 ASSERT(bacmp(¶ms->local_addr_,BDADDR_ANY) != 0);
00561
00562 set_local_addr(params->local_addr_);
00563
00564
00565 set_accept_timeout(1000);
00566
00567 logfd_ = false;
00568 nd_ = NULL;
00569 }
00570
00571 void
00572 BluetoothConvergenceLayer::Listener::accepted(int fd, bdaddr_t addr, u_int8_t channel)
00573 {
00574 ASSERT(cl_ != NULL);
00575 Connection *c = new Connection(cl_,fd,addr,channel,¶ms_);
00576 c->listener_ = this;
00577 c->start();
00578
00579
00580 close();
00581 ASSERT(rc_bind() == 0);
00582 ASSERT(listen() == 0);
00583 }
00584
00585
00586
00587
00588
00589
00590
00591
00595 BluetoothConvergenceLayer::Connection::Connection(
00596 BluetoothConvergenceLayer* cl,
00597 bdaddr_t remote_addr,
00598 BluetoothConvergenceLayer::Params* params)
00599 : Thread("BluetoothConvergenceLayer::Connection"),
00600 Logger("BluetoothConvergenceLayer::Connection", "/dtn/cl/bt/conn"),
00601 params_(*params), listener_(NULL), cl_(cl), initiate_(true),
00602 contact_("BluetoothConvergenceLayer::Connection"),
00603 announce_("BluetoothConvergenceLayer::Connection")
00604 {
00605 char buff[18];
00606 logpath_appendf("/%s",oasys::Bluetooth::batostr(&remote_addr,buff));
00607
00608 queue_ = new BlockingBundleList(logpath_);
00609 queue_->notifier()->logpath_appendf("/queue");
00610 Thread::set_flag(Thread::DELETE_ON_EXIT);
00611 sock_ = new oasys::RFCOMMClient();
00612 sock_->set_local_addr(params->local_addr_);
00613 sock_->logpathf("%s/sock",logpath_);
00614 sock_->set_logfd(false);
00615 sock_->set_remote_addr(remote_addr);
00616 sock_->set_notifier(new oasys::Notifier("/cl/bt/active-connection"));
00617 sock_->get_notifier()->logpath_appendf("/sock_notifier");
00618 event_notifier_ = new oasys::Notifier(logpath_);
00619 event_notifier_->logpath_appendf("/event_notifier");
00620 }
00621
00625 BluetoothConvergenceLayer::Connection::Connection(
00626 BluetoothConvergenceLayer* cl,
00627 int fd,
00628 bdaddr_t remote_addr,
00629 u_int8_t channel,
00630 Params* params)
00631 : Thread("BluetoothConvergenceLayer::Connection"),
00632 Logger("BluetoothConvergenceLayer::Connection", "/dtn/cl/bt/conn"),
00633 params_(*params), listener_(NULL), cl_(cl), initiate_(false),
00634 contact_("BluetoothConvergenceLayer::Connection"),
00635 announce_("BluetoothConvergenceLayer::Connection")
00636 {
00637 char buff[18];
00638 logpathf("/dtn/cl/bt/passive-conn/%s:%d", oasys::Bluetooth::batostr(&remote_addr,buff),
00639 channel);
00640 queue_ = NULL;
00641 Thread::set_flag(Thread::DELETE_ON_EXIT);
00642 sock_ = new oasys::RFCOMMClient(fd,remote_addr,channel,logpath_);
00643 sock_->set_local_addr(params->local_addr_);
00644 sock_->logpathf("%s/sock",logpath_);
00645 sock_->set_logfd(false);
00646 sock_->set_notifier(new oasys::Notifier("/cl/bt/passive-connection"));
00647 sock_->get_notifier()->logpath_appendf("/sock_notifier");
00648 event_notifier_ = NULL;
00649 }
00650
00654 BluetoothConvergenceLayer::Connection::~Connection()
00655 {
00656 if (queue_) delete queue_;
00657 delete sock_;
00658 if (event_notifier_) delete event_notifier_;
00659 }
00660
00661
00671 void
00672 BluetoothConvergenceLayer::Connection::run()
00673 {
00674
00675
00676
00677
00678 while (1) {
00679 log_debug("connection main loop starting up...");
00680
00681 if (initiate_)
00682 {
00683 ASSERT(contact_ != NULL);
00684 Link* link = contact_->link();
00685
00686 if (! connect()) {
00687
00688
00689
00690
00691 int param_num_retries = 3;
00692 int param_how_long = 5;
00693
00694 if (link->type() == Link::OPPORTUNISTIC) {
00695 do {
00696 sleep(param_how_long);
00697 if(--param_num_retries == 0) break;
00698 } while (!connect());
00699 }
00700 log_debug("connection failed");
00701 break_contact(ContactEvent::BROKEN);
00702 goto broken;
00703 }
00704
00705
00706
00707 params_.retry_interval_ = params_.min_retry_interval_;
00708
00709
00710 if (link->state() != Link::OPENING) {
00711 link->set_state(Link::OPENING);
00712 }
00713
00714 send_loop();
00715
00716 } else {
00717
00718
00719
00720
00721
00722 if (! accept()) {
00723 log_debug("accept failed");
00724 return;
00725 }
00726
00727
00728 recv_loop();
00729 }
00730
00731 broken:
00732
00733
00734
00735
00736
00737 if (should_stop())
00738 return;
00739
00740
00741
00742 if (!initiate_) {
00743 log_err("passive side exited loop but didn't set should_stop!");
00744 return;
00745 }
00746
00747
00748
00749 ASSERT(sock_->get_notifier());
00750 ASSERT(params_.retry_interval_ != 0);
00751 log_info("waiting for %d seconds before retrying connection",
00752 params_.retry_interval_);
00753 if (sock_->get_notifier()->wait(NULL, params_.retry_interval_ * 1000)) {
00754 log_info("socket interrupted from retry sleep -- exiting thread");
00755 return;
00756 }
00757
00758
00759 params_.retry_interval_ = params_.retry_interval_ * 2;
00760 if (params_.retry_interval_ > params_.max_retry_interval_) {
00761 params_.retry_interval_ = params_.max_retry_interval_;
00762 }
00763 }
00764
00765 log_debug("connection thread main loop complete -- thread exiting");
00766 }
00767
00768
00772 bool
00773 BluetoothConvergenceLayer::Connection::send_announce()
00774 {
00775 char buff[18];
00776 (void)buff;
00777
00778 bdaddr_t remote;
00779 sock_->remote_addr(remote);
00780
00781 ASSERT( bacmp(&remote,BDADDR_ANY) != 0 );
00782
00783
00784 if (announce_ == NULL) {
00785 announce_ = new Bundle();
00786
00787
00788
00789 AnnounceBundle::create_announce_bundle(announce_.object(),
00790 BundleDaemon::instance()->local_eid());
00791 }
00792
00793 log_debug("attempting to contact %s with ANNOUNCE",
00794 oasys::Bluetooth::batostr(&remote,buff));
00795
00796 oasys::StaticStringBuffer<1024> buf;
00797 buf.appendf("BTCL AnnounceBundle:\n");
00798 announce_->format_verbose(&buf);
00799 log_multiline(oasys::LOG_DEBUG, buf.c_str());
00800
00801
00802 rcvbuf_.reserve(params_.readbuf_len_);
00803 sndbuf_.reserve(params_.writebuf_len_);
00804
00805
00806
00807 if (sock_->state() != oasys::BluetoothSocket::ESTABLISHED) {
00808 if (! connect()) {
00809 log_debug("connect failed");
00810 return false;
00811 }
00812 }
00813
00814 return send_bundle(announce_.object());
00815 }
00816
00817
00821 void
00822 BluetoothConvergenceLayer::Connection::recv_announce()
00823 {
00824 int timeout;
00825 int ret;
00826 char typecode;
00827
00828
00829 if (rcvbuf_.fullbytes() == 0) {
00830 ASSERT(rcvbuf_.end() == rcvbuf_.data());
00831
00832 timeout = 2 * params_.keepalive_interval_ * 1000;
00833 log_debug("recv_announce: blocking on frame... (timeout %d)", timeout);
00834
00835 ret = sock_->timeout_read(rcvbuf_.end(),
00836 rcvbuf_.tailbytes(),
00837 timeout);
00838 if (ret == oasys::IOEOF || ret == oasys::IOERROR) {
00839 log_info("recv_announce: remote connection unexpectedly closed");
00840 shutdown:
00841 break_contact(ContactEvent::BROKEN);
00842 return;
00843
00844 } else if (ret == oasys::IOTIMEOUT) {
00845 log_info("recv_announce: no message heard for > %d msecs -- "
00846 "breaking contact", timeout);
00847 goto shutdown;
00848 }
00849
00850 rcvbuf_.fill(ret);
00851 note_data_rcvd();
00852 }
00853
00854 typecode = *rcvbuf_.start();
00855 rcvbuf_.consume(1);
00856
00857 if (typecode != BUNDLE_DATA)
00858 goto shutdown;
00859
00860 if (! recv_bundle() )
00861 goto shutdown;
00862 }
00863
00864
00870 bool
00871 BluetoothConvergenceLayer::Connection::send_bundle(Bundle* bundle)
00872 {
00873 int header_len;
00874 size_t sdnv_len;
00875 size_t bthdr_len;
00876 u_char bthdr_buf[32];
00877 size_t block_len;
00878 size_t payload_len = bundle->payload_.length();
00879 size_t payload_offset = 0;
00880 const u_char* payload_data;
00881
00882
00883 if (AnnounceBundle::parse_announce_bundle(bundle) == false)
00884 inflight_.push_back(InFlightBundle(bundle));
00885
00886
00887
00888
00889
00890
00891
00892
00893 retry_headers:
00894 header_len =
00895 BundleProtocol::format_header_blocks(bundle, sndbuf_.buf(),
00896 sndbuf_.buf_len());
00897 if (header_len < 0) {
00898 log_debug("send_bundle: bundle header too big for buffer len %zu -- "
00899 "doubling size and retrying", sndbuf_.buf_len());
00900 goto retry_headers;
00901 }
00902
00903 sdnv_len = SDNV::encoding_len(header_len + payload_len);
00904 bthdr_len = 1 + sizeof(BundleDataHeader) + sdnv_len;
00905
00906 ASSERT(sizeof(bthdr_buf) >= bthdr_len);
00907
00908
00909
00910 bthdr_buf[0] = BUNDLE_DATA;
00911 BundleDataHeader* dh = (BundleDataHeader*)(&bthdr_buf[1]);
00912 memcpy(&dh->bundle_id, &bundle->bundleid_, sizeof(bundle->bundleid_));
00913 SDNV::encode(header_len + payload_len, &dh->total_length[0], sdnv_len);
00914
00915
00916 struct iovec iov[2];
00917 iov[0].iov_base = (char*)bthdr_buf;
00918 iov[0].iov_len = bthdr_len;
00919
00920 iov[1].iov_base = (char*)sndbuf_.buf();
00921 iov[1].iov_len = header_len;
00922
00923
00924 log_debug("send_bundle: sending bundle id %d "
00925 "btcl hdr len: %zu, bundle header len: %zu payload len: %zu",
00926 bundle->bundleid_, bthdr_len, header_len, payload_len);
00927
00928 int cc = sock_->writevall(iov, 2);
00929
00930 if (cc == oasys::IOINTR) {
00931 log_info("send_bundle: interrupted while sending bundle header");
00932 break_contact(ContactEvent::USER);
00933 return false;
00934 } else if (cc != (int)bthdr_len + header_len) {
00935 if (cc == 0) {
00936 log_err("send_bundle: remote side closed connection");
00937 } else {
00938 log_err("send_bundle: "
00939 "error sending bundle header (wrote %d/%zu): %s",
00940 cc, (bthdr_len + header_len), strerror(errno));
00941 }
00942
00943 break_contact(ContactEvent::BROKEN);
00944 return false;
00945 }
00946
00947
00948
00949 while (payload_len > 0) {
00950
00951 if (payload_len <= params_.writebuf_len_) {
00952 block_len = payload_len;
00953 } else {
00954 block_len = params_.writebuf_len_;
00955 }
00956
00957
00958 payload_data = bundle->payload_.read_data(
00959 payload_offset,
00960 block_len,
00961 sndbuf_.buf(block_len),
00962 BundlePayload::KEEP_FILE_OPEN);
00963
00964 log_debug("send_bundle: sending %zu byte block %p",
00965 block_len, payload_data);
00966
00967 cc = sock_->writeall((char*)payload_data, block_len);
00968
00969 if (cc == oasys::IOINTR) {
00970 log_info("send_bundle: interrupted while sending bundle block");
00971 break_contact(ContactEvent::USER);
00972 bundle->payload_.close_file();
00973 return false;
00974 } else if (cc != (int)block_len) {
00975 if (cc == 0) {
00976 log_err("send_bundle: remote side closed connection");
00977 } else {
00978 log_err("send_bundle: "
00979 "error sending bundle block (wrote %d/%zu): %s",
00980 cc, block_len, strerror(errno));
00981 }
00982 break_contact(ContactEvent::BROKEN);
00983 bundle->payload_.close_file();
00984 return false;
00985 }
00986
00987
00988 payload_offset += block_len;
00989 payload_len -= block_len;
00990
00991
00992
00993
00994 int revents;
00995 cc = sock_->poll_sockfd(POLLIN, &revents, 0);
00996
00997 if (cc == 1) {
00998 log_debug("send_bundle: data available on the socket");
00999 if (! handle_reply()) {
01000 return false;
01001 }
01002 } else if (cc == 0 || cc == oasys::IOTIMEOUT) {
01003
01004 } else if (cc == oasys::IOINTR) {
01005 log_info("send_bundle: interrupted while checking for acks");
01006 break_contact(ContactEvent::USER);
01007 bundle->payload_.close_file();
01008 return false;
01009 } else {
01010 log_err("send_bundle: unexpected error while checking for acks");
01011 break_contact(ContactEvent::BROKEN);
01012 bundle->payload_.close_file();
01013 return false;
01014 }
01015 }
01016
01017
01018
01019
01020 if (! params_.bundle_ack_enabled_ &&
01021 (AnnounceBundle::parse_announce_bundle(bundle) == false)) {
01022 inflight_.pop_front();
01023 BundleDaemon::post(
01024 new BundleTransmittedEvent(bundle, contact_, payload_len, 0));
01025 }
01026
01027 bundle->payload_.close_file();
01028 return true;
01029 }
01030
01031
01037 bool
01038 BluetoothConvergenceLayer::Connection::recv_bundle()
01039 {
01040 int cc;
01041 Bundle* bundle = new Bundle();
01042 BundleDataHeader datahdr;
01043 int header_len;
01044 u_int64_t total_len;
01045 int sdnv_len = 0;
01046 size_t rcvd_len = 0;
01047 size_t acked_len = 0;
01048 size_t payload_len = 0;
01049
01050 bool valid = false;
01051 bool recvok = false;
01052
01053 log_debug("recv_bundle: consuming bundle headers...");
01054
01055
01056
01057
01058 if (rcvbuf_.fullbytes() < sizeof(BundleDataHeader)) {
01059 incomplete_bt_header:
01060 rcvbuf_.reserve(sizeof(BundleDataHeader) + 10);
01061 cc = sock_->timeout_read(rcvbuf_.end(), rcvbuf_.tailbytes(),
01062 params_.rtt_timeout_);
01063 if (cc < 0) {
01064 log_err("recv_bundle: error reading bundle headers: %s",
01065 strerror(errno));
01066 delete bundle;
01067 return false;
01068 }
01069
01070 log_debug("recv_bundle: read %d bytes...", cc);
01071 rcvbuf_.fill(cc);
01072 note_data_rcvd();
01073 }
01074
01075
01076 if (rcvbuf_.fullbytes() < sizeof(BundleDataHeader)) {
01077 log_err("recv_bundle: read too short to encode data header");
01078 goto incomplete_bt_header;
01079 }
01080
01081
01082 memcpy(&datahdr, rcvbuf_.start(), sizeof(BundleDataHeader));
01083
01084
01085 sdnv_len = SDNV::decode((u_char*)rcvbuf_.start() + sizeof(BundleDataHeader),
01086 rcvbuf_.fullbytes() - sizeof(BundleDataHeader),
01087 &total_len);
01088 if (sdnv_len < 0) {
01089 log_err("recv_bundle: read too short to encode SDNV");
01090 goto incomplete_bt_header;
01091 }
01092
01093
01094 rcvbuf_.consume(sizeof(BundleDataHeader) + sdnv_len);
01095
01096 incomplete_bundle_header:
01097
01098
01099
01100 header_len = BundleProtocol::parse_header_blocks(bundle,
01101 (u_char*)rcvbuf_.start(),
01102 rcvbuf_.fullbytes());
01103
01104 if (header_len < 0) {
01105 if (rcvbuf_.tailbytes() == 0) {
01106 rcvbuf_.reserve(rcvbuf_.size() * 2);
01107 }
01108 cc = sock_->timeout_read(rcvbuf_.end(), rcvbuf_.tailbytes(),
01109 params_.rtt_timeout_);
01110 if (cc < 0) {
01111 log_err("recv_bundle: error reading bundle headers: %s",
01112 strerror(errno));
01113 delete bundle;
01114 return false;
01115 }
01116 rcvbuf_.fill(cc);
01117 note_data_rcvd();
01118 goto incomplete_bundle_header;
01119 }
01120
01121 log_debug("recv_bundle: got valid bundle header -- "
01122 "sender bundle id %d, header_length %zu, total_length %llu",
01123 datahdr.bundle_id, header_len, total_len);
01124 rcvbuf_.consume(header_len);
01125
01126
01127
01128 payload_len = bundle->payload_.length();
01129 if (total_len != header_len + payload_len) {
01130 log_err("recv_bundle: bundle length mismatch -- "
01131 "total_len %llu, header_len %zu, payload_len %zu",
01132 total_len, header_len, payload_len);
01133 delete bundle;
01134 return false;
01135 }
01136
01137
01138
01139
01140
01141 do {
01142 if (rcvbuf_.fullbytes() == 0) {
01143
01144 ASSERT(rcvbuf_.data() == rcvbuf_.end());
01145
01146 cc = sock_->timeout_read(rcvbuf_.data(),
01147 rcvbuf_.tailbytes(),
01148 params_.rtt_timeout_);
01149 if (cc == oasys::IOTIMEOUT) {
01150 log_warn("recv_bundle: timeout reading bundle data block");
01151 goto done;
01152 } else if (cc == oasys::IOEOF) {
01153 log_info("recv_bundle: eof reading bundle data block");
01154 goto done;
01155 } else if (cc < 0) {
01156 log_warn("recv_bundle: error reading bundle data block: %s",
01157 strerror(errno));
01158 goto done;
01159 }
01160 rcvbuf_.fill(cc);
01161 note_data_rcvd();
01162 }
01163
01164 log_debug("recv_bundle: got %zu byte chunk, rcvd_len %zu",
01165 rcvbuf_.fullbytes(), rcvd_len);
01166
01167
01168
01169 cc = std::min(rcvbuf_.fullbytes(), payload_len - rcvd_len);
01170 if (cc != 0) {
01171 bundle->payload_.append_data((u_char*)rcvbuf_.start(), cc);
01172 rcvd_len += cc;
01173 rcvbuf_.consume(cc);
01174 }
01175
01176
01177
01178
01179 valid = true;
01180
01181
01182
01183 if (rcvd_len - acked_len > params_.partial_ack_len_ &&
01184 AnnounceBundle::parse_announce_bundle(bundle) == false) {
01185 log_debug("recv_bundle: "
01186 "got %zu bytes acked %zu, sending partial ack",
01187 rcvd_len, acked_len);
01188
01189 if (! send_ack(datahdr.bundle_id, rcvd_len)) {
01190 goto done;
01191 }
01192 acked_len = rcvd_len;
01193 }
01194
01195 } while (rcvd_len < payload_len);
01196
01197
01198
01199
01200 if (params_.bundle_ack_enabled_ &&
01201 (AnnounceBundle::parse_announce_bundle(bundle) == false) &&
01202 (payload_len == 0 || (acked_len != rcvd_len)))
01203 {
01204 if (! send_ack(datahdr.bundle_id, payload_len)) {
01205 goto done;
01206 }
01207 }
01208
01209 recvok = true;
01210
01211 done:
01212 bundle->payload_.close_file();
01213
01214 if ((!valid) || (!recvok)) {
01215
01216
01217 if (bundle)
01218 delete bundle;
01219
01220 return false;
01221 }
01222
01223 log_debug("recv_bundle: "
01224 "new bundle id %d arrival, payload length %zu (rcvd %zu)",
01225 bundle->bundleid_, payload_len, rcvd_len);
01226
01227
01228
01229 ASSERT(rcvd_len <= bundle->payload_.length());
01230 BundleDaemon::post(
01231 new BundleReceivedEvent(bundle, EVENTSRC_PEER, rcvd_len));
01232
01233
01234 EndpointID eid;
01235 if (AnnounceBundle::parse_announce_bundle(bundle,&eid)) {
01236
01237 ASSERT(params_.neighbor_poll_interval_ > 0);
01238
01239 char buff[18];
01240 bdaddr_t remote;
01241 sock_->remote_addr(remote);
01242 const char *nexthop =
01243 (const char *) oasys::Bluetooth::batostr(&remote,buff);
01244
01245
01246
01247
01248
01249
01250
01251 oasys::StaticStringBuffer<1024> buf;
01252 buf.appendf("received BTCL AnnounceBundle from %s at %s\n",
01253 eid.c_str(),
01254 nexthop);
01255 bundle->format_verbose(&buf);
01256 log_multiline(oasys::LOG_INFO, buf.c_str());
01257
01258
01259 bool receiver = false;
01260 if (announce_ == NULL) {
01261
01262 send_announce();
01263 receiver = true;
01264 }
01265
01266
01267
01268
01269
01270 ContactManager* cm = BundleDaemon::instance()->contactmgr();
01271
01272
01273
01274
01275 (void)cm->new_opportunistic_link(cl_,nexthop,eid);
01276
01277 if (receiver) {
01278
01279 set_should_stop();
01280 break_contact(ContactEvent::BROKEN);
01281 return false;
01282 }
01283 }
01284 return true;
01285 }
01286
01290 bool
01291 BluetoothConvergenceLayer::Connection::send_ack(u_int32_t bundle_id,
01292 size_t acked_len)
01293 {
01294 char typecode = BUNDLE_ACK;
01295
01296 BundleAckHeader ackhdr;
01297 ackhdr.bundle_id = bundle_id;
01298 ackhdr.acked_length = htonl(acked_len);
01299
01300 struct iovec iov[2];
01301 iov[0].iov_base = &typecode;
01302 iov[0].iov_len = 1;
01303 iov[1].iov_base = (char*)&ackhdr;
01304 iov[1].iov_len = sizeof(BundleAckHeader);
01305
01306 int total = 1 + sizeof(BundleAckHeader);
01307 int cc = sock_->writevall(iov, 2);
01308
01309 if (cc != total) {
01310 log_info("send_ack: problem sending ack (wrote %d/%d): %s",
01311 cc, total, strerror(errno));
01312 return false;
01313 }
01314
01315 return true;
01316 }
01317
01318
01322 bool
01323 BluetoothConvergenceLayer::Connection::send_keepalive()
01324 {
01325 char typecode = KEEPALIVE;
01326 int ret = sock_->write(&typecode, 1);
01327
01328 if (ret == oasys::IOINTR) {
01329 log_info("send_keepalive: connection interrupted");
01330 break_contact(ContactEvent::USER);
01331 return false;
01332 }
01333
01334 if (ret != 1) {
01335 log_info("remote connection unexpectedly closed");
01336 break_contact(ContactEvent::BROKEN);
01337 return false;
01338 }
01339
01340 return true;
01341 }
01342
01343
01347 void
01348 BluetoothConvergenceLayer::Connection::note_data_rcvd()
01349 {
01350 log_debug("noting receipt of data");
01351 ::gettimeofday(&data_rcvd_, 0);
01352 }
01353
01354
01358 int
01359 BluetoothConvergenceLayer::Connection::handle_ack()
01360 {
01361 if (inflight_.empty()) {
01362 log_err("handle_ack: received ack but no bundles in flight!");
01363 protocol_error:
01364 break_contact(ContactEvent::BROKEN);
01365 return EINVAL;
01366 }
01367 InFlightBundle* ifbundle = &inflight_.front();
01368
01369
01370 if (rcvbuf_.fullbytes() < (1 + sizeof(BundleAckHeader))) {
01371 log_debug("handle_ack: not enough space in buffer (got %zu, need %zu...)",
01372 rcvbuf_.fullbytes(), sizeof(BundleAckHeader));
01373 return ENOMEM;
01374 }
01375
01376
01377 BundleAckHeader ackhdr;
01378 memcpy(&ackhdr, rcvbuf_.start() + 1, sizeof(BundleAckHeader));
01379 rcvbuf_.consume(1 + sizeof(BundleAckHeader));
01380
01381 Bundle* bundle = ifbundle->bundle_.object();
01382 size_t new_acked_len = ntohl(ackhdr.acked_length);
01383 size_t payload_len = bundle->payload_.length();
01384
01385 log_debug("handle_ack: got ack length %zu for bundle id %d length %zu",
01386 new_acked_len, ackhdr.bundle_id, payload_len);
01387
01388 if (ackhdr.bundle_id != bundle->bundleid_) {
01389 log_err("handle_ack: error: bundle id mismatch %d != %d",
01390 ackhdr.bundle_id, bundle->bundleid_);
01391 goto protocol_error;
01392 }
01393
01394 if (new_acked_len < ifbundle->acked_len_ || new_acked_len > payload_len)
01395 {
01396 log_err("handle_ack: invalid acked length %zu (acked %zu, bundle %zu)",
01397 new_acked_len, ifbundle->acked_len_,
01398 payload_len);
01399 goto protocol_error;
01400 }
01401
01402
01403 if (new_acked_len == payload_len) {
01404 inflight_.pop_front();
01405
01406 BundleDaemon::post(
01407 new BundleTransmittedEvent(bundle, contact_, payload_len, new_acked_len));
01408
01409 if (contact_->link()->state() == Link::BUSY) {
01410 BundleDaemon::post_and_wait(
01411 new LinkStateChangeRequest(contact_->link(), Link::AVAILABLE,
01412 ContactEvent::NO_INFO),
01413 event_notifier_);
01414 }
01415
01416 } else {
01417 ifbundle->acked_len_ = new_acked_len;
01418 }
01419
01420 return 0;
01421 }
01422
01426 bool
01427 BluetoothConvergenceLayer::Connection::send_contact_header()
01428 {
01429 BTCLHeader contacthdr;
01430 contacthdr.magic = htonl(MAGIC);
01431 contacthdr.version = BTCL_VERSION;
01432
01433 contacthdr.flags = 0;
01434 if (params_.bundle_ack_enabled_)
01435 contacthdr.flags |= BUNDLE_ACK_ENABLED;
01436
01437 contacthdr.partial_ack_len = htons(params_.partial_ack_len_);
01438 contacthdr.keepalive_interval = htons(params_.keepalive_interval_);
01439 contacthdr.xx__unused = 0;
01440
01441 int cc = sock_->writeall((char*)&contacthdr, sizeof(BTCLHeader));
01442 if (cc != sizeof(BTCLHeader)) {
01443 log_err("error writing contact header (wrote %d/%zu): %s",
01444 cc, sizeof(BTCLHeader), strerror(errno));
01445 return false;
01446 }
01447
01448 return true;
01449 }
01450
01451
01457 bool
01458 BluetoothConvergenceLayer::Connection::recv_contact_header(int timeout)
01459 {
01460 BTCLHeader contacthdr;
01461
01462 ASSERT(timeout != 0);
01463 int cc = sock_->timeout_readall((char*)&contacthdr,
01464 sizeof(BTCLHeader),
01465 timeout);
01466
01467 if (cc == oasys::IOTIMEOUT) {
01468 log_warn("timeout reading contact header");
01469 return false;
01470 }
01471
01472 if (cc != sizeof(BTCLHeader)) {
01473 log_err("error reading contact header (read %d/%zu): %d-%s",
01474 cc, sizeof(BTCLHeader),
01475 errno, strerror(errno));
01476 return false;
01477 }
01478
01479
01480
01481
01482 if (ntohl(contacthdr.magic) != MAGIC) {
01483 log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
01484 "-- disconnecting.", contacthdr.magic,
01485 MAGIC);
01486 return false;
01487 }
01488
01489 if (contacthdr.version != BTCL_VERSION) {
01490 log_warn("remote sent version %d, expected version %d "
01491 "-- disconnecting.", contacthdr.version,
01492 BTCL_VERSION);
01493 return false;
01494 }
01495
01496
01497
01498
01499 params_.partial_ack_len_ = MIN(params_.partial_ack_len_,
01500 ntohs(contacthdr.partial_ack_len));
01501 params_.keepalive_interval_ = MIN(params_.keepalive_interval_,
01502 ntohs(contacthdr.keepalive_interval));
01503 params_.bundle_ack_enabled_ = params_.bundle_ack_enabled_ &&
01504 (contacthdr.flags & BUNDLE_ACK_ENABLED);
01505
01506 note_data_rcvd();
01507 return true;
01508 }
01509
01515 bool
01516 BluetoothConvergenceLayer::Connection::connect()
01517 {
01518 ASSERT(sock_->state() != oasys::BluetoothSocket::ESTABLISHED);
01519 char buff[18];
01520 (void)buff;
01521
01522 bdaddr_t ba;
01523 sock_->remote_addr(ba);
01524 ASSERT(bacmp(&ba,BDADDR_ANY) != 0);
01525 log_debug("connect: connecting to %s ... ",
01526 oasys::Bluetooth::batostr(&ba,buff));
01527
01528
01529
01530
01531
01532 ASSERT(listener_ != NULL);
01533 listener_->set_should_stop();
01534 listener_->interrupt_from_io();
01535 while(!listener_->is_stopped()) {
01536 oasys::Thread::yield();
01537 }
01538 listener_->close();
01539
01540
01541 int ret = sock_->rc_connect();
01542
01543
01544
01545 if (listener_->rc_bind() == 0 &&
01546 listener_->listen() == 0) {
01547 listener_->start();
01548 }
01549
01550 if (ret != 0) return false;
01551 log_debug("connect: connection established (channel %d), "
01552 "sending contact header ... ", sock_->channel());
01553 if (!send_contact_header()) return false;
01554 log_debug("connect: waiting for contact header reply ... ");
01555 if (!recv_contact_header(params_.rtt_timeout_)) return false;
01556 return true;
01557 }
01558
01559
01563 bool
01564 BluetoothConvergenceLayer::Connection::accept()
01565 {
01566
01567 ASSERT(contact_ == NULL);
01568 log_debug("accept: sending contact header ...");
01569 if (!send_contact_header()) return false;
01570 log_debug("accept: waiting for peer contact header ...");
01571 if (!recv_contact_header(params_.rtt_timeout_)) return false;
01572 return true;
01573 }
01574
01575
01587 void
01588 BluetoothConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
01589 {
01590 ASSERT(sock_);
01591 char typecode = SHUTDOWN;
01592 if (sock_->state() != oasys::BluetoothSocket::CLOSED) {
01593 sock_->set_nonblocking(true);
01594 sock_->write(&typecode,1);
01595 sock_->close();
01596 }
01597
01598 if (contact_ != NULL) {
01599 while (!inflight_.empty()) {
01600 InFlightBundle* inflight = &inflight_.front();
01601 if (inflight->acked_len_ != 0) {
01602 BundleDaemon::post(
01603 new BundleTransmittedEvent(inflight->bundle_.object(),
01604 contact_,
01605 inflight->acked_len_,
01606 inflight->acked_len_));
01607 } else {
01608 BundleDaemon::post(
01609 new BundleTransmittedEvent(inflight->bundle_.object(),
01610 contact_,
01611 inflight->bundle_->payload_.length(),
01612 0));
01613 }
01614 inflight_.pop_front();
01615 }
01616 if (reason != ContactEvent::USER) {
01617
01618
01619
01620
01621
01622
01623
01624
01625 if (contact_->link()->isopen())
01626 {
01627 bool ok = BundleDaemon::post_and_wait(
01628 new ContactDownEvent(contact_, reason),
01629 event_notifier_, 5000);
01630
01631
01632
01633
01634
01635
01636
01637
01638 int total = 0;
01639 while (!ok) {
01640 total += 5;
01641 if (should_stop()) {
01642 log_notice("bundle daemon took > %d seconds to process event: "
01643 "breaking close_contact deadlock", total);
01644 break;
01645 }
01646 if (total >= 60)
01647 PANIC("bundle daemon took > 60 seconds to process event: "
01648 "fatal deadlock condition");
01649 log_warn("bundle daemon took > %d seconds to process event", total);
01650 ok = event_notifier_->wait(0,5000);
01651 }
01652 }
01653 }
01654 if (queue_->size() > 0 )
01655 log_warn("%d bundles still in queue", queue_->size());
01656
01657
01658 if (contact_->cl_info() != NULL ) {
01659 ASSERT(contact_->cl_info() == this);
01660 contact_->set_cl_info(NULL);
01661 }
01662 } else {
01663 ASSERT(inflight_.empty());
01664 }
01665
01666 if (!initiate_ || (reason == ContactEvent::IDLE)) set_should_stop();
01667
01668 }
01669
01670
01677 bool
01678 BluetoothConvergenceLayer::Connection::handle_reply()
01679 {
01680
01681
01682
01683
01684
01685 rcvbuf_.reserve(1);
01686
01687 int ret = sock_->read(rcvbuf_.end(), rcvbuf_.tailbytes());
01688
01689 if (ret == oasys::IOINTR) {
01690 log_info("connection interrupted");
01691 break_contact(ContactEvent::USER);
01692 return false;
01693 }
01694
01695 if (ret < 1) {
01696 log_info("remote connection unexpectedly closed");
01697 break_contact(ContactEvent::BROKEN);
01698 return false;
01699 }
01700
01701 rcvbuf_.fill(ret);
01702 note_data_rcvd();
01703
01704 do {
01705 char typecode = *rcvbuf_.start();
01706 if (typecode == BUNDLE_ACK) {
01707 int ret = handle_ack();
01708
01709 if (ret == 0) {
01710
01711 } else if (ret == ENOMEM) {
01712 break;
01713 } else if (ret == EINVAL) {
01714 return false;
01715 } else {
01716 PANIC("unexpected return %d from handle_ack", ret);
01717 }
01718 } else if (typecode == BUNDLE_DATA) {
01719 rcvbuf_.consume(1);
01720 if(!recv_bundle()) {
01721 break_contact(ContactEvent::BROKEN);
01722 return false;
01723 }
01724 return true;
01725 } else if (typecode == KEEPALIVE) {
01726 rcvbuf_.consume(1);
01727 if (!initiate_) send_keepalive();
01728 } else if (typecode == SHUTDOWN) {
01729 rcvbuf_.consume(1);
01730 log_info("got shutdown request from other side");
01731 break_contact(ContactEvent::SHUTDOWN);
01732 return false;
01733 } else {
01734 log_err("got unexpected frame code %d", typecode);
01735 break_contact(ContactEvent::BROKEN);
01736 return false;
01737 }
01738 } while (rcvbuf_.fullbytes() > 0);
01739 return true;
01740 }
01741
01742
01746 void
01747 BluetoothConvergenceLayer::Connection::send_loop()
01748 {
01749
01750 ASSERT(sock_->state() == oasys::BluetoothSocket::ESTABLISHED);
01751
01752
01753 ASSERT(contact_ != NULL);
01754 ASSERT(contact_->cl_info() == NULL);
01755 contact_->set_cl_info(this);
01756
01757
01758 BundleDaemon::post_and_wait(new ContactUpEvent(contact_), event_notifier_);
01759
01760
01761 rcvbuf_.reserve(params_.readbuf_len_);
01762 sndbuf_.reserve(params_.writebuf_len_);
01763
01764 log_info("connection established -- (keepalive time %d seconds)",
01765 params_.keepalive_interval_);
01766
01767
01768
01769 struct pollfd pollfds[2];
01770
01771 struct pollfd* bundle_poll = &pollfds[0];
01772 bundle_poll->fd = queue_->notifier()->read_fd();
01773 bundle_poll->events = POLLIN;
01774
01775 struct pollfd* sock_poll = &pollfds[1];
01776 sock_poll->fd = sock_->fd();
01777 sock_poll->events = POLLIN;
01778
01779
01780 bool idle = false;
01781
01782
01783 while (true) {
01784
01785 struct timeval now;
01786 struct timeval idle_start;
01787
01788 BundleRef bundle("BTCL::send_loop temporary");
01789
01790
01791 if (should_stop()) {
01792 break_contact(ContactEvent::USER);
01793 return;
01794 }
01795
01796
01797
01798 bundle = queue_->pop_front();
01799
01800 if (bundle != NULL) {
01801
01802 idle = false;
01803
01804
01805 bool sentok = send_bundle(bundle.object());
01806 bundle = NULL;
01807
01808
01809
01810 if (!sentok) {
01811 return;
01812 }
01813
01814
01815
01816
01817 continue;
01818 }
01819
01820
01821
01822 if (!idle && inflight_.empty()) {
01823 idle = true;
01824 ::gettimeofday(&idle_start, 0);
01825 }
01826
01827
01828
01829
01830
01831
01832
01833
01834 pollfds[0].revents = 0;
01835 pollfds[1].revents = 0;
01836
01837 int timeout = params_.keepalive_interval_ * 1000;
01838 if (timeout == 0) {
01839 timeout = -1;
01840 }
01841
01842 log_debug("send_loop: calling poll (timeout %d)", timeout);
01843 int cc = oasys::IO::poll_multiple(pollfds, 2, timeout,
01844 sock_->get_notifier(), logpath_);
01845
01846 if (cc == oasys::IOINTR) {
01847 log_info("send_loop: interrupted from poll, breaking connection");
01848 break_contact(ContactEvent::USER);
01849 return;
01850 }
01851
01852
01853 if (sock_poll->revents != 0) {
01854 if ((sock_poll->revents & POLLHUP) ||
01855 (sock_poll->revents & POLLERR))
01856 {
01857 log_info("send_loop: remote connection error");
01858 break_contact(ContactEvent::BROKEN);
01859 return;
01860 }
01861
01862 if (sock_poll->revents & POLLNVAL)
01863 {
01864 PANIC("send_loop: sock_poll->revents returned with "
01865 "POLLNVAL (0x%x)", sock_poll->revents);
01866 return;
01867 }
01868
01869 if (! (sock_poll->revents & POLLIN)) {
01870 PANIC("unknown revents value 0x%x", sock_poll->revents);
01871 }
01872
01873 log_debug("send_loop: data available on the socket");
01874 if (!handle_reply()) {
01875 return;
01876 }
01877 }
01878
01879
01880 if (cc == oasys::IOTIMEOUT) {
01881 log_debug("timeout from poll, sending keepalive");
01882 ASSERT(params_.keepalive_interval_ != 0);
01883 if (send_keepalive() == false) return;
01884 continue;
01885 }
01886
01887
01888
01889 ::gettimeofday(&now,0);
01890 u_int elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
01891 if (elapsed > (2 * params_.keepalive_interval_ * 1000)) {
01892 log_info("send_loop: no data rcvd for %d msecs "
01893 "(sent %zu.%zu, rcvd %zu.%zu, now %zu.%zu) -- closing contact",
01894 elapsed,
01895 (u_int)keepalive_sent_.tv_sec,
01896 (u_int)keepalive_sent_.tv_usec,
01897 (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
01898 (u_int)now.tv_sec, (u_int)now.tv_usec);
01899 break_contact(ContactEvent::BROKEN);
01900 return;
01901 }
01902
01903
01904 if (idle && (contact_->link()->type() == Link::ONDEMAND)) {
01905 u_int idle_close_time = contact_->link()->params().idle_close_time_;
01906
01907 elapsed = TIMEVAL_DIFF_MSEC(now, idle_start);
01908 if (idle_close_time != 0 && (elapsed > idle_close_time * 1000))
01909 {
01910 log_info("connection idle for %d msecs, closing.",elapsed);
01911 set_should_stop();
01912 break_contact(ContactEvent::IDLE);
01913 return;
01914 } else {
01915 log_debug("connection not idle: %d <= %d",
01916 elapsed, idle_close_time * 1000);
01917 }
01918 }
01919 }
01920 }
01921
01922 void
01923 BluetoothConvergenceLayer::Connection::recv_loop()
01924 {
01925 int timeout;
01926 int ret;
01927 char typecode;
01928
01929
01930 rcvbuf_.reserve(params_.readbuf_len_);
01931
01932 while (1) {
01933
01934
01935 if (rcvbuf_.fullbytes() == 0) {
01936 ASSERT(rcvbuf_.end() == rcvbuf_.data());
01937
01938 timeout = 2 * params_.keepalive_interval_ * 1000;
01939 log_debug("recv_loop: blocking on frame... (timeout %d)", timeout);
01940
01941 ret = sock_->timeout_read(rcvbuf_.end(),
01942 rcvbuf_.tailbytes(),
01943 timeout);
01944
01945 if (ret == oasys::IOEOF || ret == oasys::IOERROR) {
01946 log_info("recv_loop: remote connection unexpectedly closed");
01947 shutdown:
01948 break_contact(ContactEvent::BROKEN);
01949 return;
01950
01951 } else if (ret == oasys::IOTIMEOUT) {
01952 log_info("recv_loop: no message heard for > %d msecs -- "
01953 "breaking contact", timeout);
01954 goto shutdown;
01955 }
01956
01957 rcvbuf_.fill(ret);
01958 note_data_rcvd();
01959 }
01960
01961 typecode = *rcvbuf_.start();
01962 rcvbuf_.consume(1);
01963
01964 log_debug("recv_loop: got frame packet type 0x%x...", typecode);
01965
01966 if (typecode == SHUTDOWN) {
01967 break_contact(ContactEvent::SHUTDOWN);
01968 return;
01969 }
01970
01971 if (typecode == KEEPALIVE) {
01972 log_debug("recv_loop: " "got keepalive, sending response");
01973
01974 if (!send_keepalive()) {
01975 return;
01976 }
01977 continue;
01978 }
01979
01980 if (typecode != BUNDLE_DATA) {
01981 log_err("recv_loop: "
01982 "unexpected typecode 0x%x waiting for BUNDLE_DATA",
01983 typecode);
01984 goto shutdown;
01985 }
01986
01987
01988 if (! recv_bundle()) {
01989 goto shutdown;
01990 }
01991 }
01992 }
01993
01994 void
01995 BluetoothConvergenceLayer::NeighborDiscovery::send_announce(bdaddr_t remote)
01996 {
01997 char buff[18];
01998 const char *nexthop = oasys::Bluetooth::batostr(&remote,buff);
01999
02000 ContactManager* cm = BundleDaemon::instance()->contactmgr();
02001 Link* link = cm->find_link_to(cl_, nexthop);
02002 if (link == NULL) {
02003
02004
02005 Connection *sender = cl_->connections_.connection(
02006 cl_,remote,¶ms_);
02007 if (sender != NULL) {
02008 log_info("sending announce bundle to %s", nexthop);
02009 if (sender->send_announce()) {
02010
02011
02012 sender->recv_announce();
02013 }
02014
02015 delete sender;
02016 sender = NULL;
02017 }
02018 }
02019 }
02020
02021 void
02022 BluetoothConvergenceLayer::NeighborDiscovery::run()
02023 {
02024 if (poll_interval_ == 0) {
02025 return;
02026 }
02027
02028
02029 oasys::BluetoothServiceRegistration sdp_reg(¶ms_.local_addr_);
02030 if (sdp_reg.success() == false) {
02031 log_err("SDP registration failed");
02032 return;
02033 }
02034
02035
02036 while (true) {
02037
02038
02039
02040
02041
02042
02043
02044 int sleep_time = oasys::Random::rand(poll_interval_);
02045
02046 log_debug("sleep_time %d",sleep_time);
02047 sleep(sleep_time);
02048
02049
02050 int nr = inquire();
02051
02052 if (should_stop()) break;
02053
02054 if (nr < 0) {
02055 log_debug("no Bluetooth devices in range");
02056 continue;
02057 }
02058
02059
02060 oasys::BluetoothInquiryInfo bii;
02061 while (next(bii) != -1) {
02062
02063
02064 oasys::BluetoothServiceDiscoveryClient sdpclient;
02065 sdpclient.set_local_addr(params_.local_addr_);
02066 if (sdpclient.is_dtn_router(bii.addr_)) {
02067 send_announce(bii.addr_);
02068 }
02069 if (should_stop()) break;
02070 }
02071 if (should_stop()) break;
02072
02073
02074 reset();
02075 }
02076
02077
02078 log_info("Bluetooth inquiry interrupted by user");
02079 }
02080
02081 }
02082
02083 #endif