ConnectionConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2006 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <oasys/util/OptParser.h>
00040 
00041 #include "ConnectionConvergenceLayer.h"
00042 #include "CLConnection.h"
00043 #include "bundling/BundleDaemon.h"
00044 
00045 namespace dtn {
00046 
00047 //----------------------------------------------------------------------
00048 ConnectionConvergenceLayer::LinkParams::LinkParams(bool init_defaults)
00049     : busy_queue_depth_(10),
00050       reactive_frag_enabled_(true),
00051       sendbuf_len_(32768),
00052       recvbuf_len_(32768),
00053       data_timeout_(30000), // msec
00054       test_read_delay_(0),
00055       test_write_delay_(0),
00056       test_recv_delay_(0)
00057 {
00058     (void)init_defaults;
00059 }
00060 
00061 //----------------------------------------------------------------------
00062 ConnectionConvergenceLayer::ConnectionConvergenceLayer(const char* classname,
00063                                                        const char* cl_name)
00064     : ConvergenceLayer(classname, cl_name)
00065 {
00066 }
00067 
00068 //----------------------------------------------------------------------
00069 bool
00070 ConnectionConvergenceLayer::parse_link_params(LinkParams* params,
00071                                               int argc, const char** argv,
00072                                               const char** invalidp)
00073 {
00074     oasys::OptParser p;
00075     
00076     p.addopt(new oasys::UIntOpt("busy_queue_depth",
00077                                 &params->busy_queue_depth_));    
00078     p.addopt(new oasys::BoolOpt("reactive_frag_enabled",
00079                                 &params->reactive_frag_enabled_));
00080     p.addopt(new oasys::UIntOpt("sendbuf_len", &params->sendbuf_len_));
00081     p.addopt(new oasys::UIntOpt("recvbuf_len", &params->recvbuf_len_));
00082     p.addopt(new oasys::UIntOpt("data_timeout", &params->data_timeout_));
00083     
00084     p.addopt(new oasys::UIntOpt("test_read_delay",
00085                                 &params->test_read_delay_));
00086     p.addopt(new oasys::UIntOpt("test_write_delay",
00087                                 &params->test_write_delay_));
00088     p.addopt(new oasys::UIntOpt("test_recv_delay",
00089                                 &params->test_recv_delay_));
00090     
00091     if (! p.parse(argc, argv, invalidp)) {
00092         return false;
00093     }
00094     
00095     if (params->sendbuf_len_ == 0) {
00096         *invalidp = "sendbuf_len must not be zero";
00097         return false;
00098     }
00099 
00100     if (params->recvbuf_len_ == 0) {
00101         *invalidp = "recvbuf_len must not be zero";
00102         return false;
00103     }
00104     
00105     return true;
00106 }
00107 
00108 //----------------------------------------------------------------------
00109 void
00110 ConnectionConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00111 {
00112     LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00113     ASSERT(params != NULL);
00114     
00115     buf->appendf("busy_queue_depth: %u\n", params->busy_queue_depth_);
00116     buf->appendf("reactive_frag_enabled: %u\n", params->reactive_frag_enabled_);
00117     buf->appendf("sendbuf_len: %u\n", params->sendbuf_len_);
00118     buf->appendf("recvbuf_len: %u\n", params->recvbuf_len_);
00119     buf->appendf("data_timeout: %u\n", params->data_timeout_);
00120     buf->appendf("test_read_delay: %u\n", params->test_read_delay_);
00121     buf->appendf("test_write_delay: %u\n", params->test_write_delay_);
00122     buf->appendf("test_recv_delay: %u\n",params->test_recv_delay_);
00123 }
00124 
00125 //----------------------------------------------------------------------
00126 bool
00127 ConnectionConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00128 {
00129     log_debug("adding %s link %s", link->type_str(), link->nexthop());
00130 
00131     // Create a new parameters structure, parse the options, and store
00132     // them in the link's cl info slot.
00133     LinkParams* params = new_link_params();
00134 
00135     if (! parse_nexthop(link, params)) {
00136         log_err("error parsing link nexthop '%s'", link->nexthop());
00137         delete params;
00138         return false;
00139     }
00140     
00141     const char* invalid;
00142     if (! parse_link_params(params, argc, argv, &invalid)) {
00143         log_err("error parsing link options: invalid option '%s'", invalid);
00144         delete params;
00145         return false;
00146     }
00147 
00148     if (! finish_init_link(link, params)) {
00149         log_err("error in finish_init_link");
00150         delete params;
00151         return false;
00152     }
00153 
00154     link->set_cl_info(params);
00155 
00156     return true;
00157 }
00158 
00159 //----------------------------------------------------------------------
00160 bool
00161 ConnectionConvergenceLayer::finish_init_link(Link* link, LinkParams* params)
00162 {
00163     (void)link;
00164     (void)params;
00165     return true;
00166 }
00167 
00168 //----------------------------------------------------------------------
00169 bool
00170 ConnectionConvergenceLayer::reconfigure_link(Link* link,
00171                                              int argc, const char* argv[])
00172 {
00173     LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00174     ASSERT(params != NULL);
00175     
00176     const char* invalid;
00177     if (! parse_link_params(params, argc, argv, &invalid)) {
00178         log_err("reconfigure_link: invalid parameter %s", invalid);
00179         return false;
00180     }
00181 
00182     if (link->isopen()) {
00183         LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00184         ASSERT(params != NULL);
00185         
00186         CLConnection* conn = dynamic_cast<CLConnection*>(link->contact()->cl_info());
00187         ASSERT(conn != NULL);
00188         
00189         if ((params->sendbuf_len_ != conn->sendbuf_.size()) &&
00190             (params->sendbuf_len_ >= conn->sendbuf_.fullbytes()))
00191         {
00192             log_info("resizing link *%p send buffer from %zu -> %u",
00193                      link, conn->sendbuf_.size(), params->sendbuf_len_);
00194             conn->sendbuf_.set_size(params->sendbuf_len_);
00195         }
00196 
00197         if ((params->recvbuf_len_ != conn->recvbuf_.size()) &&
00198             (params->recvbuf_len_ >= conn->recvbuf_.fullbytes()))
00199         {
00200             log_info("resizing link *%p recv buffer from %zu -> %u",
00201                      link, conn->recvbuf_.size(), params->recvbuf_len_);
00202             conn->recvbuf_.set_size(params->recvbuf_len_);
00203         }
00204     }
00205 
00206     return true;
00207 }
00208 
00209 //----------------------------------------------------------------------
00210 bool
00211 ConnectionConvergenceLayer::open_contact(const ContactRef& contact)
00212 {
00213     Link* link = contact->link();
00214     log_debug("opening contact on link *%p", link);
00215     
00216     LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00217     ASSERT(params != NULL);
00218     
00219     // create a new connection for the contact, set up to use the
00220     // link's configured parameters
00221     CLConnection* conn = new_connection(params);
00222     conn->set_contact(contact);
00223     contact->set_cl_info(conn);
00224     conn->start();
00225 
00226     return true;
00227 }
00228 
00229 //----------------------------------------------------------------------
00230 bool
00231 ConnectionConvergenceLayer::close_contact(const ContactRef& contact)
00232 {
00233     log_info("close_contact *%p", contact.object());
00234 
00235     CLConnection* conn = dynamic_cast<CLConnection*>(contact->cl_info());
00236     ASSERT(conn != NULL);
00237 
00238     // if the connection isn't already broken, then we need to tell it
00239     // to do so
00240     if (! conn->contact_broken_) {
00241         conn->cmdqueue_.push_back(
00242             CLConnection::CLMsg(CLConnection::CLMSG_BREAK_CONTACT));
00243     }
00244     
00245     while (!conn->is_stopped()) {
00246         log_debug("waiting for connection thread to stop...");
00247         usleep(100000);
00248         oasys::Thread::yield();
00249     }
00250 
00251     conn->close_contact();
00252     delete conn;
00253 
00254     contact->set_cl_info(NULL);
00255 
00256     return true;
00257 }
00258 
00259 //----------------------------------------------------------------------
00260 void
00261 ConnectionConvergenceLayer::send_bundle(const ContactRef& contact,
00262                                         Bundle* bundle)
00263 {
00264     log_debug("send_bundle *%p to *%p", bundle, contact.object());
00265 
00266     CLConnection* conn = dynamic_cast<CLConnection*>(contact->cl_info());
00267     ASSERT(conn != NULL);
00268 
00269     LinkParams* params = dynamic_cast<LinkParams*>(contact->link()->cl_info());
00270     ASSERT(params != NULL);
00271     
00272     conn->cmdqueue_.push_back(
00273         CLConnection::CLMsg(CLConnection::CLMSG_SEND_BUNDLE, bundle));
00274     
00275     // to prevent the queue from filling up, set the busy state to
00276     // apply backpressure
00277     if (conn->cmdqueue_.size() >= params->busy_queue_depth_)
00278     {
00279         contact->link()->set_state(Link::BUSY);
00280     }
00281 }
00282 
00283 } // namespace dtn

Generated on Fri Dec 22 14:47:58 2006 for DTN Reference Implementation by  doxygen 1.5.1