UDPConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2004 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 #include <sys/poll.h>
00039 
00040 #include <oasys/io/NetUtils.h>
00041 #include <oasys/thread/Timer.h>
00042 #include <oasys/util/OptParser.h>
00043 #include <oasys/util/StringBuffer.h>
00044 #include <oasys/util/URL.h>
00045 
00046 #include "UDPConvergenceLayer.h"
00047 #include "bundling/Bundle.h"
00048 #include "bundling/BundleEvent.h"
00049 #include "bundling/BundleDaemon.h"
00050 #include "bundling/BundleList.h"
00051 #include "bundling/BundleProtocol.h"
00052 
00053 namespace dtn {
00054 
00055 struct UDPConvergenceLayer::Params UDPConvergenceLayer::defaults_;
00056 
00057 
00058 //----------------------------------------------------------------------
00059 UDPConvergenceLayer::UDPConvergenceLayer()
00060     : IPConvergenceLayer("UDPConvergenceLayer", "udp")
00061 {
00062     defaults_.local_addr_               = INADDR_ANY;
00063     defaults_.local_port_               = 5000;
00064     defaults_.remote_addr_              = INADDR_NONE;
00065     defaults_.remote_port_              = 0;
00066     defaults_.rate_                     = 0; // unlimited
00067     defaults_.bucket_depth_             = 0; // default
00068 }
00069 
00070 //----------------------------------------------------------------------
00071 bool
00072 UDPConvergenceLayer::parse_params(Params* params,
00073                                   int argc, const char** argv,
00074                                   const char** invalidp)
00075 {
00076     oasys::OptParser p;
00077 
00078     p.addopt(new oasys::InAddrOpt("local_addr", &params->local_addr_));
00079     p.addopt(new oasys::UInt16Opt("local_port", &params->local_port_));
00080     p.addopt(new oasys::InAddrOpt("remote_addr", &params->remote_addr_));
00081     p.addopt(new oasys::UInt16Opt("remote_port", &params->remote_port_));
00082     p.addopt(new oasys::UIntOpt("rate", &params->rate_));
00083     p.addopt(new oasys::UIntOpt("bucket_depth_", &params->bucket_depth_));
00084 
00085     if (! p.parse(argc, argv, invalidp)) {
00086         return false;
00087     }
00088 
00089     return true;
00090 };
00091 
00092 //----------------------------------------------------------------------
00093 bool
00094 UDPConvergenceLayer::interface_up(Interface* iface,
00095                                   int argc, const char* argv[])
00096 {
00097     log_debug("adding interface %s", iface->name().c_str());
00098     
00099     // parse options (including overrides for the local_addr and
00100     // local_port settings from the defaults)
00101     Params params = UDPConvergenceLayer::defaults_;
00102     const char* invalid;
00103     if (!parse_params(&params, argc, argv, &invalid)) {
00104         log_err("error parsing interface options: invalid option '%s'",
00105                 invalid);
00106         return false;
00107     }
00108 
00109     // check that the local interface / port are valid
00110     if (params.local_addr_ == INADDR_NONE) {
00111         log_err("invalid local address setting of 0");
00112         return false;
00113     }
00114 
00115     if (params.local_port_ == 0) {
00116         log_err("invalid local port setting of 0");
00117         return false;
00118     }
00119     
00120     // create a new server socket for the requested interface
00121     Receiver* receiver = new Receiver(&params);
00122     receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00123     
00124     if (receiver->bind(params.local_addr_, params.local_port_) != 0) {
00125         return false; // error log already emitted
00126     }
00127     
00128     // check if the user specified a remote addr/port to connect to
00129     if (params.remote_addr_ != INADDR_NONE) {
00130         if (receiver->connect(params.remote_addr_, params.remote_port_) != 0) {
00131             return false; // error log already emitted
00132         }
00133     }
00134     
00135     // start the thread which automatically listens for data
00136     receiver->start();
00137     
00138     // store the new listener object in the cl specific portion of the
00139     // interface
00140     iface->set_cl_info(receiver);
00141     
00142     return true;
00143 }
00144 
00145 //----------------------------------------------------------------------
00146 bool
00147 UDPConvergenceLayer::interface_down(Interface* iface)
00148 {
00149     // grab the listener object, set a flag for the thread to stop and
00150     // then close the socket out from under it, which should cause the
00151     // thread to break out of the blocking call to accept() and
00152     // terminate itself
00153     Receiver* receiver = (Receiver*)iface->cl_info();
00154     receiver->set_should_stop();
00155     receiver->interrupt_from_io();
00156     
00157     while (! receiver->is_stopped()) {
00158         oasys::Thread::yield();
00159     }
00160 
00161     delete receiver;
00162     return true;
00163 }
00164 
00165 //----------------------------------------------------------------------
00166 void
00167 UDPConvergenceLayer::dump_interface(Interface* iface,
00168                                     oasys::StringBuffer* buf)
00169 {
00170     Params* params = &((Receiver*)iface->cl_info())->params_;
00171     
00172     buf->appendf("\tlocal_addr: %s local_port: %d\n",
00173                  intoa(params->local_addr_), params->local_port_);
00174     
00175     if (params->remote_addr_ != INADDR_NONE) {
00176         buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
00177                      intoa(params->remote_addr_), params->remote_port_);
00178     } else {
00179         buf->appendf("\tnot connected\n");
00180     }
00181 }
00182 
00183 //----------------------------------------------------------------------
00184 bool
00185 UDPConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00186 {
00187     in_addr_t addr;
00188     u_int16_t port = 0;
00189     
00190     log_debug("adding %s link %s", link->type_str(), link->nexthop());
00191 
00192     // validate the link next hop address
00193     if (! IPConvergenceLayer::parse_nexthop(link->nexthop(), &addr, &port)) {
00194         log_err("invalid next hop address '%s'", link->nexthop());
00195         return false;
00196     }
00197 
00198     // make sure it's really a valid address
00199     if (addr == INADDR_ANY || addr == INADDR_NONE) {
00200         log_err("invalid host in next hop address '%s'", link->nexthop());
00201         return false;
00202     }
00203     
00204     // make sure the port was specified
00205     if (port == 0) {
00206         log_err("port not specified in next hop address '%s'",
00207                 link->nexthop());
00208         return false;
00209     }
00210 
00211     // Create a new parameters structure, parse the options, and store
00212     // them in the link's cl info slot
00213     Params* params = new Params(defaults_);
00214     params->local_addr_ = INADDR_NONE;
00215     params->local_port_ = 0;
00216 
00217     const char* invalid;
00218     if (! parse_params(params, argc, argv, &invalid)) {
00219         log_err("error parsing link options: invalid option '%s'", invalid);
00220         delete params;
00221         return false;
00222     }
00223 
00224     if (link->params().mtu_ > MAX_BUNDLE_LEN) {
00225         log_err("error parsing link options: mtu %d > maximum %d",
00226                 link->params().mtu_, MAX_BUNDLE_LEN);
00227         delete params;
00228         return false;
00229     }
00230 
00231     link->set_cl_info(params);
00232     return true;
00233 }
00234 
00235 //----------------------------------------------------------------------
00236 void
00237 UDPConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00238 {
00239     Params* params = (Params*)link->cl_info();
00240     
00241     buf->appendf("\tlocal_addr: %s local_port: %d\n",
00242                  intoa(params->local_addr_), params->local_port_);
00243 
00244     buf->appendf("\tremote_addr: %s remote_port: %d\n",
00245                  intoa(params->remote_addr_), params->remote_port_);
00246 }
00247 
00248 //----------------------------------------------------------------------
00249 bool
00250 UDPConvergenceLayer::open_contact(const ContactRef& contact)
00251 {
00252     in_addr_t addr;
00253     u_int16_t port;
00254 
00255     Link* link = contact->link();
00256     log_debug("opening contact for link *%p", link);
00257     
00258     // parse out the address / port from the nexthop address. note
00259     // that these should have been validated in init_link() above, so
00260     // we ASSERT as such
00261     bool valid = parse_nexthop(link->nexthop(), &addr, &port);
00262     ASSERT(valid == true);
00263     ASSERT(addr != INADDR_NONE && addr != INADDR_ANY);
00264     ASSERT(port != 0);
00265     
00266     Params* params = (Params*)link->cl_info();
00267 
00268     // create a new sender structure
00269     Sender* sender = new Sender(link->contact());
00270 
00271     if (!sender->init(params, addr, port)) {
00272         log_err("error initializing contact");
00273         BundleDaemon::post(
00274             new LinkStateChangeRequest(link, Link::UNAVAILABLE,
00275                                        ContactEvent::NO_INFO));
00276         delete sender;
00277         return false;
00278     }
00279         
00280     contact->set_cl_info(sender);
00281     BundleDaemon::post(new ContactUpEvent(link->contact()));
00282     
00283     return true;
00284 }
00285 
00286 //----------------------------------------------------------------------
00287 bool
00288 UDPConvergenceLayer::close_contact(const ContactRef& contact)
00289 {
00290     Sender* sender = (Sender*)contact->cl_info();
00291     
00292     log_info("close_contact *%p", contact.object());
00293 
00294     if (sender) {
00295         delete sender;
00296         contact->set_cl_info(NULL);
00297     }
00298     
00299     return true;
00300 }
00301 
00302 //----------------------------------------------------------------------
00303 void
00304 UDPConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00305 {
00306     Sender* sender = (Sender*)contact->cl_info();
00307     if (!sender) {
00308         log_crit("send_bundles called on contact *%p with no Sender!!",
00309                  contact.object());
00310         return;
00311     }
00312     ASSERT(contact == sender->contact_);
00313 
00314     bool ok = sender->send_bundle(bundle); // consumes bundle reference
00315 
00316     if (ok) {
00317         BundleDaemon::post(
00318             new BundleTransmittedEvent(bundle, contact,
00319                                        bundle->payload_.length(),
00320                                        0));
00321     } else {
00322         BundleDaemon::post(
00323             new BundleTransmitFailedEvent(bundle, contact));
00324     }
00325 }
00326 
00327 //----------------------------------------------------------------------
00328 UDPConvergenceLayer::Receiver::Receiver(UDPConvergenceLayer::Params* params)
00329     : IOHandlerBase(new oasys::Notifier("/dtn/cl/udp/receiver")),
00330       UDPClient("/dtn/cl/udp/receiver"),
00331       Thread("UDPConvergenceLayer::Receiver")
00332 {
00333     logfd_  = false;
00334     params_ = *params;
00335 }
00336 
00337 //----------------------------------------------------------------------
00338 void
00339 UDPConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00340 {
00341     Bundle* bundle = NULL;       
00342     int header_len;
00343     
00344     // parse the headers into a new bundle. this sets the payload_len
00345     // appropriately in the new bundle and returns the number of bytes
00346     // consumed for the bundle headers
00347     bundle = new Bundle();
00348     header_len = BundleProtocol::parse_header_blocks(bundle, bp, len);
00349 
00350     if (header_len < 0) {
00351         log_err("process_data: invalid or too short bundle header");
00352         delete bundle;
00353         return;
00354     }
00355     
00356     size_t payload_len = bundle->payload_.length();
00357 
00358     if (len != header_len + payload_len) {
00359         log_err("process_data: error in bundle lengths: "
00360                 "bundle_length %zu, header_length %d, payload_length %zu",
00361                 len, header_len, payload_len);
00362         delete bundle;
00363         return;
00364     }
00365 
00366     // store the payload and notify the daemon
00367     bundle->payload_.set_data(bp + header_len, payload_len);
00368     
00369     log_debug("process_data: new bundle id %d arrival, payload length %zu",
00370               bundle->bundleid_, bundle->payload_.length());
00371     
00372     BundleDaemon::post(
00373         new BundleReceivedEvent(bundle, EVENTSRC_PEER, payload_len));
00374 }
00375 
00376 //----------------------------------------------------------------------
00377 void
00378 UDPConvergenceLayer::Receiver::run()
00379 {
00380     int ret;
00381     in_addr_t addr;
00382     u_int16_t port;
00383     u_char buf[MAX_UDP_PACKET];
00384 
00385     while (1) {
00386         if (should_stop())
00387             break;
00388         
00389         ret = recvfrom((char*)buf, MAX_UDP_PACKET, 0, &addr, &port);
00390         if (ret <= 0) {   
00391             if (errno == EINTR) {
00392                 continue;
00393             }
00394             log_err("error in recvfrom(): %d %s",
00395                     errno, strerror(errno));
00396             close();
00397             break;
00398         }
00399         
00400         log_debug("got %d byte packet from %s:%d",
00401                   ret, intoa(addr), port);               
00402         process_data(buf, ret);
00403     }
00404 }
00405 
00406 //----------------------------------------------------------------------
00407 UDPConvergenceLayer::Sender::Sender(const ContactRef& contact)
00408     : Logger("UDPConvergenceLayer::Sender",
00409              "/dtn/cl/udp/sender/%p", this),
00410       socket_(logpath_),
00411       rate_socket_(logpath_, 0, 0),
00412       contact_(contact.object(), "UDPCovergenceLayer::Sender")
00413 {
00414 }
00415 
00416 //----------------------------------------------------------------------
00417 bool
00418 UDPConvergenceLayer::Sender::init(Params* params,
00419                                   in_addr_t addr, u_int16_t port)
00420     
00421 {
00422     log_debug("initializing sender");
00423 
00424     params_ = params;
00425     
00426     socket_.logpathf("%s/conn/%s:%d", logpath_, intoa(addr), port);
00427     socket_.set_logfd(false);
00428 
00429     if (params->local_addr_ != INADDR_NONE || params->local_port_ != 0)
00430     {
00431         if (socket_.bind(params->local_addr_, params->local_port_) != 0) {
00432             log_err("error binding to %s:%d: %s",
00433                     intoa(params->local_addr_), params->local_port_,
00434                     strerror(errno));
00435             return false;
00436         }
00437     }
00438     
00439     if (socket_.connect(addr, port) != 0) {
00440         log_err("error issuing udp connect to %s:%d: %s",
00441                 intoa(addr), port, strerror(errno));
00442         return false;
00443     }
00444 
00445     if (params->rate_ != 0) {
00446         rate_socket_.bucket()->set_rate(params->rate_);
00447 
00448         if (params->bucket_depth_ != 0) {
00449             rate_socket_.bucket()->set_depth(params->bucket_depth_);
00450         }
00451         
00452         log_debug("initialized rate controller: rate %u depth %u",
00453                   rate_socket_.bucket()->rate(),
00454                   rate_socket_.bucket()->depth());
00455     }
00456 
00457     return true;
00458 }
00459     
00460 //----------------------------------------------------------------------
00461 bool 
00462 UDPConvergenceLayer::Sender::send_bundle(Bundle* bundle)
00463 {
00464     int header_len;
00465 
00466     size_t formatted_len = BundleProtocol::formatted_length(bundle);
00467     if (formatted_len > UDPConvergenceLayer::MAX_BUNDLE_LEN) {
00468         log_err("send_bundle: bundle too big (%zu > %zu)",
00469                 formatted_len, UDPConvergenceLayer::MAX_BUNDLE_LEN);
00470         return false;
00471     }
00472         
00473     size_t payload_len = bundle->payload_.length();
00474 
00475     // stuff in the bundle headers
00476     header_len =
00477         BundleProtocol::format_header_blocks(bundle, buf_, sizeof(buf_));
00478     if (header_len < 0) {
00479         log_err("send_bundle: bundle header too big for buffer (len %zu)",
00480                 sizeof(buf_));
00481         return false;
00482     }
00483 
00484     // check that the payload isn't too big (it should have been
00485     // fragmented by the higher layers)
00486     if (payload_len > (sizeof(buf_) - header_len)) {
00487         log_err("send_bundle: bundle payload + headers (length %zu) too big",
00488                 header_len + payload_len);
00489         return false;
00490     }
00491 
00492     // read the payload data into the buffer
00493     bundle->payload_.read_data(0, payload_len, &buf_[header_len],
00494                                BundlePayload::FORCE_COPY);
00495 
00496     // write it out the socket and make sure we wrote it all
00497     int cc = socket_.write((char*)buf_, header_len + payload_len);
00498     if (cc == (int)(header_len + payload_len)) {
00499         log_info("send_bundle: successfully sent bundle length %d", cc);
00500         return true;
00501     } else {
00502         
00503         log_err("send_bundle: error sending bundle (wrote %d/%zu): %s",
00504                 cc, (header_len + payload_len), strerror(errno));
00505         return false;
00506     }
00507 }
00508 
00509 } // namespace dtn

Generated on Fri Dec 22 14:48:01 2006 for DTN Reference Implementation by  doxygen 1.5.1