BluetoothConvergenceLayer.cc

Go to the documentation of this file.
00001 #include <config.h>
00002 #ifdef OASYS_BLUETOOTH_ENABLED
00003 
00004 #ifndef MIN
00005 # define MIN(x, y) ((x)<(y) ? (x) : (y))
00006 #endif
00007 
00008 #include <sys/poll.h>
00009 #include <stdlib.h>
00010 
00011 #include <netinet/in.h>
00012 #include <bluetooth/bluetooth.h>
00013 #include <errno.h>
00014 extern int errno;
00015 
00016 #include <oasys/thread/Notifier.h>
00017 #include <oasys/thread/Timer.h>
00018 #include <oasys/util/StringBuffer.h>
00019 #include <oasys/bluez/Bluetooth.h>
00020 #include <oasys/bluez/BluetoothSocket.h>
00021 #include <oasys/bluez/RFCOMMClient.h>
00022 #include <oasys/util/OptParser.h>
00023 #include <oasys/util/ScratchBuffer.h>
00024 #include <oasys/util/Random.h>
00025 
00026 #include "BluetoothConvergenceLayer.h"
00027 #include "bundling/Bundle.h"
00028 #include "bundling/AnnounceBundle.h"
00029 #include "bundling/BundleEvent.h"
00030 #include "bundling/BundleDaemon.h"
00031 #include "bundling/BundleList.h"
00032 #include "bundling/BundleProtocol.h"
00033 #include "bundling/SDNV.h"
00034 #include "contacts/ContactManager.h"
00035 #include "routing/BundleRouter.h"
00036 
00037 namespace dtn {
00038 
00039 struct BluetoothConvergenceLayer::Params BluetoothConvergenceLayer::defaults_;
00040 struct BluetoothConvergenceLayer::ConnectionManager BluetoothConvergenceLayer::connections_;
00041 
00042 
00043 /******************************************************************************
00044  *
00045  * BluetoothConvergenceLayer
00046  *
00047  *****************************************************************************/
00048 
00049 BluetoothConvergenceLayer::BluetoothConvergenceLayer() :
00050     ConvergenceLayer("BluetoothConvergenceLayer", "bt")
00051 {
00052     // set defaults here, then let ../cmd/ParamCommand.cc (as well as the link
00053     // specific options) handle changing them
00054     bacpy(&defaults_.local_addr_,BDADDR_ANY);
00055     bacpy(&defaults_.remote_addr_,BDADDR_ANY);
00056     defaults_.hcidev_             = "hci0";
00057     defaults_.bundle_ack_enabled_ = true;
00058     defaults_.partial_ack_len_    = 1024;
00059     defaults_.writebuf_len_       = 32768;
00060     defaults_.readbuf_len_        = 32768;
00061     defaults_.keepalive_interval_ = 30;
00062     defaults_.retry_interval_     = 5;
00063     defaults_.min_retry_interval_ = 5;
00064     defaults_.max_retry_interval_ = 10 * 60;
00065     defaults_.rtt_timeout_        = 30000;  // msecs
00066     defaults_.neighbor_poll_interval_ = 0; // no polling
00067 }
00068 
00072 bool
00073 BluetoothConvergenceLayer::parse_params(Params* params, int argc, const char** argv,
00074                                  const char** invalidp)
00075 {
00076     oasys::OptParser p;
00077 
00078     p.addopt(new oasys::BdAddrOpt("local_addr", &params->local_addr_));
00079     p.addopt(new oasys::BdAddrOpt("remote_addr", &params->remote_addr_));
00080     p.addopt(new oasys::StringOpt("hcidev", &params->hcidev_));
00081     p.addopt(new oasys::BoolOpt("bundle_ack_enabled",
00082                                 &params->bundle_ack_enabled_));
00083     p.addopt(new oasys::UIntOpt("partial_ack_len", &params->partial_ack_len_));
00084     p.addopt(new oasys::UIntOpt("writebuf_len", &params->writebuf_len_));
00085     p.addopt(new oasys::UIntOpt("readbuf_len", &params->readbuf_len_));
00086     p.addopt(new oasys::UIntOpt("keepalive_interval",
00087                                 &params->keepalive_interval_));
00088     p.addopt(new oasys::UIntOpt("min_retry_interval",
00089                                 &params->min_retry_interval_));
00090     p.addopt(new oasys::UIntOpt("max_retry_interval",
00091                                 &params->max_retry_interval_));
00092     p.addopt(new oasys::UIntOpt("rtt_timeout", &params->rtt_timeout_));
00093     p.addopt(new oasys::UIntOpt("neighbor_poll_interval",
00094                                 &params->neighbor_poll_interval_));
00095 
00096     if (! p.parse(argc, argv, invalidp)) {
00097         return false;
00098     }
00099 
00100     return true;
00101 }
00102 
00106 bool
00107 BluetoothConvergenceLayer::interface_up(Interface* iface,
00108                                  int argc, const char* argv[])
00109 {
00110     log_debug("adding interface %s",iface->name().c_str());
00111 
00112     Params *params = new Params(defaults_);
00113     const char* invalid;
00114     if (!parse_params(params, argc, argv, &invalid)) {
00115       log_err("error parsing interface options: invalid option '%s'",
00116               invalid);
00117       return false;
00118     }
00119 
00120     // check for valid Bluetooth address or device name
00121     if (bacmp(&params->local_addr_,BDADDR_ANY) == 0 ) {
00122         // try to read bdaddr from HCI device name
00123         oasys::Bluetooth::hci_get_bdaddr(params->hcidev_.c_str(),
00124                                          &params->local_addr_);
00125         if (bacmp(&params->local_addr_,BDADDR_ANY) == 0 ) {
00126             // cannot proceed without valid local Bluetooth device
00127             log_err("invalid local address setting of BDADDR_ANY");
00128             return false;
00129         }
00130     }
00131     
00132     // create a new server socket for the requested interface using 
00133     // ConnectionManager's factory method (Bluetooth can only allow one 
00134     // process at a time to bind to bdaddr_t so we track anything that 
00135     // wants to bind using ConnectionManager)
00136     Listener* receiver = connections_.listener(this,params);
00137     receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00138 
00139     // Scan each of RFCOMM's 30 channels, bind to first available
00140     if (receiver->rc_bind() != 0)
00141         return false; // error log already emitted
00142 
00143     if (receiver->listen() != 0)
00144         return false; // error log already emitted
00145 
00146     receiver->start();
00147 
00148     // scan for neighbors
00149     if (params->neighbor_poll_interval_ > 0) {
00150         log_debug("Starting up neighbor polling with interval=%d",
00151                   params->neighbor_poll_interval_);
00152         receiver->nd_ = new NeighborDiscovery(this,params);
00153         receiver->nd_->logpathf("%s/neighbordiscovery",logpath_);
00154         receiver->nd_->start();
00155     }
00156 
00157     // store the new listener object in the cl specific portion of the
00158     // interface
00159     iface->set_cl_info(receiver);
00160 
00161     return true;
00162 }
00163 
00167 bool
00168 BluetoothConvergenceLayer::interface_down(Interface* iface)
00169 {
00170     // grab the listener object, set a flag for the thread to stop and
00171     // then close the socket out from under it, which should cause the
00172     // thread to break out of the blocking call to accept() and
00173     // terminate itself
00174     Listener* receiver = (Listener*)iface->cl_info();
00175     receiver->set_should_stop();
00176     receiver->interrupt_from_io();
00177 
00178     while (! receiver->is_stopped()) {
00179         oasys::Thread::yield();
00180     }
00181     receiver->close();
00182 
00183     if (receiver->nd_ != NULL) {
00184         receiver->nd_->set_should_stop();
00185         receiver->nd_->interrupt();
00186         while (! receiver->nd_->is_stopped()) {
00187             oasys::Thread::yield();
00188         }
00189         delete receiver->nd_;
00190         receiver->nd_ = NULL;
00191     }
00192 
00193     bool res = connections_.del_listener(receiver);
00194     delete receiver;
00195     return res;
00196 }
00197 
00201 void
00202 BluetoothConvergenceLayer::dump_interface(Interface* iface,
00203                                    oasys::StringBuffer* buf)
00204 {
00205     Params* params = &((Listener*)iface->cl_info())->params_;
00206 
00207     char buff[18];
00208     buf->appendf("\tlocal_addr: %s device: %s\n",
00209                  oasys::Bluetooth::batostr(&params->local_addr_,buff),
00210                  params->hcidev_.c_str());
00211 
00212     if (bacmp(&params->remote_addr_,BDADDR_ANY) != 0)
00213         buf->appendf("\tremote_addr: %s\n",
00214                      oasys::Bluetooth::batostr(&params->remote_addr_,buff));
00215 
00216     if (params->neighbor_poll_interval_ > 0) 
00217         buf->appendf("\tneighbor_poll_interval: %d\n",
00218                      params->neighbor_poll_interval_);
00219 }
00220 
00227 bool
00228 BluetoothConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00229 {
00230     bdaddr_t addr;
00231 
00232     log_debug("adding %s link %s", link->type_str(), link->nexthop());
00233 
00234     // validate the link next hop address
00235     if (! parse_nexthop(link->nexthop(), &addr)) {
00236         log_err("invalid next hop address '%s'", link->nexthop());
00237         return false;
00238     }
00239 
00240     // make sure it's really a valid address
00241     if (bacmp(&addr,BDADDR_ANY) == 0 ) {
00242         log_err("invalid host in next hop address '%s'", link->nexthop());
00243         return false;
00244     }
00245 
00246     // Create a new parameters structure, parse the options, and store
00247     // them in the link's cl info slot.
00248     Params* params = new Params(defaults_);
00249     const char* invalid;
00250     if (! parse_params(params, argc, argv, &invalid)) {
00251         log_err("error parsing link options: invalid option '%s'", invalid);
00252         delete params;
00253         return false;
00254     }
00255 
00256     // check that the local interface is valid
00257     if (bacmp(&params->local_addr_,BDADDR_ANY) == 0) {
00258         // try to read local adapter's address
00259         oasys::Bluetooth::hci_get_bdaddr(params->hcidev_.c_str(),
00260                                          &params->local_addr_);
00261         if (bacmp(&params->local_addr_,BDADDR_ANY) == 0) {
00262             log_err("invalid local address setting of BDADDR_ANY");
00263             return false;
00264         }
00265     }
00266 
00267     char buff[18];
00268     link->set_local(oasys::Bluetooth::batostr(&params->local_addr_,buff));
00269 
00270     // Nothing further, if OpLink
00271     if (link->type() == Link::OPPORTUNISTIC) {
00272         delete params;
00273         return true;
00274     }
00275 
00276     // copy the retry parameters from the link itself
00277     params->retry_interval_     = link->retry_interval_;
00278     params->min_retry_interval_ = link->params().min_retry_interval_;
00279     params->max_retry_interval_ = link->params().max_retry_interval_;
00280 
00281     link->set_cl_info(params);
00282 
00283     return true;
00284 }
00285 
00289 void
00290 BluetoothConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00291 {
00292     Params* params = (Params*) link->cl_info();
00293 
00294     char buff[18];
00295     buf->appendf("\tlocal_addr: %s\n",
00296                  oasys::Bluetooth::batostr(&params->local_addr_,buff));
00297     buf->appendf("\tremote_addr: %s\n",
00298                  oasys::Bluetooth::batostr(&params->remote_addr_,buff));
00299 }
00300 
00305 bool
00306 BluetoothConvergenceLayer::open_contact(const ContactRef& contact)
00307 {
00308     bdaddr_t addr;
00309 
00310     Link* link = contact->link();
00311     log_debug("opening contact on link *%p", link);
00312 
00313     // parse out the address / port from the nexthop address. note
00314     // that these should have been validated in init_link() above, so
00315     // we ASSERT as such
00316     bool valid = parse_nexthop(link->nexthop(),&addr);
00317     ASSERT(valid == true);
00318     ASSERT(bacmp(&addr,BDADDR_ANY) != 0);
00319 
00320     Params* params = (Params*)link->cl_info();
00321 
00322     // Using ConnectionManager factory method to manage bind() contention;
00323     // reuse existing passive or create new active
00324     Connection* sender = connections_.connection(this,addr,params);
00325 
00326     if (sender == NULL) return false;
00327 
00328     // save this contact
00329     sender->set_contact(contact);
00330     sender->start();
00331 
00332     return true;
00333 }
00334 
00338 bool
00339 BluetoothConvergenceLayer::close_contact(const ContactRef& contact)
00340 {
00341     Connection* sender = (Connection*)contact->cl_info();
00342     log_info("close_contact *%p", contact.object());
00343 
00344     if (sender != NULL) {
00345 
00346         if (!sender->is_stopped() && !sender->should_stop()) {
00347             log_debug("interrupting connection thread");
00348             sender->set_should_stop();
00349             sender->interrupt_from_io();
00350             oasys::Thread::yield();
00351         }
00352     
00353         // the connection thread will delete itself when it terminates
00354         // so we can't check any state in the thread itself (i.e. the
00355         // is_stopped flag). however, it first clears the cl_info slot
00356         // in the Contact class which is our indication that it has
00357         // exited, allowing us to return
00358 
00359         while (contact->cl_info() != NULL) {
00360             log_debug("waiting for connection thread to stop...");
00361             usleep(100000);
00362             oasys::Thread::yield();
00363         }
00364 
00365         log_debug("connection thread stopped...");
00366     }
00367 
00368     return true; 
00369 }
00370 
00375 void BluetoothConvergenceLayer::send_bundle(const ContactRef& contact,Bundle* bundle) 
00376 {
00377     log_info("send_bundle *%p to *%p",bundle,contact.object());
00378     Connection* sender = (Connection*)contact->cl_info();
00379 
00380     ASSERT(sender != NULL);
00381 
00382     // set the busy state to apply bundle backpressure
00383     if ((sender->queue_->size() + 1) >= 3)
00384          contact->link()->set_state(Link::BUSY);
00385 
00386     sender->queue_->push_back(bundle);
00387 }
00388 
00389 bool
00390 BluetoothConvergenceLayer::parse_nexthop(const char* nexthop, 
00391                                          bdaddr_t* addr)
00392 {
00393     std::string tmp;
00394     bdaddr_t ba;
00395     const char* p = nexthop;
00396     int numcolons = 5; // expecting 6 colons total
00397 
00398     while (numcolons > 0) {
00399         p = strchr(p+1, ':');
00400         if (p != NULL) {
00401             numcolons--;
00402         } else {
00403             log_warn("bad format for remote Bluetooth address: '%s'",
00404                      nexthop);
00405             return false;
00406         }
00407     }
00408     tmp.assign(nexthop, p - nexthop + 3);
00409     oasys::Bluetooth::strtoba(tmp.c_str(),&ba);
00410 
00411     bacpy(addr,&ba);
00412     return true;
00413 }
00414 
00415 
00416 /******************************************************************************
00417  *
00418  * BluetoothConvergenceLayer::ConnectionManager
00419  *
00420  *****************************************************************************/
00421 BluetoothConvergenceLayer::Listener*
00422 BluetoothConvergenceLayer::ConnectionManager::listener(
00423                                                 BluetoothConvergenceLayer *cl,
00424                                                 Params* params)
00425 {
00426     ASSERT(params);
00427     ASSERT(bacmp(&params->local_addr_,BDADDR_ANY) != 0);
00428 
00429     Listener *l = listener(params->local_addr_);
00430     if (l != NULL) return l;
00431 
00432     // Create a new one, store it, then return it
00433     l = new Listener(cl,params);
00434     addListener(l);
00435     return l;
00436 }
00437 
00438 BluetoothConvergenceLayer::Listener*
00439 BluetoothConvergenceLayer::ConnectionManager::listener(bdaddr_t& addr)
00440 {
00441     if (bacmp(&addr,BDADDR_ANY) == 0) {
00442         log_info("ConnectionManager::listener called with BDADDR_ANY");
00443         return NULL;
00444     }
00445 
00446     char buff[18];
00447     (void)buff;
00448 
00449     // Search listeners
00450     it_ = l_map_.find(addr);
00451     if( it_ != l_map_.end() ) {
00452         log_debug("Retrieving Listener(%p) for bdaddr %s",
00453                   (*it_).second,oasys::Bluetooth::batostr(&addr,buff));
00454         return (*it_).second;
00455     }
00456 
00457     // No luck!
00458     log_debug("Nothing found in ConnectionManager for bdaddr %s",
00459               oasys::Bluetooth::batostr(&addr,buff));
00460 
00461     return NULL;
00462 }
00463 
00464 bool
00465 BluetoothConvergenceLayer::ConnectionManager::delListener(Listener* l)
00466 {
00467     ASSERT(l != NULL);
00468 
00469     bdaddr_t addr = l->local_addr();
00470     return (l_map_.erase(addr) > 0);
00471 }
00472 
00473 void
00474 BluetoothConvergenceLayer::ConnectionManager::addListener(Listener* l)
00475 {
00476     ASSERT(l != NULL);
00477 
00478     char buff[18];
00479     (void)buff;
00480     
00481     bdaddr_t addr;
00482     l->local_addr(addr);
00483 
00484     ASSERT(bacmp(&addr,BDADDR_ANY) != 0);
00485 
00486     log_debug("Adding Listener(%p) for bdaddr %s",
00487               l,oasys::Bluetooth::batostr(&addr,buff));
00488     l_map_[addr]=l;
00489 }
00490 
00491 bool
00492 BluetoothConvergenceLayer::ConnectionManager::del_listener(Listener* l)
00493 {
00494     ASSERT(l != NULL);
00495 
00496     return delListener(l);
00497 }
00498 
00499 BluetoothConvergenceLayer::Connection*
00500 BluetoothConvergenceLayer::ConnectionManager::connection(
00501                                           BluetoothConvergenceLayer* cl,
00502                                           bdaddr_t& addr,
00503                                           Params* params)
00504 {
00505     ASSERT(bacmp(&addr,BDADDR_ANY) != 0);
00506     ASSERT(params != NULL);
00507     ASSERT(bacmp(&params->local_addr_,BDADDR_ANY) != 0);
00508 
00509     // search for passive connections first
00510     Listener* prev = listener(params->local_addr_);
00511 
00512     //XXX/wilson Need to keep some registry of Connections to
00513     // keep count and reflect the seven or less simultaneous Bluetooth
00514     // limitation here at the application layer
00515     if (prev != NULL ) {
00516 
00517         log_debug("Instantiating new connection");
00518         Connection *c = new Connection(cl,addr,params);
00519         c->listener_ = prev;
00520         return c;
00521 
00522     } else {
00523 
00524         // Failure state ... dump then panic
00525 
00526         char buff[18];
00527         // Dump listeners
00528         for(it_ = l_map_.begin(); it_ != l_map_.end(); it_++) {
00529             Listener *l = (*it_).second;
00530             bdaddr_t ba = (*it_).first;
00531             (void)l;
00532             (void)ba;
00533             log_debug("Listener\tListener %p bdaddr %s",
00534                       l,oasys::Bluetooth::batostr(&ba,buff));
00535         }
00536         // Complain loudly
00537         PANIC("ConnectionManager: new connection requested for %s "
00538               "where no previous listener existed",
00539               oasys::Bluetooth::batostr(&addr,buff));
00540     }
00541 
00542     // not reached
00543     return NULL;
00544 }
00545 
00546 
00547 /*****************************************************************************
00548  *
00549  * BluetoothConvergenceLayer::Listener
00550  *
00551  *****************************************************************************/
00552 BluetoothConvergenceLayer::Listener::Listener(BluetoothConvergenceLayer* cl,
00553                                        BluetoothConvergenceLayer::Params* params)
00554     : RFCOMMServerThread("/cl/bt/listener",oasys::Thread::INTERRUPTABLE),
00555       params_(*params),
00556       cl_(cl)
00557 {
00558     set_notifier(new oasys::Notifier("/cl/bt/listener"));
00559 
00560     ASSERT(bacmp(&params->local_addr_,BDADDR_ANY) != 0);
00561 
00562     set_local_addr(params->local_addr_);
00563 
00564     // pause every second to check for interrupt
00565     set_accept_timeout(1000);
00566 
00567     logfd_  = false;
00568     nd_ = NULL;
00569 }
00570 
00571 void
00572 BluetoothConvergenceLayer::Listener::accepted(int fd, bdaddr_t addr, u_int8_t channel)
00573 {
00574     ASSERT(cl_ != NULL);
00575     Connection *c = new Connection(cl_,fd,addr,channel,&params_);
00576     c->listener_ = this;
00577     c->start();
00578     // this channel is now taken over by the passive, so close() and rc_bind()
00579     // all over again
00580     close();
00581     ASSERT(rc_bind() == 0);
00582     ASSERT(listen() == 0);
00583 }
00584 
00585 
00586 /******************************************************************************
00587  *
00588  * BluetoothConvergenceLayer::Connection
00589  *
00590  *****************************************************************************/
00591 
00595 BluetoothConvergenceLayer::Connection::Connection(
00596         BluetoothConvergenceLayer* cl,
00597         bdaddr_t remote_addr,
00598         BluetoothConvergenceLayer::Params* params)
00599     : Thread("BluetoothConvergenceLayer::Connection"),
00600       Logger("BluetoothConvergenceLayer::Connection", "/dtn/cl/bt/conn"),
00601       params_(*params), listener_(NULL), cl_(cl), initiate_(true),
00602       contact_("BluetoothConvergenceLayer::Connection"),
00603       announce_("BluetoothConvergenceLayer::Connection")
00604 {
00605     char buff[18];
00606     logpath_appendf("/%s",oasys::Bluetooth::batostr(&remote_addr,buff));
00607 
00608     queue_ = new BlockingBundleList(logpath_);
00609     queue_->notifier()->logpath_appendf("/queue");
00610     Thread::set_flag(Thread::DELETE_ON_EXIT);
00611     sock_ = new oasys::RFCOMMClient();
00612     sock_->set_local_addr(params->local_addr_);
00613     sock_->logpathf("%s/sock",logpath_);
00614     sock_->set_logfd(false);
00615     sock_->set_remote_addr(remote_addr);
00616     sock_->set_notifier(new oasys::Notifier("/cl/bt/active-connection"));
00617     sock_->get_notifier()->logpath_appendf("/sock_notifier");
00618     event_notifier_ = new oasys::Notifier(logpath_);
00619     event_notifier_->logpath_appendf("/event_notifier");
00620 }
00621 
00625 BluetoothConvergenceLayer::Connection::Connection(
00626         BluetoothConvergenceLayer* cl,
00627         int fd,
00628         bdaddr_t remote_addr,
00629         u_int8_t channel,
00630         Params* params)
00631     : Thread("BluetoothConvergenceLayer::Connection"),
00632       Logger("BluetoothConvergenceLayer::Connection", "/dtn/cl/bt/conn"),
00633       params_(*params), listener_(NULL), cl_(cl), initiate_(false),
00634       contact_("BluetoothConvergenceLayer::Connection"),
00635       announce_("BluetoothConvergenceLayer::Connection")
00636 {
00637     char buff[18];
00638     logpathf("/dtn/cl/bt/passive-conn/%s:%d", oasys::Bluetooth::batostr(&remote_addr,buff),
00639              channel);
00640     queue_ = NULL;
00641     Thread::set_flag(Thread::DELETE_ON_EXIT);
00642     sock_ = new oasys::RFCOMMClient(fd,remote_addr,channel,logpath_);
00643     sock_->set_local_addr(params->local_addr_);
00644     sock_->logpathf("%s/sock",logpath_);
00645     sock_->set_logfd(false);
00646     sock_->set_notifier(new oasys::Notifier("/cl/bt/passive-connection"));
00647     sock_->get_notifier()->logpath_appendf("/sock_notifier");
00648     event_notifier_ = NULL;
00649 }
00650 
00654 BluetoothConvergenceLayer::Connection::~Connection()
00655 {
00656     if (queue_) delete queue_;
00657     delete sock_;
00658     if (event_notifier_) delete event_notifier_;
00659 }
00660 
00661 
00671 void
00672 BluetoothConvergenceLayer::Connection::run()
00673 {
00674     // XXX/demmer much of this could be abstracted into a generic CL
00675     // ConnectionThread class, assuming we had another
00676     // connection-oriented CL that we wanted to support (e.g. SCTP)
00677     
00678     while (1) {
00679         log_debug("connection main loop starting up...");
00680         
00681         if (initiate_)
00682         {
00683             ASSERT(contact_ != NULL);
00684             Link* link = contact_->link();
00685             
00686             if (! connect()) {
00687 
00688                 //XXX/wilson or demmer
00689                 // Add params for number of times to retry
00690                 // and for how long to sleep between tries
00691                 int param_num_retries = 3;
00692                 int param_how_long = 5;
00693 
00694                 if (link->type() == Link::OPPORTUNISTIC) {
00695                     do {
00696                         sleep(param_how_long);
00697                         if(--param_num_retries == 0) break;
00698                     } while (!connect());
00699                 }
00700                 log_debug("connection failed");
00701                 break_contact(ContactEvent::BROKEN);
00702                 goto broken;
00703             }
00704 
00705             // now that we've successfully connected, reset the retry timer
00706             // to the minimum interval
00707             params_.retry_interval_ = params_.min_retry_interval_;
00708 
00709             // signal the OPENING state
00710             if (link->state() != Link::OPENING) {
00711                 link->set_state(Link::OPENING);
00712             }
00713 
00714             send_loop();
00715 
00716         } else {
00717             /*
00718              * If accepting a connection failed, we always return
00719              * which triggers the thread to be deleted and therefore
00720              * cleans up our state.
00721              */
00722             if (! accept()) {
00723                 log_debug("accept failed");
00724                 return; // trigger a deletion
00725             }
00726 
00727 
00728             recv_loop();
00729         }
00730 
00731  broken:
00732         // use the thread's should_stop() indicator (set by
00733         // break_contact() as well as the interruption routines) to
00734         // determine when we should exit, either because we were
00735         // interrupted by a user action (i.e. link close), we're the
00736         // passive acceptor, or because an ondemand link is idle
00737         if (should_stop())
00738             return;
00739 
00740         // otherwise, we should really be the initiator, or else
00741         // something wierd happened
00742         if (!initiate_) {
00743             log_err("passive side exited loop but didn't set should_stop!");
00744             return;
00745         }
00746 
00747         // sleep for the appropriate retry amount (by waiting to see
00748         // if we're interrupted) and try to re-establish the connection
00749         ASSERT(sock_->get_notifier());
00750         ASSERT(params_.retry_interval_ != 0);
00751         log_info("waiting for %d seconds before retrying connection",
00752                  params_.retry_interval_);
00753         if (sock_->get_notifier()->wait(NULL, params_.retry_interval_ * 1000)) {
00754             log_info("socket interrupted from retry sleep -- exiting thread");
00755             return;
00756         }
00757 
00758         // double the retry timer up to the max for the next time around
00759         params_.retry_interval_ = params_.retry_interval_ * 2;
00760         if (params_.retry_interval_ > params_.max_retry_interval_) {
00761             params_.retry_interval_ = params_.max_retry_interval_;
00762         }
00763     }
00764 
00765     log_debug("connection thread main loop complete -- thread exiting");
00766 }
00767 
00768 
00772 bool
00773 BluetoothConvergenceLayer::Connection::send_announce()
00774 {
00775     char buff[18]; //used by oasys::batostr below
00776     (void)buff;
00777 
00778     bdaddr_t remote;
00779     sock_->remote_addr(remote);
00780 
00781     ASSERT( bacmp(&remote,BDADDR_ANY) != 0 );
00782 
00783     // create the ANNOUNCE bundle
00784     if (announce_ == NULL) {
00785         announce_ = new Bundle();
00786         // Mark this bundle as Admin,
00787         // set bundle type to ADMIN_ANNOUNCE, and
00788         // set source eid to local_eid
00789         AnnounceBundle::create_announce_bundle(announce_.object(),
00790             BundleDaemon::instance()->local_eid());
00791     }
00792 
00793     log_debug("attempting to contact %s with ANNOUNCE",
00794               oasys::Bluetooth::batostr(&remote,buff));
00795 
00796     oasys::StaticStringBuffer<1024> buf;
00797     buf.appendf("BTCL AnnounceBundle:\n");
00798     announce_->format_verbose(&buf);
00799     log_multiline(oasys::LOG_DEBUG, buf.c_str());
00800 
00801     // first reserve space in the buffers
00802     rcvbuf_.reserve(params_.readbuf_len_);
00803     sndbuf_.reserve(params_.writebuf_len_);
00804 
00805     // sock_ will be established if this is the receiving thread
00806     // sending back a response
00807     if (sock_->state() != oasys::BluetoothSocket::ESTABLISHED) {
00808         if (! connect()) {
00809             log_debug("connect failed");
00810             return false;
00811         }
00812     }
00813 
00814     return send_bundle(announce_.object());
00815 }
00816 
00817 
00821 void
00822 BluetoothConvergenceLayer::Connection::recv_announce()
00823 {
00824     int timeout;
00825     int ret;
00826     char typecode;
00827     // if there's nothing in the buffer,
00828     // block waiting for the one byte typecode
00829     if (rcvbuf_.fullbytes() == 0) {
00830         ASSERT(rcvbuf_.end() == rcvbuf_.data()); // sanity
00831 
00832         timeout = 2 * params_.keepalive_interval_ * 1000;
00833         log_debug("recv_announce: blocking on frame... (timeout %d)", timeout);
00834 
00835         ret = sock_->timeout_read(rcvbuf_.end(),
00836                                   rcvbuf_.tailbytes(),
00837                                   timeout);
00838         if (ret == oasys::IOEOF || ret == oasys::IOERROR) {
00839             log_info("recv_announce: remote connection unexpectedly closed");
00840 shutdown:
00841             break_contact(ContactEvent::BROKEN);
00842             return;
00843 
00844         } else if (ret == oasys::IOTIMEOUT) {
00845             log_info("recv_announce: no message heard for > %d msecs -- "
00846                      "breaking contact", timeout);
00847             goto shutdown;
00848         }
00849 
00850         rcvbuf_.fill(ret);
00851         note_data_rcvd();
00852     }
00853 
00854     typecode = *rcvbuf_.start();
00855     rcvbuf_.consume(1);
00856 
00857     if (typecode != BUNDLE_DATA)
00858         goto shutdown;
00859 
00860     if (! recv_bundle() )
00861         goto shutdown;
00862 }
00863 
00864 
00870 bool
00871 BluetoothConvergenceLayer::Connection::send_bundle(Bundle* bundle)
00872 {
00873     int header_len;
00874     size_t sdnv_len;
00875     size_t bthdr_len;
00876     u_char bthdr_buf[32];
00877     size_t block_len;
00878     size_t payload_len = bundle->payload_.length();
00879     size_t payload_offset = 0;
00880     const u_char* payload_data;
00881 
00882     // first put the bundle on the inflight_ queue
00883     if (AnnounceBundle::parse_announce_bundle(bundle) == false)
00884         inflight_.push_back(InFlightBundle(bundle));
00885 
00886     // Each bundle is preceded by a BUNDLE_DATA typecode and then a
00887     // BundleDataHeader that is variable-length since it includes an
00888     // SDNV for the total bundle length.
00889     //
00890     // So, first calculate the length of the bundle headers while we
00891     // put it into the send buffer, then fill in the small CL buffer
00892 
00893 retry_headers:
00894     header_len =
00895         BundleProtocol::format_header_blocks(bundle, sndbuf_.buf(),
00896                                              sndbuf_.buf_len());
00897     if (header_len < 0) {
00898         log_debug("send_bundle: bundle header too big for buffer len %zu -- "
00899                   "doubling size and retrying", sndbuf_.buf_len());
00900         goto retry_headers;
00901     }
00902 
00903     sdnv_len = SDNV::encoding_len(header_len + payload_len);
00904     bthdr_len = 1 + sizeof(BundleDataHeader) + sdnv_len;
00905 
00906     ASSERT(sizeof(bthdr_buf) >= bthdr_len);
00907 
00908     // Now fill in the type code and the bundle data header (with the
00909     // sdnv for the total bundle length)
00910     bthdr_buf[0] = BUNDLE_DATA;
00911     BundleDataHeader* dh = (BundleDataHeader*)(&bthdr_buf[1]);
00912     memcpy(&dh->bundle_id, &bundle->bundleid_, sizeof(bundle->bundleid_));
00913     SDNV::encode(header_len + payload_len, &dh->total_length[0], sdnv_len);
00914 
00915     // Build up a two element iovec
00916     struct iovec iov[2];
00917     iov[0].iov_base = (char*)bthdr_buf;
00918     iov[0].iov_len  = bthdr_len;
00919 
00920     iov[1].iov_base = (char*)sndbuf_.buf();
00921     iov[1].iov_len  = header_len;
00922 
00923     // send off the preamble and the headers
00924     log_debug("send_bundle: sending bundle id %d "
00925               "btcl hdr len: %zu, bundle header len: %zu payload len: %zu",
00926               bundle->bundleid_, bthdr_len, header_len, payload_len);
00927 
00928     int cc = sock_->writevall(iov, 2);
00929 
00930     if (cc == oasys::IOINTR) {
00931         log_info("send_bundle: interrupted while sending bundle header");
00932         break_contact(ContactEvent::USER);
00933         return false;
00934     } else if (cc != (int)bthdr_len + header_len) {
00935         if (cc == 0) {
00936             log_err("send_bundle: remote side closed connection");
00937         } else {
00938             log_err("send_bundle: "
00939                     "error sending bundle header (wrote %d/%zu): %s",
00940                     cc, (bthdr_len + header_len), strerror(errno));
00941         }
00942 
00943         break_contact(ContactEvent::BROKEN);
00944         return false;
00945     } 
00946 
00947     // now loop through the the payload, sending blocks of data and
00948     // checking for incoming acks along the way
00949     while (payload_len > 0) {
00950 
00951         if (payload_len <= params_.writebuf_len_) {
00952             block_len = payload_len;
00953         } else {
00954             block_len = params_.writebuf_len_;
00955         }
00956 
00957         // grab the next payload chunk
00958         payload_data = bundle->payload_.read_data(
00959                          payload_offset,
00960                          block_len,
00961                          sndbuf_.buf(block_len),
00962                          BundlePayload::KEEP_FILE_OPEN);
00963 
00964         log_debug("send_bundle: sending %zu byte block %p",
00965                   block_len, payload_data);
00966 
00967         cc = sock_->writeall((char*)payload_data, block_len);
00968 
00969         if (cc == oasys::IOINTR) {
00970             log_info("send_bundle: interrupted while sending bundle block");
00971             break_contact(ContactEvent::USER);
00972             bundle->payload_.close_file();
00973             return false;
00974         } else if (cc != (int)block_len) {
00975             if (cc == 0) {
00976                 log_err("send_bundle: remote side closed connection");
00977             } else {
00978                 log_err("send_bundle: "
00979                         "error sending bundle block (wrote %d/%zu): %s",
00980                         cc, block_len, strerror(errno));
00981             }
00982             break_contact(ContactEvent::BROKEN);
00983             bundle->payload_.close_file();
00984             return false;
00985         }
00986 
00987         // update payload_offset and payload_len
00988         payload_offset += block_len;
00989         payload_len    -= block_len;
00990 
00991         // call poll() to check for any pending ack replies on the
00992         // socket with a timeout of zero, indicating that we don't
00993         // want to block
00994         int revents;
00995         cc = sock_->poll_sockfd(POLLIN, &revents, 0);
00996 
00997         if (cc == 1) {
00998             log_debug("send_bundle: data available on the socket");
00999             if (! handle_reply()) {
01000                 return false;
01001             }
01002         } else if (cc == 0 || cc == oasys::IOTIMEOUT) {
01003             // nothing to do
01004         } else if (cc == oasys::IOINTR) {
01005             log_info("send_bundle: interrupted while checking for acks");
01006             break_contact(ContactEvent::USER);
01007             bundle->payload_.close_file();
01008             return false;
01009         } else {
01010             log_err("send_bundle: unexpected error while checking for acks");
01011             break_contact(ContactEvent::BROKEN);
01012             bundle->payload_.close_file();
01013             return false;
01014         }
01015     }
01016 
01017     // if we got here, we sent the whole bundle successfully, so if
01018     // we're not using acks, post an event for the router. if we are
01019     // using acks, the event is posted in handle_ack
01020     if (! params_.bundle_ack_enabled_ && 
01021           (AnnounceBundle::parse_announce_bundle(bundle) == false)) {
01022         inflight_.pop_front();
01023         BundleDaemon::post(
01024                 new BundleTransmittedEvent(bundle, contact_, payload_len, 0));
01025     }
01026 
01027     bundle->payload_.close_file();
01028     return true;
01029 }
01030 
01031 
01037 bool
01038 BluetoothConvergenceLayer::Connection::recv_bundle()
01039 {
01040     int cc;
01041     Bundle* bundle = new Bundle();
01042     BundleDataHeader datahdr;
01043     int header_len;
01044     u_int64_t total_len;
01045     int    sdnv_len = 0;
01046     size_t rcvd_len = 0;
01047     size_t acked_len = 0;
01048     size_t payload_len = 0;
01049 
01050     bool valid = false;
01051     bool recvok = false;
01052 
01053     log_debug("recv_bundle: consuming bundle headers...");
01054 
01055     // if we don't yet have enough bundle data, so read in as much as
01056     // we can, first making sure we have enough space in the buffer
01057     // for at least the bundle data header and the SDNV
01058     if (rcvbuf_.fullbytes() < sizeof(BundleDataHeader)) {
01059  incomplete_bt_header:
01060         rcvbuf_.reserve(sizeof(BundleDataHeader) + 10);
01061         cc = sock_->timeout_read(rcvbuf_.end(), rcvbuf_.tailbytes(),
01062                                  params_.rtt_timeout_);
01063         if (cc < 0) {
01064             log_err("recv_bundle: error reading bundle headers: %s",
01065                     strerror(errno));
01066             delete bundle;
01067             return false;
01068         }
01069 
01070         log_debug("recv_bundle: read %d bytes...", cc);
01071         rcvbuf_.fill(cc);
01072         note_data_rcvd();
01073     }
01074         
01075     // parse out the BundleDataHeader
01076     if (rcvbuf_.fullbytes() < sizeof(BundleDataHeader)) {
01077         log_err("recv_bundle: read too short to encode data header");
01078         goto incomplete_bt_header;
01079     }
01080 
01081     // copy out the data header but don't advance the buffer (yet)
01082     memcpy(&datahdr, rcvbuf_.start(), sizeof(BundleDataHeader));
01083     
01084     // parse out the SDNV that encodes the whole bundle length
01085     sdnv_len = SDNV::decode((u_char*)rcvbuf_.start() + sizeof(BundleDataHeader),
01086                             rcvbuf_.fullbytes() - sizeof(BundleDataHeader),
01087                             &total_len);
01088     if (sdnv_len < 0) {
01089         log_err("recv_bundle: read too short to encode SDNV");
01090         goto incomplete_bt_header;
01091     }
01092 
01093     // got the full bt header, so skip that much in the buffer
01094     rcvbuf_.consume(sizeof(BundleDataHeader) + sdnv_len);
01095 
01096  incomplete_bundle_header:
01097     // now try to parse the headers into the new bundle, which may
01098     // require reading in more data and possibly increasing the size
01099     // of the stream buffer
01100     header_len = BundleProtocol::parse_header_blocks(bundle,
01101                                                      (u_char*)rcvbuf_.start(),
01102                                                      rcvbuf_.fullbytes());
01103     
01104     if (header_len < 0) {
01105         if (rcvbuf_.tailbytes() == 0) {
01106             rcvbuf_.reserve(rcvbuf_.size() * 2);
01107         }
01108         cc = sock_->timeout_read(rcvbuf_.end(), rcvbuf_.tailbytes(),
01109                                  params_.rtt_timeout_);
01110         if (cc < 0) {
01111             log_err("recv_bundle: error reading bundle headers: %s",
01112                     strerror(errno));
01113             delete bundle;
01114             return false;
01115         }
01116         rcvbuf_.fill(cc);
01117         note_data_rcvd();
01118         goto incomplete_bundle_header;
01119     }
01120 
01121     log_debug("recv_bundle: got valid bundle header -- "
01122               "sender bundle id %d, header_length %zu, total_length %llu",
01123               datahdr.bundle_id, header_len, total_len);
01124     rcvbuf_.consume(header_len);
01125 
01126     // all lengths have been parsed, so we can do some length
01127     // validation checks
01128     payload_len = bundle->payload_.length();
01129     if (total_len != header_len + payload_len) {
01130         log_err("recv_bundle: bundle length mismatch -- "
01131                 "total_len %llu, header_len %zu, payload_len %zu",
01132                 total_len, header_len, payload_len);
01133         delete bundle;
01134         return false;
01135     }
01136 
01137     // so far so good, now loop until we've read the whole payload
01138     // done with the rest. note that all reads have a timeout. note
01139     // also that we may have some payload data in the buffer
01140     // initially, so we check for that before trying to read more
01141     do {
01142         if (rcvbuf_.fullbytes() == 0) {
01143             // read a chunk of data
01144             ASSERT(rcvbuf_.data() == rcvbuf_.end());
01145             
01146             cc = sock_->timeout_read(rcvbuf_.data(),
01147                                      rcvbuf_.tailbytes(),
01148                                      params_.rtt_timeout_);
01149             if (cc == oasys::IOTIMEOUT) {
01150                 log_warn("recv_bundle: timeout reading bundle data block");
01151                 goto done;
01152             } else if (cc == oasys::IOEOF) {
01153                 log_info("recv_bundle: eof reading bundle data block");
01154                 goto done;
01155             } else if (cc < 0) {
01156                 log_warn("recv_bundle: error reading bundle data block: %s",
01157                          strerror(errno));
01158                 goto done;
01159             }
01160             rcvbuf_.fill(cc);
01161             note_data_rcvd();
01162         }
01163         
01164         log_debug("recv_bundle: got %zu byte chunk, rcvd_len %zu",
01165                   rcvbuf_.fullbytes(), rcvd_len);
01166         
01167         // append the chunk of data up to the maximum size of the
01168         // bundle (which may be empty) and update the amount received
01169         cc = std::min(rcvbuf_.fullbytes(), payload_len - rcvd_len);
01170         if (cc != 0) {
01171             bundle->payload_.append_data((u_char*)rcvbuf_.start(), cc);
01172             rcvd_len += cc;
01173             rcvbuf_.consume(cc);
01174         }
01175         
01176         // at this point, we can make at least a valid bundle fragment
01177         // from what we've gotten thus far (assuming reactive
01178         // fragmentation is enabled)
01179         valid = true;
01180 
01181 
01182         // check if we've read enough to send an ack
01183         if (rcvd_len - acked_len > params_.partial_ack_len_ &&
01184                AnnounceBundle::parse_announce_bundle(bundle) == false) {
01185             log_debug("recv_bundle: "
01186                       "got %zu bytes acked %zu, sending partial ack",
01187                       rcvd_len, acked_len);
01188             
01189             if (! send_ack(datahdr.bundle_id, rcvd_len)) {
01190                 goto done;
01191             }
01192             acked_len = rcvd_len;
01193         }
01194         
01195     } while (rcvd_len < payload_len);
01196 
01197     // if the sender requested a bundle ack and we haven't yet sent
01198     // one for the whole bundle in the partial ack check above, send
01199     // one now
01200     if (params_.bundle_ack_enabled_ && 
01201         (AnnounceBundle::parse_announce_bundle(bundle) == false) &&
01202         (payload_len == 0 || (acked_len != rcvd_len)))
01203     {
01204         if (! send_ack(datahdr.bundle_id, payload_len)) {
01205             goto done;
01206         }
01207     }
01208     
01209     recvok = true;
01210 
01211  done:
01212     bundle->payload_.close_file();
01213     
01214     if ((!valid) || (!recvok)) {
01215         // the bundle isn't valid or we didn't get the whole thing
01216         // so just return
01217         if (bundle)
01218             delete bundle;
01219         
01220         return false;
01221     }
01222 
01223     log_debug("recv_bundle: "
01224               "new bundle id %d arrival, payload length %zu (rcvd %zu)",
01225               bundle->bundleid_, payload_len, rcvd_len);
01226     
01227     // inform the daemon that we got a valid bundle, though it may not
01228     // be complete (as indicated by passing the rcvd_len)
01229     ASSERT(rcvd_len <= bundle->payload_.length());
01230     BundleDaemon::post(
01231         new BundleReceivedEvent(bundle, EVENTSRC_PEER, rcvd_len));
01232     
01233     // snoop on bundles to see if Announce bundle crosses by
01234     EndpointID eid;
01235     if (AnnounceBundle::parse_announce_bundle(bundle,&eid)) {
01236 
01237         ASSERT(params_.neighbor_poll_interval_ > 0);
01238 
01239         char buff[18];
01240         bdaddr_t remote;
01241         sock_->remote_addr(remote);
01242         const char *nexthop =
01243             (const char *) oasys::Bluetooth::batostr(&remote,buff);
01244 
01245         // AnnounceBundle has been received, which either means
01246         // 1) this is the first news we've heard of remote
01247         // or
01248         // 2) remote is sending this AnnounceBundle as ack to our original
01249 
01250         // log the AnnounceBundle, source EID and remote BT addr
01251         oasys::StaticStringBuffer<1024> buf;
01252         buf.appendf("received BTCL AnnounceBundle from %s at %s\n",
01253                     eid.c_str(),
01254                     nexthop);
01255         bundle->format_verbose(&buf);
01256         log_multiline(oasys::LOG_INFO, buf.c_str());
01257 
01258         // if this is the first we've heard of it, announce_ will be NULL
01259         bool receiver = false;
01260         if (announce_ == NULL) {
01261             // so respond on the same connection
01262             send_announce();
01263             receiver = true;
01264         }
01265 
01266         // otherwise, we are the initiator, and this is the response.
01267 
01268         // either way, it's time to light up a link and post it up
01269         // to BundleDaemon
01270         ContactManager* cm = BundleDaemon::instance()->contactmgr();
01271 
01272         // Finds the link (if it exists) otherwise creates the link and
01273         // posts LinkCreatedEvent 
01274         // Either way, it posts LinkAvailableEvent for this link
01275         (void)cm->new_opportunistic_link(cl_,nexthop,eid);
01276 
01277         if (receiver) {
01278             // our work here is done; time to self delete
01279             set_should_stop();
01280             break_contact(ContactEvent::BROKEN);
01281             return false; // tells recv_loop to back out
01282         }
01283     }
01284     return true;
01285 }
01286 
01290 bool
01291 BluetoothConvergenceLayer::Connection::send_ack(u_int32_t bundle_id,
01292                                           size_t acked_len)
01293 {
01294     char typecode = BUNDLE_ACK;
01295     
01296     BundleAckHeader ackhdr;
01297     ackhdr.bundle_id = bundle_id;
01298     ackhdr.acked_length = htonl(acked_len);
01299 
01300     struct iovec iov[2];
01301     iov[0].iov_base = &typecode;
01302     iov[0].iov_len  = 1;
01303     iov[1].iov_base = (char*)&ackhdr;
01304     iov[1].iov_len  = sizeof(BundleAckHeader);
01305     
01306     int total = 1 + sizeof(BundleAckHeader);
01307     int cc = sock_->writevall(iov, 2);
01308 
01309     if (cc != total) {
01310         log_info("send_ack: problem sending ack (wrote %d/%d): %s",
01311                 cc, total, strerror(errno));
01312         return false;
01313     }
01314 
01315     return true;
01316 }
01317 
01318 
01322 bool
01323 BluetoothConvergenceLayer::Connection::send_keepalive()
01324 {
01325     char typecode = KEEPALIVE;
01326     int ret = sock_->write(&typecode, 1);
01327 
01328     if (ret == oasys::IOINTR) { 
01329         log_info("send_keepalive: connection interrupted");
01330         break_contact(ContactEvent::USER);
01331         return false;
01332     }
01333 
01334     if (ret != 1) {
01335         log_info("remote connection unexpectedly closed");
01336         break_contact(ContactEvent::BROKEN);
01337         return false;
01338     }
01339     
01340     return true;
01341 }
01342 
01343 
01347 void
01348 BluetoothConvergenceLayer::Connection::note_data_rcvd()
01349 {
01350     log_debug("noting receipt of data");
01351     ::gettimeofday(&data_rcvd_, 0);
01352 }
01353 
01354 
01358 int
01359 BluetoothConvergenceLayer::Connection::handle_ack()
01360 {
01361     if (inflight_.empty()) {
01362         log_err("handle_ack: received ack but no bundles in flight!");
01363  protocol_error:
01364         break_contact(ContactEvent::BROKEN);
01365         return EINVAL;
01366     }
01367     InFlightBundle* ifbundle = &inflight_.front();
01368 
01369     // now see if we got a complete ack header
01370     if (rcvbuf_.fullbytes() < (1 + sizeof(BundleAckHeader))) {
01371         log_debug("handle_ack: not enough space in buffer (got %zu, need %zu...)",
01372                   rcvbuf_.fullbytes(), sizeof(BundleAckHeader));
01373         return ENOMEM;
01374     }
01375 
01376     // if we do, copy it out, after skipping the typecode
01377     BundleAckHeader ackhdr;
01378     memcpy(&ackhdr, rcvbuf_.start() + 1, sizeof(BundleAckHeader));
01379     rcvbuf_.consume(1 + sizeof(BundleAckHeader));
01380 
01381     Bundle* bundle = ifbundle->bundle_.object();
01382     size_t new_acked_len = ntohl(ackhdr.acked_length);
01383     size_t payload_len = bundle->payload_.length();
01384     
01385     log_debug("handle_ack: got ack length %zu for bundle id %d length %zu",
01386               new_acked_len, ackhdr.bundle_id, payload_len);
01387     
01388     if (ackhdr.bundle_id != bundle->bundleid_) {
01389         log_err("handle_ack: error: bundle id mismatch %d != %d",
01390                 ackhdr.bundle_id, bundle->bundleid_);
01391         goto protocol_error;
01392     }
01393 
01394     if (new_acked_len < ifbundle->acked_len_ || new_acked_len > payload_len)
01395     {
01396         log_err("handle_ack: invalid acked length %zu (acked %zu, bundle %zu)",
01397                 new_acked_len, ifbundle->acked_len_,
01398                 payload_len);
01399         goto protocol_error;
01400     }
01401 
01402     // check if we're done with the bundle and if we need to unblock the link
01403     if (new_acked_len == payload_len) {
01404         inflight_.pop_front();
01405         
01406         BundleDaemon::post(
01407             new BundleTransmittedEvent(bundle, contact_, payload_len, new_acked_len));
01408         
01409         if (contact_->link()->state() == Link::BUSY) {
01410             BundleDaemon::post_and_wait(
01411                 new LinkStateChangeRequest(contact_->link(), Link::AVAILABLE,
01412                                            ContactEvent::NO_INFO),
01413                 event_notifier_);
01414         }
01415 
01416     } else {
01417         ifbundle->acked_len_ = new_acked_len;
01418     }
01419 
01420     return 0;
01421 }
01422 
01426 bool
01427 BluetoothConvergenceLayer::Connection::send_contact_header()
01428 {
01429     BTCLHeader contacthdr;
01430     contacthdr.magic = htonl(MAGIC);
01431     contacthdr.version = BTCL_VERSION;
01432 
01433     contacthdr.flags = 0;
01434     if (params_.bundle_ack_enabled_)
01435         contacthdr.flags |= BUNDLE_ACK_ENABLED;
01436 
01437     contacthdr.partial_ack_len    = htons(params_.partial_ack_len_);
01438     contacthdr.keepalive_interval = htons(params_.keepalive_interval_);
01439     contacthdr.xx__unused         = 0;
01440 
01441     int cc = sock_->writeall((char*)&contacthdr, sizeof(BTCLHeader));
01442     if (cc != sizeof(BTCLHeader)) {
01443         log_err("error writing contact header (wrote %d/%zu): %s",
01444                 cc, sizeof(BTCLHeader), strerror(errno));
01445         return false;
01446     } 
01447 
01448     return true;
01449 }
01450 
01451 
01457 bool
01458 BluetoothConvergenceLayer::Connection::recv_contact_header(int timeout)
01459 {
01460     BTCLHeader contacthdr;
01461 
01462     ASSERT(timeout != 0);
01463     int cc = sock_->timeout_readall((char*)&contacthdr,
01464                                     sizeof(BTCLHeader),
01465                                     timeout);
01466 
01467     if (cc == oasys::IOTIMEOUT) {
01468         log_warn("timeout reading contact header");
01469         return false;
01470     }
01471 
01472     if (cc != sizeof(BTCLHeader)) {
01473         log_err("error reading contact header (read %d/%zu): %d-%s",
01474                 cc, sizeof(BTCLHeader),
01475                 errno, strerror(errno));
01476         return false;
01477     }
01478 
01479     /*
01480      * Check for valid magic number and version.
01481      */
01482     if (ntohl(contacthdr.magic) != MAGIC) {
01483          log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
01484                   "-- disconnecting.", contacthdr.magic,
01485                   MAGIC);
01486          return false;
01487     }
01488 
01489     if (contacthdr.version != BTCL_VERSION) {
01490          log_warn("remote sent version %d, expected version %d "
01491                   "-- disconnecting.", contacthdr.version,
01492                   BTCL_VERSION);
01493          return false;
01494     }
01495 
01496     /*
01497      * Now do parameter negotiation.
01498      */
01499     params_.partial_ack_len_    = MIN(params_.partial_ack_len_,
01500     ntohs(contacthdr.partial_ack_len));
01501     params_.keepalive_interval_ = MIN(params_.keepalive_interval_,
01502             ntohs(contacthdr.keepalive_interval));
01503     params_.bundle_ack_enabled_ = params_.bundle_ack_enabled_ &&
01504         (contacthdr.flags & BUNDLE_ACK_ENABLED); 
01505 
01506     note_data_rcvd();
01507     return true;
01508 }
01509 
01515 bool
01516 BluetoothConvergenceLayer::Connection::connect()
01517 {
01518     ASSERT(sock_->state() != oasys::BluetoothSocket::ESTABLISHED);
01519     char buff[18];
01520     (void)buff;
01521     
01522     bdaddr_t ba;
01523     sock_->remote_addr(ba);
01524     ASSERT(bacmp(&ba,BDADDR_ANY) != 0);
01525     log_debug("connect: connecting to %s ... ",
01526               oasys::Bluetooth::batostr(&ba,buff));
01527 
01528     // before attempting to scan the remote host for available channels,
01529     // shut down the local listener to release its channel for bind()
01530     // NOTE:  This is only the listener.  Child sockets from previous
01531     // calls to accepted() are not (and should not be) affected
01532     ASSERT(listener_ != NULL);
01533     listener_->set_should_stop();
01534     listener_->interrupt_from_io();
01535     while(!listener_->is_stopped()) {
01536         oasys::Thread::yield();
01537     }
01538     listener_->close();
01539 
01540     // scan all 30 channels
01541     int ret = sock_->rc_connect();
01542 
01543     // re-enable local listener
01544     // Scan each of RFCOMM's 30 channels, bind to first available
01545     if (listener_->rc_bind() == 0 &&
01546         listener_->listen() == 0) {
01547         listener_->start();
01548     }
01549 
01550     if (ret != 0) return false;
01551     log_debug("connect: connection established (channel %d), "
01552               "sending contact header ... ", sock_->channel());
01553     if (!send_contact_header()) return false;
01554     log_debug("connect: waiting for contact header reply ... ");
01555     if (!recv_contact_header(params_.rtt_timeout_)) return false;
01556     return true;
01557 }
01558 
01559 
01563 bool
01564 BluetoothConvergenceLayer::Connection::accept()
01565 {
01566     // even with passive reuse, this should originally be true
01567     ASSERT(contact_ == NULL);
01568     log_debug("accept: sending contact header ...");
01569     if (!send_contact_header()) return false;
01570     log_debug("accept: waiting for peer contact header ...");
01571     if (!recv_contact_header(params_.rtt_timeout_)) return false;
01572     return true;
01573 }
01574 
01575 
01587 void
01588 BluetoothConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
01589 {
01590     ASSERT(sock_);
01591     char typecode = SHUTDOWN;
01592     if (sock_->state() != oasys::BluetoothSocket::CLOSED) {
01593         sock_->set_nonblocking(true);
01594         sock_->write(&typecode,1);
01595         sock_->close();
01596     }
01597 
01598     if (contact_ != NULL) {
01599         while (!inflight_.empty()) {
01600             InFlightBundle* inflight = &inflight_.front();
01601             if (inflight->acked_len_ != 0) {
01602                 BundleDaemon::post(
01603                     new BundleTransmittedEvent(inflight->bundle_.object(),
01604                                           contact_,
01605                                           inflight->acked_len_,
01606                                           inflight->acked_len_));
01607             } else {
01608                 BundleDaemon::post(
01609                     new BundleTransmittedEvent(inflight->bundle_.object(),
01610                                           contact_,
01611                                           inflight->bundle_->payload_.length(),
01612                                           0));
01613             }
01614             inflight_.pop_front();
01615         }
01616         if (reason != ContactEvent::USER) {
01617             // if the connection isn't being closed by the user, and
01618             // the link is open, we need to notify the daemon.
01619             // typically, we then just bounce back to the main run
01620             // loop to try to re-establish the connection...
01621             //
01622             // we block until the daemon has processed the event to
01623             // make sure that we don't clear the cl_info slot too
01624             // early (triggering a crash)
01625             if (contact_->link()->isopen())
01626             {
01627                 bool ok = BundleDaemon::post_and_wait(
01628                     new ContactDownEvent(contact_, reason),
01629                     event_notifier_, 5000);
01630                 // one particularly annoying condition occurs if we
01631                 // attempt to close the link at the same time that the
01632                 // daemon does -- the thread in close_contact is
01633                 // blocked waiting for us to clear the cl_info slot
01634                 // below.
01635                 //
01636                 // XXX/demmer maybe this should be done for all calls
01637                 // to post_and_wait??
01638                 int total = 0;
01639                 while (!ok) {
01640                     total += 5;
01641                     if (should_stop()) {
01642                         log_notice("bundle daemon took > %d seconds to process event: "
01643                                    "breaking close_contact deadlock", total);
01644                         break;
01645                     }
01646                     if (total >= 60) 
01647                         PANIC("bundle daemon took > 60 seconds to process event: "
01648                               "fatal deadlock condition");
01649                     log_warn("bundle daemon took > %d seconds to process event", total);
01650                     ok = event_notifier_->wait(0,5000);
01651                 }
01652             }
01653         }
01654         if (queue_->size() > 0 )
01655             log_warn("%d bundles still in queue", queue_->size());
01656         // Once the main thread knows the contact is down (by the event above)
01657         // we need to signal that we've quit, by clearing the cl_info slot
01658         if (contact_->cl_info() != NULL ) {
01659             ASSERT(contact_->cl_info() == this);
01660             contact_->set_cl_info(NULL);
01661         }
01662     } else {
01663         ASSERT(inflight_.empty());
01664     }
01665     // for the passive acceptor or idle link, exit the main loop
01666     if (!initiate_ || (reason == ContactEvent::IDLE)) set_should_stop();
01667 
01668 }
01669 
01670 
01677 bool
01678 BluetoothConvergenceLayer::Connection::handle_reply()
01679 {
01680     // Reserve at least one byte of space, which has the side-effect
01681     // of moving up any needed buffer space if we're at the end.
01682     //
01683     // However, the buffer should have been pre-reserved with the
01684     // configured receive buffer size.
01685     rcvbuf_.reserve(1);
01686 
01687     int ret = sock_->read(rcvbuf_.end(), rcvbuf_.tailbytes());
01688 
01689     if (ret == oasys::IOINTR) {
01690         log_info("connection interrupted");
01691         break_contact(ContactEvent::USER);
01692         return false;
01693     }
01694 
01695     if (ret < 1) {
01696         log_info("remote connection unexpectedly closed");
01697         break_contact(ContactEvent::BROKEN);
01698         return false;
01699     }
01700 
01701     rcvbuf_.fill(ret);
01702     note_data_rcvd();
01703 
01704     do {
01705         char typecode = *rcvbuf_.start();
01706         if (typecode == BUNDLE_ACK) {
01707             int ret = handle_ack();
01708 
01709             if (ret == 0) {
01710                 // complete ack, nothing to do
01711             } else if (ret == ENOMEM) {
01712                 break; // incomplete ack message
01713             } else if (ret == EINVAL) {
01714                 return false; // internal error, break contact was called
01715             } else {
01716                 PANIC("unexpected return %d from handle_ack", ret);
01717             }
01718         } else if (typecode == BUNDLE_DATA) {
01719             rcvbuf_.consume(1);
01720             if(!recv_bundle()) {
01721                 break_contact(ContactEvent::BROKEN);
01722                 return false;
01723             }
01724             return true;
01725         } else if (typecode == KEEPALIVE) {
01726             rcvbuf_.consume(1);
01727             if (!initiate_) send_keepalive();
01728         } else if (typecode == SHUTDOWN) {
01729             rcvbuf_.consume(1);
01730             log_info("got shutdown request from other side");
01731             break_contact(ContactEvent::SHUTDOWN);
01732             return false;
01733         } else {
01734             log_err("got unexpected frame code %d", typecode);
01735             break_contact(ContactEvent::BROKEN);
01736             return false;
01737         }
01738     } while (rcvbuf_.fullbytes() > 0);
01739     return true;
01740 }
01741 
01742 
01746 void
01747 BluetoothConvergenceLayer::Connection::send_loop()
01748 {
01749     // someone should already have established the session
01750     ASSERT(sock_->state() == oasys::BluetoothSocket::ESTABLISHED);
01751 
01752     // store our state in the contact's cl info slot
01753     ASSERT(contact_ != NULL);
01754     ASSERT(contact_->cl_info() == NULL);
01755     contact_->set_cl_info(this);
01756 
01757     // inform the daemon that the contact is available
01758     BundleDaemon::post_and_wait(new ContactUpEvent(contact_), event_notifier_);
01759 
01760     // reserve space in the buffers
01761     rcvbuf_.reserve(params_.readbuf_len_);
01762     sndbuf_.reserve(params_.writebuf_len_);
01763 
01764     log_info("connection established -- (keepalive time %d seconds)",
01765              params_.keepalive_interval_);
01766 
01767     // build up a poll vector since we need to block below on input
01768     // from the socket and the bundle list
01769     struct pollfd pollfds[2];
01770 
01771     struct pollfd* bundle_poll = &pollfds[0];
01772     bundle_poll->fd          = queue_->notifier()->read_fd();
01773     bundle_poll->events      = POLLIN;
01774 
01775     struct pollfd* sock_poll = &pollfds[1];
01776     sock_poll->fd            = sock_->fd();
01777     sock_poll->events        = POLLIN;
01778 
01779     // flag for whether or not we're in idle state
01780     bool idle = false;
01781 
01782     // main loop
01783     while (true) {
01784         // keep track of the time we got data and idle
01785         struct timeval now;
01786         struct timeval idle_start;
01787 
01788         BundleRef bundle("BTCL::send_loop temporary");
01789 
01790         // if we've been interrupted, then the link should close
01791         if (should_stop()) {
01792             break_contact(ContactEvent::USER);
01793             return;
01794         }
01795 
01796         // pop the bundle (if any) off the queue, which gives us a
01797         // local reference on it
01798         bundle = queue_->pop_front();
01799 
01800         if (bundle != NULL) {
01801             // clear the idle bit
01802             idle = false;
01803 
01804             // send the bundle off and remove our local reference
01805             bool sentok = send_bundle(bundle.object());
01806             bundle = NULL;
01807 
01808             // if the last transmission wasn't completely successful,
01809             // it's time to break the contact
01810             if (!sentok) {
01811                 return;
01812             }
01813 
01814             // otherwise, we loop back to the beginning and check for
01815             // more bundles on the queue as an optimization to check
01816             // the list before calling poll
01817             continue;
01818         }
01819 
01820         // Check for whether or not we've just become idle, in which
01821         // case we record the current time
01822         if (!idle && inflight_.empty()) {
01823             idle = true;
01824             ::gettimeofday(&idle_start, 0);
01825         }
01826 
01827         // No bundle, so we'll block for:
01828         // 1) some activity on the socket, (i.e. keepalive, ack, or shutdown)
01829         // 2) the bundle list notifier that indicates new bundle to send
01830         //
01831         // Note that we pass the negotiated keepalive timer (if set)
01832         // as the timeout to the poll call so we know when we should
01833         // send a keepalive
01834         pollfds[0].revents = 0;
01835         pollfds[1].revents = 0;
01836 
01837         int timeout = params_.keepalive_interval_ * 1000;
01838         if (timeout == 0) {
01839             timeout = -1; // block forever
01840         }
01841 
01842         log_debug("send_loop: calling poll (timeout %d)", timeout);
01843         int cc = oasys::IO::poll_multiple(pollfds, 2, timeout,
01844                                       sock_->get_notifier(), logpath_);
01845 
01846         if (cc == oasys::IOINTR) {
01847             log_info("send_loop: interrupted from poll, breaking connection");
01848             break_contact(ContactEvent::USER);
01849             return;
01850         }
01851 
01852         // check for a message from the other side
01853         if (sock_poll->revents != 0) {
01854             if ((sock_poll->revents & POLLHUP) ||
01855                 (sock_poll->revents & POLLERR))
01856             {
01857                 log_info("send_loop: remote connection error");
01858                 break_contact(ContactEvent::BROKEN);
01859                 return;
01860             }
01861 
01862             if (sock_poll->revents & POLLNVAL)
01863             {
01864                 PANIC("send_loop: sock_poll->revents returned with "
01865                       "POLLNVAL (0x%x)", sock_poll->revents);
01866                 return;
01867             }
01868 
01869             if (! (sock_poll->revents & POLLIN)) {
01870                 PANIC("unknown revents value 0x%x", sock_poll->revents);
01871             }
01872 
01873             log_debug("send_loop: data available on the socket");
01874             if (!handle_reply()) {
01875                 return;
01876             }
01877         }
01878 
01879         // check if it's time to send a keepalive
01880         if (cc == oasys::IOTIMEOUT) {
01881             log_debug("timeout from poll, sending keepalive");
01882             ASSERT(params_.keepalive_interval_ != 0);
01883             if (send_keepalive() == false) return;
01884             continue;
01885         }
01886 
01887         // check that it hasn't been too long since the other side
01888         // sent us some data
01889         ::gettimeofday(&now,0);
01890         u_int elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
01891         if (elapsed > (2 * params_.keepalive_interval_ * 1000)) {
01892             log_info("send_loop: no data rcvd for %d msecs "
01893                      "(sent %zu.%zu, rcvd %zu.%zu, now %zu.%zu) -- closing contact",
01894                      elapsed,
01895                      (u_int)keepalive_sent_.tv_sec,
01896                      (u_int)keepalive_sent_.tv_usec,
01897                      (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
01898                      (u_int)now.tv_sec, (u_int)now.tv_usec);
01899             break_contact(ContactEvent::BROKEN);
01900             return;
01901         }
01902 
01903         // check if the connection has been idle for too long (ONDEMAND only)
01904         if (idle && (contact_->link()->type() == Link::ONDEMAND)) {
01905             u_int idle_close_time = contact_->link()->params().idle_close_time_;
01906 
01907             elapsed = TIMEVAL_DIFF_MSEC(now, idle_start);
01908             if (idle_close_time != 0 && (elapsed > idle_close_time * 1000))
01909             {
01910                 log_info("connection idle for %d msecs, closing.",elapsed);
01911                 set_should_stop();
01912                 break_contact(ContactEvent::IDLE);
01913                 return;
01914             } else {
01915                 log_debug("connection not idle: %d <= %d",
01916                           elapsed, idle_close_time * 1000);
01917             }
01918         }
01919     }
01920 }
01921 
01922 void
01923 BluetoothConvergenceLayer::Connection::recv_loop()
01924 {
01925     int timeout;
01926     int ret;
01927     char typecode;
01928 
01929     // reserve space in the buffer
01930     rcvbuf_.reserve(params_.readbuf_len_);
01931 
01932     while (1) {
01933         // if there's nothing in the buffer,
01934         // block waiting for the one byte typecode
01935         if (rcvbuf_.fullbytes() == 0) {
01936             ASSERT(rcvbuf_.end() == rcvbuf_.data()); // sanity
01937 
01938             timeout = 2 * params_.keepalive_interval_ * 1000;
01939             log_debug("recv_loop: blocking on frame... (timeout %d)", timeout);
01940 
01941             ret = sock_->timeout_read(rcvbuf_.end(),
01942                                       rcvbuf_.tailbytes(),
01943                                       timeout);
01944 
01945             if (ret == oasys::IOEOF || ret == oasys::IOERROR) {
01946                 log_info("recv_loop: remote connection unexpectedly closed");
01947 shutdown:
01948                 break_contact(ContactEvent::BROKEN);
01949                 return;
01950 
01951             } else if (ret == oasys::IOTIMEOUT) {
01952                 log_info("recv_loop: no message heard for > %d msecs -- "
01953                          "breaking contact", timeout);
01954                 goto shutdown;
01955             }
01956 
01957             rcvbuf_.fill(ret);
01958             note_data_rcvd();
01959         }
01960 
01961         typecode = *rcvbuf_.start();
01962         rcvbuf_.consume(1);
01963 
01964         log_debug("recv_loop: got frame packet type 0x%x...", typecode);
01965         
01966         if (typecode == SHUTDOWN) {
01967             break_contact(ContactEvent::SHUTDOWN);
01968             return;
01969         }
01970 
01971         if (typecode == KEEPALIVE) {
01972             log_debug("recv_loop: " "got keepalive, sending response");
01973 
01974             if (!send_keepalive()) {
01975                 return;
01976             }
01977             continue;
01978         }
01979 
01980         if (typecode != BUNDLE_DATA) {
01981             log_err("recv_loop: "
01982                     "unexpected typecode 0x%x waiting for BUNDLE_DATA",
01983                     typecode);
01984             goto shutdown;
01985         }
01986 
01987         // process the bundle
01988         if (! recv_bundle()) {
01989             goto shutdown;
01990         }
01991     }
01992 }
01993 
01994 void
01995 BluetoothConvergenceLayer::NeighborDiscovery::send_announce(bdaddr_t remote)
01996 {
01997     char buff[18]; // used by oasys::batostr
01998     const char *nexthop = oasys::Bluetooth::batostr(&remote,buff);
01999 
02000     ContactManager* cm = BundleDaemon::instance()->contactmgr();
02001     Link* link = cm->find_link_to(cl_, nexthop);
02002     if (link == NULL) {
02003         // Using ConnectionManager factory method to manage bind()
02004         // contention ... returns NULL if error
02005         Connection *sender = cl_->connections_.connection(
02006                                       cl_,remote,&params_);
02007         if (sender != NULL) {
02008             log_info("sending announce bundle to %s", nexthop);
02009             if (sender->send_announce()) {
02010                 // new OpportunisticLink gets created within recv_announce,
02011                 // if successful
02012                 sender->recv_announce();
02013             }
02014             // thread never started, so cleanup is not automatic
02015             delete sender;
02016             sender = NULL;
02017         }
02018     }
02019 }
02020 
02021 void
02022 BluetoothConvergenceLayer::NeighborDiscovery::run()
02023 { 
02024     if (poll_interval_ == 0) {
02025         return;
02026     }
02027 
02028     // register DTN service with local SDP daemon
02029     oasys::BluetoothServiceRegistration sdp_reg(&params_.local_addr_);
02030     if (sdp_reg.success() == false) {
02031         log_err("SDP registration failed");
02032         return;
02033     }
02034 
02035     // loop forever (until interrupted)
02036     while (true) {
02037 
02038         // randomize the sleep time:
02039         // the point is that two nodes with zero prior knowledge of each other
02040         // need to be able to discover each other in a reasonably short time.
02041         // if common practice is that all set their poll_interval to 1 or 30 or x
02042         // then there's the chance of synchronization or syncopation such that 
02043         // discovery doesn't happen.
02044         int sleep_time = oasys::Random::rand(poll_interval_);
02045 
02046         log_debug("sleep_time %d",sleep_time);
02047         sleep(sleep_time);
02048 
02049         // initiate inquiry on local Bluetooth controller
02050         int nr = inquire(); // blocks until inquiry process completes
02051 
02052         if (should_stop()) break;
02053 
02054         if (nr < 0) {
02055             log_debug("no Bluetooth devices in range");
02056             continue;
02057         }
02058 
02059         // enumerate any remote Bluetooth adapters in range
02060         oasys::BluetoothInquiryInfo bii;
02061         while (next(bii) != -1) {
02062 
02063             // query SDP daemon on remote host for DTN's registration
02064             oasys::BluetoothServiceDiscoveryClient sdpclient;
02065             sdpclient.set_local_addr(params_.local_addr_);
02066             if (sdpclient.is_dtn_router(bii.addr_)) {
02067                 send_announce(bii.addr_);
02068             }
02069             if (should_stop()) break;
02070         }
02071         if (should_stop()) break;
02072 
02073         // flush results of previous inquiry
02074         reset();
02075     }
02076 
02077     // interrupted! unregister
02078     log_info("Bluetooth inquiry interrupted by user");
02079 }
02080 
02081 } // namespace dtn
02082 
02083 #endif  /* OASYS_BLUETOOTH_ENABLED */

Generated on Fri Dec 22 14:47:57 2006 for DTN Reference Implementation by doxygen 1.5.1