CLConnection.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2006 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <oasys/util/OptParser.h>
00040 
00041 #include "CLConnection.h"
00042 #include "bundling/BundleDaemon.h"
00043 #include "bundling/BundlePayload.h"
00044 #include "contacts/ContactManager.h"
00045 
00046 namespace dtn {
00047 
00048 //----------------------------------------------------------------------
00049 CLConnection::CLConnection(const char*       classname,
00050                            const char*       logpath,
00051                            ConvergenceLayer* cl,
00052                            LinkParams*       params,
00053                            bool              active_connector)
00054     : Thread(classname),
00055       Logger(classname, logpath),
00056       contact_(classname),
00057       contact_up_(false),
00058       cmdqueue_(logpath),
00059       cl_(cl),
00060       params_(params),
00061       active_connector_(active_connector),
00062       num_pollfds_(0),
00063       poll_timeout_(-1),
00064       contact_broken_(false)
00065 {
00066     sendbuf_.reserve(params_->sendbuf_len_);
00067     recvbuf_.reserve(params_->recvbuf_len_);
00068 }
00069 
00070 //----------------------------------------------------------------------
00071 CLConnection::~CLConnection()
00072 {
00073 }
00074 
00075 //----------------------------------------------------------------------
00076 void
00077 CLConnection::run()
00078 {
00079     struct pollfd* cmdqueue_poll;
00080 
00081     initialize_pollfds();
00082 
00083     cmdqueue_poll         = &pollfds_[num_pollfds_];
00084     cmdqueue_poll->fd     = cmdqueue_.read_fd();
00085     cmdqueue_poll->events = POLLIN;
00086 
00087     // based on the parameter passed to the constructor, we either
00088     // initiate a connection or accept one, then move on to the main
00089     // run() loop. it is the responsibility of the underlying CL to
00090     // make sure that a contact_ structure is found / created
00091     if (active_connector_) {
00092         connect();
00093     } else {
00094         accept();
00095     }
00096 
00097     while (true) {
00098         if (contact_broken_) {
00099             log_debug("contact_broken set, exiting main loop");
00100             return;
00101         }
00102 
00103         // check the comand queue coming in from the bundle daemon
00104         // if any arrive, we continue to the top of the loop to check
00105         // contact_broken and then process any other commands before
00106         // checking for data to/from the remote side
00107         if (cmdqueue_.size() != 0) {
00108             process_command();
00109             continue;
00110         }
00111 
00112         // send any data there is to send, and if something was sent
00113         // out, we'll call poll() with a zero timeout so we can read
00114         // any data there is to consume, then return to send another
00115         // chunk.
00116         bool more_to_send = send_pending_data();
00117 
00118         // check again here for contact broken since we don't want to
00119         // poll if the socket's been closed
00120         if (contact_broken_) {
00121             log_debug("contact_broken set, exiting main loop");
00122             return;
00123         }
00124         
00125         // now we poll() to wait for a new command (indicated by the
00126         // notifier on the command queue), data arriving from the
00127         // remote side, or write-readiness on the socket indicating
00128         // that we can send more data.
00129         for (int i = 0; i < num_pollfds_ + 1; ++i) {
00130             pollfds_[i].revents = 0;
00131         }
00132 
00133         int timeout = more_to_send ? 0 : poll_timeout_;
00134         
00135         int cc = oasys::IO::poll_multiple(pollfds_, num_pollfds_ + 1,
00136                                           timeout, NULL, logpath_);
00137 
00138         if (cc == oasys::IOTIMEOUT)
00139         {
00140             handle_poll_timeout();
00141         }
00142         else if (cc > 0)
00143         {
00144             if (cc == 1 && cmdqueue_poll->revents != 0) {
00145                 continue; // activity on the command queue only
00146             }
00147             handle_poll_activity();
00148         }
00149         else
00150         {
00151             log_err("unexpected return from poll_multiple: %d", cc);
00152             break_contact(ContactEvent::BROKEN);
00153             return;
00154         }
00155     }
00156 }
00157 
00158 //----------------------------------------------------------------------
00159 void
00160 CLConnection::process_command()
00161 {
00162     CLMsg msg;
00163     bool ok = cmdqueue_.try_pop(&msg);
00164     ASSERT(ok); // shouldn't be called if the queue is empty
00165     
00166     switch(msg.type_) {
00167     case CLMSG_SEND_BUNDLE:
00168         log_debug("processing CLMSG_SEND_BUNDLE");
00169         handle_send_bundle(msg.bundle_.object());
00170         break;
00171         
00172     case CLMSG_CANCEL_BUNDLE:
00173         log_debug("processing CLMSG_CANCEL_BUNDLE");
00174         handle_cancel_bundle(msg.bundle_.object());
00175         break;
00176         
00177     case CLMSG_BREAK_CONTACT:
00178         log_debug("processing CLMSG_BREAK_CONTACT");
00179         break_contact(ContactEvent::USER);
00180         break;
00181     default:
00182         PANIC("invalid CLMsg typecode %d", msg.type_);
00183     }
00184 
00185     // check if we need to unblock the link by clearing the BUSY
00186     // state. note that we only post the BUSY->AVAILABLE event when
00187     // the command queue passes back to be within the threshold to
00188     // limit the number of redundant events posted to the daemon
00189     if (contact_ != NULL && contact_->link()->state() == Link::BUSY)
00190     {
00191         if (cmdqueue_.size() == (params_->busy_queue_depth_ - 1)) {
00192             BundleDaemon::post(
00193                 new LinkStateChangeRequest(contact_->link(),
00194                                            Link::AVAILABLE,
00195                                            ContactEvent::UNBLOCKED));
00196         }
00197     }
00198 }
00199 
00200 //----------------------------------------------------------------------
00201 void
00202 CLConnection::contact_up()
00203 {
00204     log_debug("contact_up");
00205     ASSERT(contact_ != NULL);
00206     
00207     ASSERT(!contact_up_);
00208     contact_up_ = true;
00209     
00210     BundleDaemon::post(new ContactUpEvent(contact_));
00211 }
00212 
00213 //----------------------------------------------------------------------
00214 void
00215 CLConnection::break_contact(ContactEvent::reason_t reason)
00216 {
00217     log_debug("break_contact: %s", ContactEvent::reason_to_str(reason));
00218 
00219     if (reason != ContactEvent::BROKEN) {
00220         disconnect();
00221     }
00222 
00223     contact_broken_ = true;
00224     
00225     // if the connection isn't being closed by the user, we need to
00226     // notify the daemon that either the contact ended or the link
00227     // became unavailable before a contact began.
00228     //
00229     // we need to check that there is in fact a contact, since a
00230     // connection may be accepted and then break before establishing a
00231     // contact
00232     if ((reason != ContactEvent::USER) && (contact_ != NULL)) {
00233         BundleDaemon::post(
00234             new LinkStateChangeRequest(contact_->link(),
00235                                        Link::CLOSED,
00236                                        reason));
00237     }
00238 }
00239 
00240 //----------------------------------------------------------------------
00241 void
00242 CLConnection::close_contact()
00243 {
00244     LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00245     ASSERT(params != NULL);
00246     
00247     // drain the inflight queue, posting transmitted or transmit
00248     // failed events
00249     while (! inflight_.empty()) {
00250         InFlightBundle* inflight = inflight_.front();
00251 
00252         // make sure the payload file is closed
00253         ASSERT(inflight->bundle_.object() != NULL);
00254         inflight->bundle_->payload_.close_file();
00255 
00256         size_t sent_bytes  = inflight->sent_data_.num_contiguous();
00257         size_t acked_bytes = inflight->ack_data_.num_contiguous();
00258         
00259         if ((! params->reactive_frag_enabled_) ||
00260             (sent_bytes == 0) ||
00261             (contact_->link()->is_reliable() && acked_bytes == 0))
00262         {
00263             log_debug("posting transmission failed event "
00264                       "(reactive fragmentation %s, %s link, acked_bytes %zu)",
00265                       params->reactive_frag_enabled_ ? "enabled" : "disabled",
00266                       contact_->link()->is_reliable() ? "reliable" : "unreliable",
00267                       acked_bytes);
00268             
00269             BundleDaemon::post(
00270                 new BundleTransmitFailedEvent(inflight->bundle_.object(),
00271                                               contact_));
00272             
00273         } else {
00274             // XXX/demmer the reactive fragmentation code needs to be
00275             // fixed to include the header bytes
00276             sent_bytes  -= inflight->header_block_length_;
00277             if (acked_bytes > inflight->header_block_length_) {
00278                 acked_bytes -= inflight->header_block_length_;
00279             } else {
00280                 acked_bytes = 0;
00281             }
00282             
00283             BundleDaemon::post(
00284                 new BundleTransmittedEvent(inflight->bundle_.object(),
00285                                            contact_,
00286                                            sent_bytes, acked_bytes));
00287         }
00288 
00289         inflight_.pop_front();
00290     }
00291 
00292     // check the tail of the incoming queue to see if there's a
00293     // partially-received bundle that we need to post a received event
00294     // for (if reactive fragmentation is enabled)
00295     if (! incoming_.empty()) {
00296         IncomingBundle* incoming = incoming_.back();
00297             
00298         size_t rcvd_len = incoming->rcvd_data_.last() + 1;
00299         
00300         if ((incoming->total_length_ == 0) && 
00301             params->reactive_frag_enabled_ &&
00302             (rcvd_len > incoming->header_block_length_))
00303         {
00304             log_debug("partial arrival of bundle: "
00305                       "got %zu bytes [hdr %zu payload %zu]",
00306                       rcvd_len, incoming->header_block_length_,
00307                       incoming->bundle_->payload_.length());
00308             
00309             // XXX/demmer need to fix the fragmentation code to assume the
00310             // event includes the header bytes as well as the payload.
00311             
00312             size_t payload_rcvd = rcvd_len - incoming->header_block_length_;
00313             
00314             // make sure the payload file is closed
00315             ASSERT(incoming->bundle_.object() != NULL);
00316             incoming->bundle_->payload_.close_file();
00317             
00318             BundleDaemon::post(
00319                 new BundleReceivedEvent(incoming->bundle_.object(),
00320                                         EVENTSRC_PEER,
00321                                         payload_rcvd));
00322         }
00323     }
00324 
00325     // now drain the incoming queue
00326     while (!incoming_.empty()) {
00327         IncomingBundle* incoming = incoming_.back();
00328         incoming_.pop_back();
00329         delete incoming;
00330     }
00331     
00332     // finally, drain the message queue, posting transmit failed
00333     // events for any send bundle commands that may be in there
00334     // (though this is unlikely to happen)
00335     if (cmdqueue_.size() > 0) {
00336         log_warn("close_contact: %zu CL commands still in queue: ",
00337                  cmdqueue_.size());
00338         
00339         while (cmdqueue_.size() != 0) {
00340             CLMsg msg;
00341             bool ok = cmdqueue_.try_pop(&msg);
00342             ASSERT(ok);
00343 
00344             log_warn("close_contact: %s still in queue", clmsg_to_str(msg.type_));
00345             
00346             if (msg.type_ == CLMSG_SEND_BUNDLE) {
00347                 BundleDaemon::post(
00348                     new BundleTransmitFailedEvent(msg.bundle_.object(),
00349                                                   contact_));
00350             }
00351         }
00352     }
00353 }
00354 
00355 //----------------------------------------------------------------------
00356 void
00357 CLConnection::handle_announce_bundle(Bundle* announce)
00358 {
00359     log_debug("got announce bundle: source eid %s", announce->source_.c_str());
00360 
00361     /*
00362      * Now we may need to find or create an appropriate opportunistic
00363      * link for the connection.
00364      *
00365      * First, we check if there's an idle (i.e. UNAVAILABLE) link to
00366      * the remote eid. We explicitly ignore the nexthop address, since
00367      * that can change (due to things like TCP/UDP port number
00368      * assignment), but we pass in the remote eid to match for a link.
00369      *
00370      * If we can't find one, then we create a new opportunistic link
00371      * for the connection.
00372      */
00373     if (contact_ == NULL) {
00374 
00375         ASSERT(nexthop_ != ""); // the derived class must have set the
00376                                 // nexthop in the constructor
00377         
00378         ContactManager* cm = BundleDaemon::instance()->contactmgr();
00379 
00380         Link* link = cm->find_link_to(cl_, "", announce->source_,
00381                                       Link::OPPORTUNISTIC,
00382                                       Link::AVAILABLE | Link::UNAVAILABLE);
00383 
00384         if (link != NULL) {
00385             link->set_nexthop(nexthop_);
00386             log_debug("found idle opportunistic link *%p", link);
00387             
00388         } else {
00389             link = cm->new_opportunistic_link(cl_,
00390                                               nexthop_.c_str(),
00391                                               announce->source_);
00392             log_debug("created new opportunistic link *%p", link);
00393         }
00394         
00395         ASSERT(! link->isopen());
00396 
00397         contact_ = new Contact(link);
00398         contact_->set_cl_info(this);
00399         link->set_contact(contact_.object());
00400 
00401         /*
00402          * Now that the connection is established, we swing the
00403          * params_ pointer to those of the link, since there's a
00404          * chance they've been modified by the user in the past.
00405          */
00406         LinkParams* lparams = dynamic_cast<LinkParams*>(link->cl_info());
00407         ASSERT(lparams != NULL);
00408         params_ = lparams;
00409     }
00410 }
00411 
00412 
00413 } // namespace dtn

Generated on Fri Dec 22 14:47:58 2006 for DTN Reference Implementation by  doxygen 1.5.1