TCPConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2004 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <sys/poll.h>
00040 #include <stdlib.h>
00041 
00042 #include <oasys/io/NetUtils.h>
00043 #include <oasys/util/OptParser.h>
00044 
00045 #include "TCPConvergenceLayer.h"
00046 #include "IPConvergenceLayerUtils.h"
00047 
00048 namespace dtn {
00049 
00050 TCPConvergenceLayer::TCPLinkParams
00051     TCPConvergenceLayer::default_link_params_(true);
00052 
00053 //----------------------------------------------------------------------
00054 TCPConvergenceLayer::TCPLinkParams::TCPLinkParams(bool init_defaults)
00055     : StreamLinkParams(init_defaults),
00056       local_addr_(INADDR_ANY),
00057       remote_addr_(INADDR_NONE),
00058       remote_port_(TCPCL_DEFAULT_PORT)
00059 {
00060 }
00061 
00062 //----------------------------------------------------------------------
00063 TCPConvergenceLayer::TCPConvergenceLayer()
00064     : StreamConvergenceLayer("TCPConvergenceLayer", "tcp", TCPCL_VERSION)
00065 {
00066 }
00067 
00068 //----------------------------------------------------------------------
00069 ConnectionConvergenceLayer::LinkParams*
00070 TCPConvergenceLayer::new_link_params()
00071 {
00072     return new TCPLinkParams(default_link_params_);
00073 }
00074 
00075 //----------------------------------------------------------------------
00076 bool
00077 TCPConvergenceLayer::parse_link_params(LinkParams* lparams,
00078                                        int argc, const char** argv,
00079                                        const char** invalidp)
00080 {
00081     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00082     ASSERT(params != NULL);
00083 
00084     oasys::OptParser p;
00085     
00086     p.addopt(new oasys::InAddrOpt("local_addr", &params->local_addr_));
00087     
00088     int count = p.parse_and_shift(argc, argv, invalidp);
00089     if (count == -1) {
00090         return false; // bogus value
00091     }
00092     argc -= count;
00093     
00094     if (params->local_addr_ == INADDR_NONE) {
00095         log_err("invalid local address setting of INADDR_NONE");
00096         return false;
00097     }
00098     
00099     // continue up to parse the parent class
00100     return StreamConvergenceLayer::parse_link_params(lparams, argc, argv,
00101                                                      invalidp);
00102 }
00103 
00104 //----------------------------------------------------------------------
00105 void
00106 TCPConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00107 {
00108     StreamConvergenceLayer::dump_link(link, buf);
00109     
00110     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(link->cl_info());
00111     ASSERT(params != NULL);
00112     
00113     buf->appendf("local_addr: %s\n", intoa(params->local_addr_));
00114     buf->appendf("remote_addr: %s\n", intoa(params->remote_addr_));
00115     buf->appendf("remote_port: %d\n", params->remote_port_);
00116 }
00117 
00118 //----------------------------------------------------------------------
00119 bool
00120 TCPConvergenceLayer::set_link_defaults(int argc, const char* argv[],
00121                                        const char** invalidp)
00122 {
00123     return parse_link_params(&default_link_params_, argc, argv, invalidp);
00124 }
00125 
00126 //----------------------------------------------------------------------
00127 bool
00128 TCPConvergenceLayer::parse_nexthop(Link* link, LinkParams* lparams)
00129 {
00130     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00131     ASSERT(params != NULL);
00132 
00133     if (! IPConvergenceLayerUtils::parse_nexthop(link->nexthop(),
00134                                                  &params->remote_addr_,
00135                                                  &params->remote_port_)) {
00136         log_err("invalid next hop address '%s'", link->nexthop());
00137         return false;
00138     }
00139     
00140     if (params->remote_addr_ == INADDR_ANY ||
00141         params->remote_addr_ == INADDR_NONE)
00142     {
00143         log_err("invalid host in next hop address '%s'", link->nexthop());
00144         return false;
00145     }
00146     
00147     // make sure the port was specified
00148     if (params->remote_port_ == 0) {
00149         log_err("port not specified in next hop address '%s'",
00150                 link->nexthop());
00151         return false;
00152     }
00153     
00154     return true;
00155 }
00156 
00157 //----------------------------------------------------------------------
00158 CLConnection*
00159 TCPConvergenceLayer::new_connection(LinkParams* p)
00160 {
00161     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(p);
00162     ASSERT(params != NULL);
00163     return new Connection(this, params);
00164 }
00165 
00166 //----------------------------------------------------------------------
00167 bool
00168 TCPConvergenceLayer::interface_up(Interface* iface,
00169                                   int argc, const char* argv[])
00170 {
00171     log_debug("adding interface %s", iface->name().c_str());
00172     in_addr_t local_addr = INADDR_ANY;
00173     u_int16_t local_port = TCPCL_DEFAULT_PORT;
00174 
00175     oasys::OptParser p;
00176     p.addopt(new oasys::InAddrOpt("local_addr", &local_addr));
00177     p.addopt(new oasys::UInt16Opt("local_port", &local_port));
00178 
00179     const char* invalid = NULL;
00180     if (! p.parse(argc, argv, &invalid)) {
00181         log_err("error parsing interface options: invalid option '%s'",
00182                 invalid);
00183         return false;
00184     }
00185     
00186     // check that the local interface / port are valid
00187     if (local_addr == INADDR_NONE) {
00188         log_err("invalid local address setting of INADDR_NONE");
00189         return false;
00190     }
00191 
00192     if (local_port == 0) {
00193         log_err("invalid local port setting of 0");
00194         return false;
00195     }
00196 
00197     // create a new server socket for the requested interface
00198     Listener* listener = new Listener(this);
00199     listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00200     
00201     int ret = listener->bind(local_addr, local_port);
00202 
00203     // be a little forgiving -- if the address is in use, wait for a
00204     // bit and try again
00205     if (ret != 0 && errno == EADDRINUSE) {
00206         listener->logf(oasys::LOG_WARN,
00207                        "WARNING: error binding to requested socket: %s",
00208                        strerror(errno));
00209         listener->logf(oasys::LOG_WARN,
00210                        "waiting for 10 seconds then trying again");
00211         sleep(10);
00212         
00213         ret = listener->bind(local_addr, local_port);
00214     }
00215 
00216     if (ret != 0) {
00217         return false; // error already logged
00218     }
00219 
00220     // start listening and then start the thread to loop calling accept()
00221     listener->listen();
00222     listener->start();
00223 
00224     // store the new listener object in the cl specific portion of the
00225     // interface
00226     iface->set_cl_info(listener);
00227     
00228     return true;
00229 }
00230 
00231 //----------------------------------------------------------------------
00232 bool
00233 TCPConvergenceLayer::interface_down(Interface* iface)
00234 {
00235     // grab the listener object, set a flag for the thread to stop and
00236     // then close the socket out from under it, which should cause the
00237     // thread to break out of the blocking call to accept() and
00238     // terminate itself
00239     Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00240     ASSERT(listener != NULL);
00241     
00242     listener->set_should_stop();
00243     
00244     listener->interrupt_from_io();
00245     
00246     while (! listener->is_stopped()) {
00247         oasys::Thread::yield();
00248     }
00249 
00250     delete listener;
00251     return true;
00252 }
00253 
00254 //----------------------------------------------------------------------
00255 void
00256 TCPConvergenceLayer::dump_interface(Interface* iface,
00257                                     oasys::StringBuffer* buf)
00258 {
00259     Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00260     ASSERT(listener != NULL);
00261     
00262     buf->appendf("\tlocal_addr: %s local_port: %d\n",
00263                  intoa(listener->local_addr()), listener->local_port());
00264 }
00265 
00266 //----------------------------------------------------------------------
00267 TCPConvergenceLayer::Listener::Listener(TCPConvergenceLayer* cl)
00268     : IOHandlerBase(new oasys::Notifier("/dtn/cl/tcp/listener")), 
00269       TCPServerThread("TCPConvergenceLayer::Listener",
00270                       "/dtn/cl/tcp/listener"),
00271       cl_(cl)
00272 {
00273     logfd_  = false;
00274 }
00275 
00276 //----------------------------------------------------------------------
00277 void
00278 TCPConvergenceLayer::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00279 {
00280     log_debug("new connection from %s:%d", intoa(addr), port);
00281     
00282     Connection* conn =
00283         new Connection(cl_, &TCPConvergenceLayer::default_link_params_,
00284                        fd, addr, port);
00285     conn->start();
00286 }
00287 
00288 //----------------------------------------------------------------------
00289 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00290                                             TCPLinkParams* params)
00291     : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00292                                          cl->logpath(), cl, params,
00293                                          true /* call connect() */)
00294 {
00295     logpathf("%s/conn/%p", cl->logpath(), this);
00296 
00297     // set up the base class' nexthop parameter
00298     oasys::StringBuffer nexthop("%s:%d",
00299                                 intoa(params->remote_addr_),
00300                                 params->remote_port_);
00301     set_nexthop(nexthop.c_str());
00302     
00303     // the actual socket
00304     sock_ = new oasys::TCPClient(logpath_);
00305 
00306     // XXX/demmer the basic socket logging emits errors and the like
00307     // when connections break. that may not be great since we kinda
00308     // expect them to happen... so either we should add some flag as
00309     // to the severity of error messages that can be passed into the
00310     // IO routines, or just suppress the IO output altogether
00311     sock_->logpathf("%s/sock", logpath_);
00312     sock_->set_logfd(false);
00313 
00314     // cache the remote addr and port in the fields in the socket
00315     // since we want to actually connect to the peer side from the
00316     // Connection thread, not from the caller thread
00317     sock_->set_remote_addr(params->remote_addr_);
00318     sock_->set_remote_port(params->remote_port_);
00319 
00320     sock_->init_socket();
00321     sock_->set_nonblocking(true);
00322 
00323     // if the parameters specify a local address, do the bind here --
00324     // however if it fails, we can't really do anything about it, so
00325     // just log and go on
00326     if (params->local_addr_ != INADDR_ANY)
00327     {
00328         if (sock_->bind(params->local_addr_, 0) != 0) {
00329             log_err("error binding to %s: %s",
00330                     intoa(params->local_addr_),
00331                     strerror(errno));
00332         }
00333     }
00334 }
00335 
00336 //----------------------------------------------------------------------
00337 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00338                                             TCPLinkParams* params,
00339                                             int fd,
00340                                             in_addr_t remote_addr,
00341                                             u_int16_t remote_port)
00342     : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00343                                          cl->logpath(), cl, params,
00344                                          false /* call accept() */)
00345 {
00346     logpathf("%s/conn/%p", cl->logpath(), this);
00347     
00348     // set up the base class' nexthop parameter
00349     oasys::StringBuffer nexthop("%s:%d", intoa(remote_addr), remote_port);
00350     set_nexthop(nexthop.c_str());
00351     
00352     sock_ = new oasys::TCPClient(fd, remote_addr, remote_port, logpath_);
00353     sock_->set_logfd(false);
00354     sock_->set_nonblocking(true);
00355 }
00356 
00357 //----------------------------------------------------------------------
00358 TCPConvergenceLayer::Connection::~Connection()
00359 {
00360     delete sock_;
00361 }
00362 
00363 //----------------------------------------------------------------------
00364 void
00365 TCPConvergenceLayer::Connection::initialize_pollfds()
00366 {
00367     sock_pollfd_ = &pollfds_[0];
00368     num_pollfds_ = 1;
00369     
00370     sock_pollfd_->fd     = sock_->fd();
00371     sock_pollfd_->events = POLLIN;
00372     
00373     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00374     ASSERT(params != NULL);
00375     
00376     poll_timeout_ = params->data_timeout_;
00377     
00378     if (params->keepalive_interval_ != 0 &&
00379         (params->keepalive_interval_ * 1000) < params->data_timeout_)
00380     {
00381         poll_timeout_ = params->keepalive_interval_ * 1000;
00382     }
00383 }
00384 
00385 //----------------------------------------------------------------------
00386 void
00387 TCPConvergenceLayer::Connection::connect()
00388 {
00389     // start a connection to the other side... in most cases, this
00390     // returns EINPROGRESS, in which case we wait for a call to
00391     // handle_poll_activity
00392     log_debug("connect: connecting to %s:%d...",
00393               intoa(sock_->remote_addr()), sock_->remote_port());
00394     ASSERT(contact_ != NULL);
00395     ASSERT(contact_->link()->isopening());
00396     ASSERT(sock_->state() != oasys::IPSocket::ESTABLISHED);
00397     int ret = sock_->connect(sock_->remote_addr(), sock_->remote_port());
00398 
00399     if (ret == 0) {
00400         log_debug("connect: succeeded immediately");
00401         ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00402 
00403         initiate_contact();
00404         
00405     } else if (ret == -1 && errno == EINPROGRESS) {
00406         log_debug("connect: EINPROGRESS returned, waiting for write ready");
00407         sock_pollfd_->events |= POLLOUT;
00408 
00409     } else {
00410         log_info("connection attempt to %s:%d failed... %s",
00411                  intoa(sock_->remote_addr()), sock_->remote_port(),
00412                  strerror(errno));
00413         break_contact(ContactEvent::BROKEN);
00414     }
00415 }
00416 
00417 //----------------------------------------------------------------------
00418 void
00419 TCPConvergenceLayer::Connection::accept()
00420 {
00421     ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00422     
00423     log_debug("accept: got connection from %s:%d...",
00424               intoa(sock_->remote_addr()), sock_->remote_port());
00425     initiate_contact();
00426 }
00427 
00428 //----------------------------------------------------------------------
00429 void
00430 TCPConvergenceLayer::Connection::disconnect()
00431 {
00432     if (sock_->state() != oasys::IPSocket::CLOSED) {
00433         // we can only send a shutdown byte if we're not in the middle
00434         // of sending a segment, otherwise the shutdown byte could be
00435         // interpreted as a part of the payload
00436         if (send_segment_todo_ == 0) {
00437             log_debug("disconnect: trying to send shutdown byte");
00438             char typecode = SHUTDOWN;
00439             sock_->write(&typecode, 1);
00440         }
00441         
00442         sock_->close();
00443     }
00444 }
00445 
00446 //----------------------------------------------------------------------
00447 void
00448 TCPConvergenceLayer::Connection::handle_poll_activity()
00449 {
00450     if (sock_pollfd_->revents & POLLHUP) {
00451         log_info("remote socket closed connection -- returned POLLHUP");
00452         break_contact(ContactEvent::BROKEN);
00453         return;
00454     }
00455     
00456     if (sock_pollfd_->revents & POLLERR) {
00457         log_info("error condition on remote socket -- returned POLLERR");
00458         break_contact(ContactEvent::BROKEN);
00459         return;
00460     }
00461     
00462     // first check for write readiness, meaning either we're getting a
00463     // notification that the deferred connect() call completed, or
00464     // that we are no longer write blocked
00465     if (sock_pollfd_->revents & POLLOUT)
00466     {
00467         log_debug("poll returned write ready, clearing POLLOUT bit");
00468         sock_pollfd_->events &= ~POLLOUT;
00469             
00470         if (sock_->state() == oasys::IPSocket::CONNECTING) {
00471             int result = sock_->async_connect_result();
00472             if (result == 0) {
00473                 log_debug("delayed_connect to %s:%d succeeded",
00474                           intoa(sock_->remote_addr()), sock_->remote_port());
00475                 initiate_contact();
00476                 
00477             } else {
00478                 log_info("connection attempt to %s:%d failed... %s",
00479                          intoa(sock_->remote_addr()), sock_->remote_port(),
00480                          strerror(errno));
00481                 break_contact(ContactEvent::BROKEN);
00482             }
00483 
00484             return;
00485         }
00486         
00487         send_data();
00488     }
00489 
00490     // finally, check for incoming data
00491     if (sock_pollfd_->revents & POLLIN) {
00492         recv_data();
00493         process_data();
00494     }
00495 }
00496 
00497 //----------------------------------------------------------------------
00498 void
00499 TCPConvergenceLayer::Connection::send_data()
00500 {
00501     // XXX/demmer this assertion is mostly for debugging to catch call
00502     // chains where the contact is broken but we're still using the
00503     // socket
00504     ASSERT(! contact_broken_);
00505     
00506     if (params_->test_write_delay_ != 0) {
00507         log_debug("send_data: sleeping for test_write_delay msecs %u",
00508                   params_->test_write_delay_);
00509         
00510         usleep(params_->test_write_delay_ * 1000);
00511     }
00512             
00513     log_debug("send_data: trying to drain %zu bytes from send buffer...",
00514               sendbuf_.fullbytes());
00515     ASSERT(sendbuf_.fullbytes() > 0);
00516     int cc = sock_->write(sendbuf_.start(), sendbuf_.fullbytes());
00517     if (cc > 0) {
00518         log_debug("send_data: wrote %d/%zu bytes from send buffer",
00519                   cc, sendbuf_.fullbytes());
00520         sendbuf_.consume(cc);
00521         
00522         if (sendbuf_.fullbytes() != 0) {
00523             log_debug("send_data: incomplete write, setting POLLOUT bit");
00524             sock_pollfd_->events |= POLLOUT;
00525 
00526         } else {
00527             if (sock_pollfd_->events & POLLOUT) {
00528                 log_debug("send_data: drained buffer, clearing POLLOUT bit");
00529                 sock_pollfd_->events &= ~POLLOUT;
00530             }
00531         }
00532     } else if (errno == EWOULDBLOCK) {
00533         log_debug("send_data: write returned EWOULDBLOCK, setting POLLOUT bit");
00534         sock_pollfd_->events |= POLLOUT;
00535         
00536     } else {
00537         log_info("send_data: remote connection unexpectedly closed: %s",
00538                  strerror(errno));
00539         break_contact(ContactEvent::BROKEN);
00540     }
00541 }
00542 
00543 //----------------------------------------------------------------------
00544 void
00545 TCPConvergenceLayer::Connection::recv_data()
00546 {
00547     // XXX/demmer this assertion is mostly for debugging to catch call
00548     // chains where the contact is broken but we're still using the
00549     // socket
00550     ASSERT(! contact_broken_);
00551     
00552     // this shouldn't ever happen
00553     if (recvbuf_.tailbytes() == 0) {
00554         log_err("no space in receive buffer to accept data!!!");
00555         return;
00556     }
00557     
00558     if (params_->test_read_delay_ != 0) {
00559         log_debug("recv_data: sleeping for test_read_delay msecs %u",
00560                   params_->test_read_delay_);
00561         
00562         usleep(params_->test_read_delay_ * 1000);
00563     }
00564             
00565     log_debug("recv_data: draining up to %zu bytes into recv buffer...",
00566               recvbuf_.tailbytes());
00567     int cc = sock_->read(recvbuf_.end(), recvbuf_.tailbytes());
00568     if (cc < 1) {
00569         log_info("remote connection unexpectedly closed");
00570         break_contact(ContactEvent::BROKEN);
00571         return;
00572     }
00573 
00574     log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
00575               cc, recvbuf_.fullbytes());
00576     recvbuf_.fill(cc);
00577 }
00578 
00579 } // namespace dtn

Generated on Fri Dec 22 14:48:01 2006 for DTN Reference Implementation by  doxygen 1.5.1