UDPConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 #include <sys/poll.h>
00018 
00019 #include <oasys/io/NetUtils.h>
00020 #include <oasys/thread/Timer.h>
00021 #include <oasys/util/OptParser.h>
00022 #include <oasys/util/StringBuffer.h>
00023 #include <oasys/util/URL.h>
00024 
00025 #include "UDPConvergenceLayer.h"
00026 #include "bundling/Bundle.h"
00027 #include "bundling/BundleEvent.h"
00028 #include "bundling/BundleDaemon.h"
00029 #include "bundling/BundleList.h"
00030 #include "bundling/BundleProtocol.h"
00031 
00032 namespace dtn {
00033 
00034 struct UDPConvergenceLayer::Params UDPConvergenceLayer::defaults_;
00035 
00036 //----------------------------------------------------------------------
00037 void
00038 UDPConvergenceLayer::Params::serialize(oasys::SerializeAction *a)
00039 {
00040     oasys::Intoa local_addr(local_addr_);
00041     const char *local_addr_str = local_addr.buf();
00042 
00043     oasys::Intoa remote_addr(remote_addr_);
00044     const char *remote_addr_str = remote_addr.buf();
00045 
00046     a->process("local_addr",
00047         (u_char *) local_addr_str, strlen(local_addr_str));
00048     a->process("remote_addr",
00049         (u_char *) remote_addr_str, strlen(remote_addr_str));
00050     a->process("local_port", &local_port_);
00051     a->process("remote_port", &remote_port_);
00052     a->process("rate", &rate_);
00053     a->process("bucket_depth", &bucket_depth_);
00054 }
00055 
00056 //----------------------------------------------------------------------
00057 UDPConvergenceLayer::UDPConvergenceLayer()
00058     : IPConvergenceLayer("UDPConvergenceLayer", "udp")
00059 {
00060     defaults_.local_addr_               = INADDR_ANY;
00061     defaults_.local_port_               = UDPCL_DEFAULT_PORT;
00062     defaults_.remote_addr_              = INADDR_NONE;
00063     defaults_.remote_port_              = 0;
00064     defaults_.rate_                     = 0; // unlimited
00065     defaults_.bucket_depth_             = 0; // default
00066 }
00067 
00068 //----------------------------------------------------------------------
00069 bool
00070 UDPConvergenceLayer::parse_params(Params* params,
00071                                   int argc, const char** argv,
00072                                   const char** invalidp)
00073 {
00074     oasys::OptParser p;
00075 
00076     p.addopt(new oasys::InAddrOpt("local_addr", &params->local_addr_));
00077     p.addopt(new oasys::UInt16Opt("local_port", &params->local_port_));
00078     p.addopt(new oasys::InAddrOpt("remote_addr", &params->remote_addr_));
00079     p.addopt(new oasys::UInt16Opt("remote_port", &params->remote_port_));
00080     p.addopt(new oasys::UIntOpt("rate", &params->rate_));
00081     p.addopt(new oasys::UIntOpt("bucket_depth_", &params->bucket_depth_));
00082 
00083     if (! p.parse(argc, argv, invalidp)) {
00084         return false;
00085     }
00086 
00087     return true;
00088 };
00089 
00090 //----------------------------------------------------------------------
00091 bool
00092 UDPConvergenceLayer::interface_up(Interface* iface,
00093                                   int argc, const char* argv[])
00094 {
00095     log_debug("adding interface %s", iface->name().c_str());
00096     
00097     // parse options (including overrides for the local_addr and
00098     // local_port settings from the defaults)
00099     Params params = UDPConvergenceLayer::defaults_;
00100     const char* invalid;
00101     if (!parse_params(&params, argc, argv, &invalid)) {
00102         log_err("error parsing interface options: invalid option '%s'",
00103                 invalid);
00104         return false;
00105     }
00106 
00107     // check that the local interface / port are valid
00108     if (params.local_addr_ == INADDR_NONE) {
00109         log_err("invalid local address setting of 0");
00110         return false;
00111     }
00112 
00113     if (params.local_port_ == 0) {
00114         log_err("invalid local port setting of 0");
00115         return false;
00116     }
00117     
00118     // create a new server socket for the requested interface
00119     Receiver* receiver = new Receiver(&params);
00120     receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00121     
00122     if (receiver->bind(params.local_addr_, params.local_port_) != 0) {
00123         return false; // error log already emitted
00124     }
00125     
00126     // check if the user specified a remote addr/port to connect to
00127     if (params.remote_addr_ != INADDR_NONE) {
00128         if (receiver->connect(params.remote_addr_, params.remote_port_) != 0) {
00129             return false; // error log already emitted
00130         }
00131     }
00132     
00133     // start the thread which automatically listens for data
00134     receiver->start();
00135     
00136     // store the new listener object in the cl specific portion of the
00137     // interface
00138     iface->set_cl_info(receiver);
00139     
00140     return true;
00141 }
00142 
00143 //----------------------------------------------------------------------
00144 bool
00145 UDPConvergenceLayer::interface_down(Interface* iface)
00146 {
00147     // grab the listener object, set a flag for the thread to stop and
00148     // then close the socket out from under it, which should cause the
00149     // thread to break out of the blocking call to accept() and
00150     // terminate itself
00151     Receiver* receiver = (Receiver*)iface->cl_info();
00152     receiver->set_should_stop();
00153     receiver->interrupt_from_io();
00154     
00155     while (! receiver->is_stopped()) {
00156         oasys::Thread::yield();
00157     }
00158 
00159     delete receiver;
00160     return true;
00161 }
00162 
00163 //----------------------------------------------------------------------
00164 void
00165 UDPConvergenceLayer::dump_interface(Interface* iface,
00166                                     oasys::StringBuffer* buf)
00167 {
00168     Params* params = &((Receiver*)iface->cl_info())->params_;
00169     
00170     buf->appendf("\tlocal_addr: %s local_port: %d\n",
00171                  intoa(params->local_addr_), params->local_port_);
00172     
00173     if (params->remote_addr_ != INADDR_NONE) {
00174         buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
00175                      intoa(params->remote_addr_), params->remote_port_);
00176     } else {
00177         buf->appendf("\tnot connected\n");
00178     }
00179 }
00180 
00181 //----------------------------------------------------------------------
00182 bool
00183 UDPConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00184 {
00185     in_addr_t addr;
00186     u_int16_t port = 0;
00187     
00188     log_debug("adding %s link %s", link->type_str(), link->nexthop());
00189 
00190     // Parse the nexthop address but don't bail if the parsing fails,
00191     // since the remote host may not be resolvable at initialization
00192     // time and we retry in open_contact
00193     parse_nexthop(link->nexthop(), &addr, &port);
00194 
00195     // Create a new parameters structure, parse the options, and store
00196     // them in the link's cl info slot
00197     Params* params = new Params(defaults_);
00198     params->local_addr_ = INADDR_NONE;
00199     params->local_port_ = 0;
00200 
00201     const char* invalid;
00202     if (! parse_params(params, argc, argv, &invalid)) {
00203         log_err("error parsing link options: invalid option '%s'", invalid);
00204         delete params;
00205         return false;
00206     }
00207 
00208     if (link->params().mtu_ > MAX_BUNDLE_LEN) {
00209         log_err("error parsing link options: mtu %d > maximum %d",
00210                 link->params().mtu_, MAX_BUNDLE_LEN);
00211         delete params;
00212         return false;
00213     }
00214 
00215     link->set_cl_info(params);
00216     return true;
00217 }
00218 
00219 //----------------------------------------------------------------------
00220 void
00221 UDPConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00222 {
00223     Params* params = (Params*)link->cl_info();
00224     
00225     buf->appendf("\tlocal_addr: %s local_port: %d\n",
00226                  intoa(params->local_addr_), params->local_port_);
00227 
00228     buf->appendf("\tremote_addr: %s remote_port: %d\n",
00229                  intoa(params->remote_addr_), params->remote_port_);
00230 }
00231 
00232 //----------------------------------------------------------------------
00233 bool
00234 UDPConvergenceLayer::open_contact(const ContactRef& contact)
00235 {
00236     in_addr_t addr;
00237     u_int16_t port;
00238 
00239     Link* link = contact->link();
00240     log_debug("opening contact for link *%p", link);
00241     
00242     // parse out the address / port from the nexthop address
00243     if (! parse_nexthop(link->nexthop(), &addr, &port)) {
00244         log_err("invalid next hop address '%s'", link->nexthop());
00245         return false;
00246     }
00247 
00248     // make sure it's really a valid address
00249     if (addr == INADDR_ANY || addr == INADDR_NONE) {
00250         log_err("can't lookup hostname in next hop address '%s'",
00251                 link->nexthop());
00252         return false;
00253     }
00254 
00255     // make sure the port was specified
00256     if (port == 0) {
00257         log_err("port not specified in next hop address '%s'",
00258                 link->nexthop());
00259         return false;
00260     }
00261 
00262     Params* params = (Params*)link->cl_info();
00263     
00264     // create a new sender structure
00265     Sender* sender = new Sender(link->contact());
00266     
00267     if (!sender->init(params, addr, port)) {
00268         log_err("error initializing contact");
00269         BundleDaemon::post(
00270             new LinkStateChangeRequest(link, Link::UNAVAILABLE,
00271                                        ContactEvent::NO_INFO));
00272         delete sender;
00273         return false;
00274     }
00275         
00276     contact->set_cl_info(sender);
00277     BundleDaemon::post(new ContactUpEvent(link->contact()));
00278     
00279     return true;
00280 }
00281 
00282 //----------------------------------------------------------------------
00283 bool
00284 UDPConvergenceLayer::close_contact(const ContactRef& contact)
00285 {
00286     Sender* sender = (Sender*)contact->cl_info();
00287     
00288     log_info("close_contact *%p", contact.object());
00289 
00290     if (sender) {
00291         delete sender;
00292         contact->set_cl_info(NULL);
00293     }
00294     
00295     return true;
00296 }
00297 
00298 //----------------------------------------------------------------------
00299 void
00300 UDPConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00301 {
00302     Sender* sender = (Sender*)contact->cl_info();
00303     if (!sender) {
00304         log_crit("send_bundles called on contact *%p with no Sender!!",
00305                  contact.object());
00306         return;
00307     }
00308     ASSERT(contact == sender->contact_);
00309 
00310     int len = sender->send_bundle(bundle); // consumes bundle reference
00311 
00312     if (len > 0) {
00313         BundleDaemon::post(
00314             new BundleTransmittedEvent(bundle, contact,
00315                                        contact->link(), len, 0));
00316     } else {
00317         BundleDaemon::post(
00318             new BundleTransmitFailedEvent(bundle, contact,contact->link()));
00319     }
00320 }
00321 
00322 //----------------------------------------------------------------------
00323 UDPConvergenceLayer::Receiver::Receiver(UDPConvergenceLayer::Params* params)
00324     : IOHandlerBase(new oasys::Notifier("/dtn/cl/udp/receiver")),
00325       UDPClient("/dtn/cl/udp/receiver"),
00326       Thread("UDPConvergenceLayer::Receiver")
00327 {
00328     logfd_  = false;
00329     params_ = *params;
00330 }
00331 
00332 //----------------------------------------------------------------------
00333 void
00334 UDPConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00335 {
00336     // the payload should contain a full bundle
00337     Bundle* bundle = new Bundle();
00338     
00339     bool complete = false;
00340     int cc = BundleProtocol::consume(bundle, bp, len, &complete);
00341 
00342     if (cc < 0) {
00343         log_err("process_data: bundle protocol error");
00344         delete bundle;
00345         return;
00346     }
00347 
00348     if (!complete) {
00349         log_err("process_data: incomplete bundle");
00350         delete bundle;
00351         return;
00352     }
00353     
00354     log_debug("process_data: new bundle id %d arrival, length %zu (payload %zu)",
00355               bundle->bundleid_, len, bundle->payload_.length());
00356     
00357     BundleDaemon::post(
00358         new BundleReceivedEvent(bundle, EVENTSRC_PEER, len));
00359 }
00360 
00361 //----------------------------------------------------------------------
00362 void
00363 UDPConvergenceLayer::Receiver::run()
00364 {
00365     int ret;
00366     in_addr_t addr;
00367     u_int16_t port;
00368     u_char buf[MAX_UDP_PACKET];
00369 
00370     while (1) {
00371         if (should_stop())
00372             break;
00373         
00374         ret = recvfrom((char*)buf, MAX_UDP_PACKET, 0, &addr, &port);
00375         if (ret <= 0) {   
00376             if (errno == EINTR) {
00377                 continue;
00378             }
00379             log_err("error in recvfrom(): %d %s",
00380                     errno, strerror(errno));
00381             close();
00382             break;
00383         }
00384         
00385         log_debug("got %d byte packet from %s:%d",
00386                   ret, intoa(addr), port);               
00387         process_data(buf, ret);
00388     }
00389 }
00390 
00391 //----------------------------------------------------------------------
00392 UDPConvergenceLayer::Sender::Sender(const ContactRef& contact)
00393     : Logger("UDPConvergenceLayer::Sender",
00394              "/dtn/cl/udp/sender/%p", this),
00395       socket_(logpath_),
00396       rate_socket_(logpath_, 0, 0),
00397       contact_(contact.object(), "UDPCovergenceLayer::Sender")
00398 {
00399 }
00400 
00401 //----------------------------------------------------------------------
00402 bool
00403 UDPConvergenceLayer::Sender::init(Params* params,
00404                                   in_addr_t addr, u_int16_t port)
00405     
00406 {
00407     log_debug("initializing sender");
00408 
00409     params_ = params;
00410     
00411     socket_.logpathf("%s/conn/%s:%d", logpath_, intoa(addr), port);
00412     socket_.set_logfd(false);
00413 
00414     if (params->local_addr_ != INADDR_NONE || params->local_port_ != 0)
00415     {
00416         if (socket_.bind(params->local_addr_, params->local_port_) != 0) {
00417             log_err("error binding to %s:%d: %s",
00418                     intoa(params->local_addr_), params->local_port_,
00419                     strerror(errno));
00420             return false;
00421         }
00422     }
00423     
00424     if (socket_.connect(addr, port) != 0) {
00425         log_err("error issuing udp connect to %s:%d: %s",
00426                 intoa(addr), port, strerror(errno));
00427         return false;
00428     }
00429 
00430     if (params->rate_ != 0) {
00431         rate_socket_.bucket()->set_rate(params->rate_);
00432 
00433         if (params->bucket_depth_ != 0) {
00434             rate_socket_.bucket()->set_depth(params->bucket_depth_);
00435         }
00436         
00437         log_debug("initialized rate controller: rate %u depth %u",
00438                   rate_socket_.bucket()->rate(),
00439                   rate_socket_.bucket()->depth());
00440     }
00441 
00442     return true;
00443 }
00444     
00445 //----------------------------------------------------------------------
00446 int
00447 UDPConvergenceLayer::Sender::send_bundle(Bundle* bundle)
00448 {
00449     BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(contact_->link());
00450     ASSERT(blocks != NULL);
00451 
00452     bool complete = false;
00453     size_t total_len = BundleProtocol::produce(bundle, blocks,
00454                                                buf_, 0, sizeof(buf_),
00455                                                &complete);
00456     if (!complete) {
00457         size_t formatted_len = BundleProtocol::total_length(blocks);
00458         log_err("send_bundle: bundle too big (%zu > %u)",
00459                 formatted_len, UDPConvergenceLayer::MAX_BUNDLE_LEN);
00460         return -1;
00461     }
00462         
00463     // write it out the socket and make sure we wrote it all
00464     int cc = socket_.write((char*)buf_, total_len);
00465     if (cc == (int)total_len) {
00466         log_info("send_bundle: successfully sent bundle length %d", cc);
00467         return total_len;
00468     } else {
00469         log_err("send_bundle: error sending bundle (wrote %d/%zu): %s",
00470                 cc, total_len, strerror(errno));
00471         return -1;
00472     }
00473 }
00474 
00475 } // namespace dtn

Generated on Thu Jun 7 12:54:30 2007 for DTN Reference Implementation by  doxygen 1.5.1