ConnectionConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <oasys/util/OptParser.h>
00019 
00020 #include "ConnectionConvergenceLayer.h"
00021 #include "CLConnection.h"
00022 #include "bundling/BundleDaemon.h"
00023 
00024 namespace dtn {
00025 
00026 //----------------------------------------------------------------------
00027 ConnectionConvergenceLayer::LinkParams::LinkParams(bool init_defaults)
00028     : busy_queue_depth_(10),
00029       reactive_frag_enabled_(true),
00030       sendbuf_len_(32768),
00031       recvbuf_len_(32768),
00032       data_timeout_(30000), // msec
00033       test_read_delay_(0),
00034       test_write_delay_(0),
00035       test_recv_delay_(0)
00036 {
00037     (void)init_defaults;
00038 }
00039 
00040 //----------------------------------------------------------------------
00041 ConnectionConvergenceLayer::ConnectionConvergenceLayer(const char* classname,
00042                                                        const char* cl_name)
00043     : ConvergenceLayer(classname, cl_name)
00044 {
00045 }
00046 
00047 //----------------------------------------------------------------------
00048 bool
00049 ConnectionConvergenceLayer::parse_link_params(LinkParams* params,
00050                                               int argc, const char** argv,
00051                                               const char** invalidp)
00052 {
00053     oasys::OptParser p;
00054     
00055     p.addopt(new oasys::UIntOpt("busy_queue_depth",
00056                                 &params->busy_queue_depth_));    
00057     p.addopt(new oasys::BoolOpt("reactive_frag_enabled",
00058                                 &params->reactive_frag_enabled_));
00059     p.addopt(new oasys::UIntOpt("sendbuf_len", &params->sendbuf_len_));
00060     p.addopt(new oasys::UIntOpt("recvbuf_len", &params->recvbuf_len_));
00061     p.addopt(new oasys::UIntOpt("data_timeout", &params->data_timeout_));
00062     
00063     p.addopt(new oasys::UIntOpt("test_read_delay",
00064                                 &params->test_read_delay_));
00065     p.addopt(new oasys::UIntOpt("test_write_delay",
00066                                 &params->test_write_delay_));
00067     p.addopt(new oasys::UIntOpt("test_recv_delay",
00068                                 &params->test_recv_delay_));
00069     
00070     if (! p.parse(argc, argv, invalidp)) {
00071         return false;
00072     }
00073     
00074     if (params->sendbuf_len_ == 0) {
00075         *invalidp = "sendbuf_len must not be zero";
00076         return false;
00077     }
00078 
00079     if (params->recvbuf_len_ == 0) {
00080         *invalidp = "recvbuf_len must not be zero";
00081         return false;
00082     }
00083     
00084     return true;
00085 }
00086 
00087 //----------------------------------------------------------------------
00088 void
00089 ConnectionConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00090 {
00091     LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00092     ASSERT(params != NULL);
00093     
00094     buf->appendf("busy_queue_depth: %u\n", params->busy_queue_depth_);
00095     buf->appendf("reactive_frag_enabled: %u\n", params->reactive_frag_enabled_);
00096     buf->appendf("sendbuf_len: %u\n", params->sendbuf_len_);
00097     buf->appendf("recvbuf_len: %u\n", params->recvbuf_len_);
00098     buf->appendf("data_timeout: %u\n", params->data_timeout_);
00099     buf->appendf("test_read_delay: %u\n", params->test_read_delay_);
00100     buf->appendf("test_write_delay: %u\n", params->test_write_delay_);
00101     buf->appendf("test_recv_delay: %u\n",params->test_recv_delay_);
00102 }
00103 
00104 //----------------------------------------------------------------------
00105 bool
00106 ConnectionConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00107 {
00108     log_debug("adding %s link %s", link->type_str(), link->nexthop());
00109 
00110     // Create a new parameters structure, parse the options, and store
00111     // them in the link's cl info slot.
00112     LinkParams* params = new_link_params();
00113 
00114     // Try to parse the link's next hop, but continue on even if the
00115     // parse fails since the hostname may not be resolvable when we
00116     // initialize the link. Each subclass is responsible for
00117     // re-checking when opening the link.
00118     parse_nexthop(link, params);
00119     
00120     const char* invalid;
00121     if (! parse_link_params(params, argc, argv, &invalid)) {
00122         log_err("error parsing link options: invalid option '%s'", invalid);
00123         delete params;
00124         return false;
00125     }
00126 
00127     if (! finish_init_link(link, params)) {
00128         log_err("error in finish_init_link");
00129         delete params;
00130         return false;
00131     }
00132 
00133     link->set_cl_info(params);
00134 
00135     return true;
00136 }
00137 
00138 //----------------------------------------------------------------------
00139 bool
00140 ConnectionConvergenceLayer::finish_init_link(Link* link, LinkParams* params)
00141 {
00142     (void)link;
00143     (void)params;
00144     return true;
00145 }
00146 
00147 //----------------------------------------------------------------------
00148 bool
00149 ConnectionConvergenceLayer::reconfigure_link(Link* link,
00150                                              int argc, const char* argv[])
00151 {
00152     LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00153     ASSERT(params != NULL);
00154     
00155     const char* invalid;
00156     if (! parse_link_params(params, argc, argv, &invalid)) {
00157         log_err("reconfigure_link: invalid parameter %s", invalid);
00158         return false;
00159     }
00160 
00161     if (link->isopen()) {
00162         LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00163         ASSERT(params != NULL);
00164         
00165         CLConnection* conn = dynamic_cast<CLConnection*>(link->contact()->cl_info());
00166         ASSERT(conn != NULL);
00167         
00168         if ((params->sendbuf_len_ != conn->sendbuf_.size()) &&
00169             (params->sendbuf_len_ >= conn->sendbuf_.fullbytes()))
00170         {
00171             log_info("resizing link *%p send buffer from %zu -> %u",
00172                      link, conn->sendbuf_.size(), params->sendbuf_len_);
00173             conn->sendbuf_.set_size(params->sendbuf_len_);
00174         }
00175 
00176         if ((params->recvbuf_len_ != conn->recvbuf_.size()) &&
00177             (params->recvbuf_len_ >= conn->recvbuf_.fullbytes()))
00178         {
00179             log_info("resizing link *%p recv buffer from %zu -> %u",
00180                      link, conn->recvbuf_.size(), params->recvbuf_len_);
00181             conn->recvbuf_.set_size(params->recvbuf_len_);
00182         }
00183     }
00184 
00185     return true;
00186 }
00187 
00188 //----------------------------------------------------------------------
00189 bool
00190 ConnectionConvergenceLayer::open_contact(const ContactRef& contact)
00191 {
00192     Link* link = contact->link();
00193     log_debug("opening contact on link *%p", link);
00194     
00195     LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00196     ASSERT(params != NULL);
00197     
00198     // create a new connection for the contact, set up to use the
00199     // link's configured parameters
00200     CLConnection* conn = new_connection(params);
00201     conn->set_contact(contact);
00202     contact->set_cl_info(conn);
00203     conn->start();
00204 
00205     return true;
00206 }
00207 
00208 //----------------------------------------------------------------------
00209 bool
00210 ConnectionConvergenceLayer::close_contact(const ContactRef& contact)
00211 {
00212     log_info("close_contact *%p", contact.object());
00213 
00214     CLConnection* conn = dynamic_cast<CLConnection*>(contact->cl_info());
00215     ASSERT(conn != NULL);
00216 
00217     // if the connection isn't already broken, then we need to tell it
00218     // to do so
00219     if (! conn->contact_broken_) {
00220         conn->cmdqueue_.push_back(
00221             CLConnection::CLMsg(CLConnection::CLMSG_BREAK_CONTACT));
00222     }
00223     
00224     while (!conn->is_stopped()) {
00225         log_debug("waiting for connection thread to stop...");
00226         usleep(100000);
00227         oasys::Thread::yield();
00228     }
00229 
00230     conn->close_contact();
00231     delete conn;
00232 
00233     contact->set_cl_info(NULL);
00234 
00235     return true;
00236 }
00237 
00238 //----------------------------------------------------------------------
00239 void
00240 ConnectionConvergenceLayer::send_bundle(const ContactRef& contact,
00241                                         Bundle* bundle)
00242 {
00243     log_debug("send_bundle *%p to *%p", bundle, contact.object());
00244 
00245     CLConnection* conn = dynamic_cast<CLConnection*>(contact->cl_info());
00246     ASSERT(conn != NULL);
00247 
00248     conn->queue_bundle(bundle);
00249 }
00250 
00251 } // namespace dtn

Generated on Thu Jun 7 16:56:49 2007 for DTN Reference Implementation by  doxygen 1.5.1