TCPConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <sys/poll.h>
00019 #include <stdlib.h>
00020 
00021 #include <oasys/io/NetUtils.h>
00022 #include <oasys/util/OptParser.h>
00023 
00024 #include "TCPConvergenceLayer.h"
00025 #include "IPConvergenceLayerUtils.h"
00026 #include "bundling/BundleDaemon.h"
00027 #include "contacts/ContactManager.h"
00028 
00029 namespace dtn {
00030 
00031 TCPConvergenceLayer::TCPLinkParams
00032     TCPConvergenceLayer::default_link_params_(true);
00033 
00034 //----------------------------------------------------------------------
00035 TCPConvergenceLayer::TCPLinkParams::TCPLinkParams(bool init_defaults)
00036     : StreamLinkParams(init_defaults),
00037       local_addr_(INADDR_ANY),
00038       remote_addr_(INADDR_NONE),
00039       remote_port_(TCPCL_DEFAULT_PORT)
00040 {
00041 }
00042 
00043 //----------------------------------------------------------------------
00044 TCPConvergenceLayer::TCPConvergenceLayer()
00045     : StreamConvergenceLayer("TCPConvergenceLayer", "tcp", TCPCL_VERSION)
00046 {
00047 }
00048 
00049 //----------------------------------------------------------------------
00050 ConnectionConvergenceLayer::LinkParams*
00051 TCPConvergenceLayer::new_link_params()
00052 {
00053     return new TCPLinkParams(default_link_params_);
00054 }
00055 
00056 //----------------------------------------------------------------------
00057 bool
00058 TCPConvergenceLayer::parse_link_params(LinkParams* lparams,
00059                                        int argc, const char** argv,
00060                                        const char** invalidp)
00061 {
00062     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00063     ASSERT(params != NULL);
00064 
00065     oasys::OptParser p;
00066     
00067     p.addopt(new oasys::InAddrOpt("local_addr", &params->local_addr_));
00068     
00069     int count = p.parse_and_shift(argc, argv, invalidp);
00070     if (count == -1) {
00071         return false; // bogus value
00072     }
00073     argc -= count;
00074     
00075     if (params->local_addr_ == INADDR_NONE) {
00076         log_err("invalid local address setting of INADDR_NONE");
00077         return false;
00078     }
00079     
00080     // continue up to parse the parent class
00081     return StreamConvergenceLayer::parse_link_params(lparams, argc, argv,
00082                                                      invalidp);
00083 }
00084 
00085 //----------------------------------------------------------------------
00086 void
00087 TCPConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00088 {
00089     StreamConvergenceLayer::dump_link(link, buf);
00090     
00091     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(link->cl_info());
00092     ASSERT(params != NULL);
00093     
00094     buf->appendf("local_addr: %s\n", intoa(params->local_addr_));
00095     buf->appendf("remote_addr: %s\n", intoa(params->remote_addr_));
00096     buf->appendf("remote_port: %d\n", params->remote_port_);
00097 }
00098 
00099 //----------------------------------------------------------------------
00100 bool
00101 TCPConvergenceLayer::set_link_defaults(int argc, const char* argv[],
00102                                        const char** invalidp)
00103 {
00104     return parse_link_params(&default_link_params_, argc, argv, invalidp);
00105 }
00106 
00107 //----------------------------------------------------------------------
00108 bool
00109 TCPConvergenceLayer::parse_nexthop(Link* link, LinkParams* lparams)
00110 {
00111     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00112     ASSERT(params != NULL);
00113 
00114     if (params->remote_addr_ == INADDR_NONE || params->remote_port_ == 0)
00115     {
00116         if (! IPConvergenceLayerUtils::parse_nexthop(logpath_, link->nexthop(),
00117                                                      &params->remote_addr_,
00118                                                      &params->remote_port_)) {
00119             return false;
00120         }
00121     }
00122     
00123     if (params->remote_addr_ == INADDR_ANY ||
00124         params->remote_addr_ == INADDR_NONE)
00125     {
00126         log_warn("can't lookup hostname in next hop address '%s'",
00127                  link->nexthop());
00128         return false;
00129     }
00130     
00131     // make sure the port was specified
00132     if (params->remote_port_ == 0) {
00133         log_err("port not specified in next hop address '%s'",
00134                 link->nexthop());
00135         return false;
00136     }
00137     
00138     return true;
00139 }
00140 
00141 //----------------------------------------------------------------------
00142 CLConnection*
00143 TCPConvergenceLayer::new_connection(LinkParams* p)
00144 {
00145     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(p);
00146     ASSERT(params != NULL);
00147     return new Connection(this, params);
00148 }
00149 
00150 //----------------------------------------------------------------------
00151 bool
00152 TCPConvergenceLayer::interface_up(Interface* iface,
00153                                   int argc, const char* argv[])
00154 {
00155     log_debug("adding interface %s", iface->name().c_str());
00156     in_addr_t local_addr = INADDR_ANY;
00157     u_int16_t local_port = TCPCL_DEFAULT_PORT;
00158 
00159     oasys::OptParser p;
00160     p.addopt(new oasys::InAddrOpt("local_addr", &local_addr));
00161     p.addopt(new oasys::UInt16Opt("local_port", &local_port));
00162 
00163     const char* invalid = NULL;
00164     if (! p.parse(argc, argv, &invalid)) {
00165         log_err("error parsing interface options: invalid option '%s'",
00166                 invalid);
00167         return false;
00168     }
00169     
00170     // check that the local interface / port are valid
00171     if (local_addr == INADDR_NONE) {
00172         log_err("invalid local address setting of INADDR_NONE");
00173         return false;
00174     }
00175 
00176     if (local_port == 0) {
00177         log_err("invalid local port setting of 0");
00178         return false;
00179     }
00180 
00181     // create a new server socket for the requested interface
00182     Listener* listener = new Listener(this);
00183     listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00184     
00185     int ret = listener->bind(local_addr, local_port);
00186 
00187     // be a little forgiving -- if the address is in use, wait for a
00188     // bit and try again
00189     if (ret != 0 && errno == EADDRINUSE) {
00190         listener->logf(oasys::LOG_WARN,
00191                        "WARNING: error binding to requested socket: %s",
00192                        strerror(errno));
00193         listener->logf(oasys::LOG_WARN,
00194                        "waiting for 10 seconds then trying again");
00195         sleep(10);
00196         
00197         ret = listener->bind(local_addr, local_port);
00198     }
00199 
00200     if (ret != 0) {
00201         return false; // error already logged
00202     }
00203 
00204     // start listening and then start the thread to loop calling accept()
00205     listener->listen();
00206     listener->start();
00207 
00208     // store the new listener object in the cl specific portion of the
00209     // interface
00210     iface->set_cl_info(listener);
00211     
00212     return true;
00213 }
00214 
00215 //----------------------------------------------------------------------
00216 bool
00217 TCPConvergenceLayer::interface_down(Interface* iface)
00218 {
00219     // grab the listener object, set a flag for the thread to stop and
00220     // then close the socket out from under it, which should cause the
00221     // thread to break out of the blocking call to accept() and
00222     // terminate itself
00223     Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00224     ASSERT(listener != NULL);
00225     
00226     listener->set_should_stop();
00227     
00228     listener->interrupt_from_io();
00229     
00230     while (! listener->is_stopped()) {
00231         oasys::Thread::yield();
00232     }
00233 
00234     delete listener;
00235     return true;
00236 }
00237 
00238 //----------------------------------------------------------------------
00239 void
00240 TCPConvergenceLayer::dump_interface(Interface* iface,
00241                                     oasys::StringBuffer* buf)
00242 {
00243     Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00244     ASSERT(listener != NULL);
00245     
00246     buf->appendf("\tlocal_addr: %s local_port: %d\n",
00247                  intoa(listener->local_addr()), listener->local_port());
00248 }
00249 
00250 //----------------------------------------------------------------------
00251 TCPConvergenceLayer::Listener::Listener(TCPConvergenceLayer* cl)
00252     : IOHandlerBase(new oasys::Notifier("/dtn/cl/tcp/listener")), 
00253       TCPServerThread("TCPConvergenceLayer::Listener",
00254                       "/dtn/cl/tcp/listener"),
00255       cl_(cl)
00256 {
00257     logfd_  = false;
00258 }
00259 
00260 //----------------------------------------------------------------------
00261 void
00262 TCPConvergenceLayer::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00263 {
00264     log_debug("new connection from %s:%d", intoa(addr), port);
00265     
00266     Connection* conn =
00267         new Connection(cl_, &TCPConvergenceLayer::default_link_params_,
00268                        fd, addr, port);
00269     conn->start();
00270 }
00271 
00272 //----------------------------------------------------------------------
00273 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00274                                             TCPLinkParams* params)
00275     : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00276                                          cl->logpath(), cl, params,
00277                                          true /* call connect() */)
00278 {
00279     logpathf("%s/conn/%p", cl->logpath(), this);
00280 
00281     // set up the base class' nexthop parameter
00282     oasys::StringBuffer nexthop("%s:%d",
00283                                 intoa(params->remote_addr_),
00284                                 params->remote_port_);
00285     set_nexthop(nexthop.c_str());
00286     
00287     // the actual socket
00288     sock_ = new oasys::TCPClient(logpath_);
00289 
00290     // XXX/demmer the basic socket logging emits errors and the like
00291     // when connections break. that may not be great since we kinda
00292     // expect them to happen... so either we should add some flag as
00293     // to the severity of error messages that can be passed into the
00294     // IO routines, or just suppress the IO output altogether
00295     sock_->logpathf("%s/sock", logpath_);
00296     sock_->set_logfd(false);
00297 
00298     sock_->init_socket();
00299     sock_->set_nonblocking(true);
00300 
00301     // if the parameters specify a local address, do the bind here --
00302     // however if it fails, we can't really do anything about it, so
00303     // just log and go on
00304     if (params->local_addr_ != INADDR_ANY)
00305     {
00306         if (sock_->bind(params->local_addr_, 0) != 0) {
00307             log_err("error binding to %s: %s",
00308                     intoa(params->local_addr_),
00309                     strerror(errno));
00310         }
00311     }
00312 }
00313 
00314 //----------------------------------------------------------------------
00315 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00316                                             TCPLinkParams* params,
00317                                             int fd,
00318                                             in_addr_t remote_addr,
00319                                             u_int16_t remote_port)
00320     : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00321                                          cl->logpath(), cl, params,
00322                                          false /* call accept() */)
00323 {
00324     logpathf("%s/conn/%p", cl->logpath(), this);
00325     
00326     // set up the base class' nexthop parameter
00327     oasys::StringBuffer nexthop("%s:%d", intoa(remote_addr), remote_port);
00328     set_nexthop(nexthop.c_str());
00329     
00330     sock_ = new oasys::TCPClient(fd, remote_addr, remote_port, logpath_);
00331     sock_->set_logfd(false);
00332     sock_->set_nonblocking(true);
00333 }
00334 
00335 //----------------------------------------------------------------------
00336 TCPConvergenceLayer::Connection::~Connection()
00337 {
00338     delete sock_;
00339 }
00340 
00341 //----------------------------------------------------------------------
00342 void
00343 TCPConvergenceLayer::Connection::serialize(oasys::SerializeAction *a)
00344 {
00345     TCPLinkParams *params = tcp_lparams();
00346     if (! params) return;
00347 
00348     oasys::Intoa local_addr = oasys::Intoa(params->local_addr_);
00349     const char * local_addr_str = local_addr.buf();
00350     oasys::Intoa remote_addr = oasys::Intoa(params->remote_addr_);
00351     const char * remote_addr_str = remote_addr.buf();
00352 
00353     a->process("local_addr",
00354         (u_char *) local_addr_str, strlen(local_addr_str));
00355     a->process("remote_addr",
00356         (u_char *) remote_addr_str, strlen(remote_addr_str));
00357     a->process("remote_port", &params->remote_port_);
00358 
00359     // from StreamLinkParams
00360     a->process("segment_ack_enabled", &params->segment_ack_enabled_);
00361     a->process("negative_ack_enabled", &params->negative_ack_enabled_);
00362     a->process("keepalive_interval", &params->keepalive_interval_);
00363     a->process("segment_length", &params->segment_length_);
00364 
00365     // from LinkParams
00366     a->process("busy_queue_depth", &params->busy_queue_depth_);
00367     a->process("reactive_frag_enabled", &params->reactive_frag_enabled_);
00368     a->process("sendbuf_length", &params->sendbuf_len_);
00369     a->process("recvbuf_length", &params->recvbuf_len_);
00370     a->process("data_timeout", &params->data_timeout_);
00371 }
00372 
00373 //----------------------------------------------------------------------
00374 void
00375 TCPConvergenceLayer::Connection::initialize_pollfds()
00376 {
00377     sock_pollfd_ = &pollfds_[0];
00378     num_pollfds_ = 1;
00379     
00380     sock_pollfd_->fd     = sock_->fd();
00381     sock_pollfd_->events = POLLIN;
00382     
00383     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00384     ASSERT(params != NULL);
00385     
00386     poll_timeout_ = params->data_timeout_;
00387     
00388     if (params->keepalive_interval_ != 0 &&
00389         (params->keepalive_interval_ * 1000) < params->data_timeout_)
00390     {
00391         poll_timeout_ = params->keepalive_interval_ * 1000;
00392     }
00393 }
00394 
00395 //----------------------------------------------------------------------
00396 void
00397 TCPConvergenceLayer::Connection::connect()
00398 {
00399     // the first thing we do is try to parse the next hop address...
00400     // if we're unable to do so, the link can't be opened.
00401     if (! cl_->parse_nexthop(contact_->link(), params_)) {
00402         log_info("can't resolve nexthop address '%s'",
00403                  contact_->link()->nexthop());
00404         break_contact(ContactEvent::BROKEN);
00405         return;
00406     }
00407 
00408     // cache the remote addr and port in the fields in the socket
00409     TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00410     ASSERT(params != NULL);
00411     sock_->set_remote_addr(params->remote_addr_);
00412     sock_->set_remote_port(params->remote_port_);
00413 
00414     // start a connection to the other side... in most cases, this
00415     // returns EINPROGRESS, in which case we wait for a call to
00416     // handle_poll_activity
00417     log_debug("connect: connecting to %s:%d...",
00418               intoa(sock_->remote_addr()), sock_->remote_port());
00419     ASSERT(contact_ == NULL || contact_->link()->isopening());
00420     ASSERT(sock_->state() != oasys::IPSocket::ESTABLISHED);
00421     int ret = sock_->connect(sock_->remote_addr(), sock_->remote_port());
00422 
00423     if (ret == 0) {
00424         log_debug("connect: succeeded immediately");
00425         ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00426 
00427         initiate_contact();
00428         
00429     } else if (ret == -1 && errno == EINPROGRESS) {
00430         log_debug("connect: EINPROGRESS returned, waiting for write ready");
00431         sock_pollfd_->events |= POLLOUT;
00432 
00433     } else {
00434         log_info("connection attempt to %s:%d failed... %s",
00435                  intoa(sock_->remote_addr()), sock_->remote_port(),
00436                  strerror(errno));
00437         break_contact(ContactEvent::BROKEN);
00438     }
00439 }
00440 
00441 //----------------------------------------------------------------------
00442 void
00443 TCPConvergenceLayer::Connection::accept()
00444 {
00445     ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00446     
00447     log_debug("accept: got connection from %s:%d...",
00448               intoa(sock_->remote_addr()), sock_->remote_port());
00449     initiate_contact();
00450 }
00451 
00452 //----------------------------------------------------------------------
00453 void
00454 TCPConvergenceLayer::Connection::disconnect()
00455 {
00456     if (sock_->state() != oasys::IPSocket::CLOSED) {
00457         sock_->close();
00458     }
00459 }
00460 
00461 //----------------------------------------------------------------------
00462 void
00463 TCPConvergenceLayer::Connection::handle_poll_activity()
00464 {
00465     if (sock_pollfd_->revents & POLLHUP) {
00466         log_info("remote socket closed connection -- returned POLLHUP");
00467         break_contact(ContactEvent::BROKEN);
00468         return;
00469     }
00470     
00471     if (sock_pollfd_->revents & POLLERR) {
00472         log_info("error condition on remote socket -- returned POLLERR");
00473         break_contact(ContactEvent::BROKEN);
00474         return;
00475     }
00476     
00477     // first check for write readiness, meaning either we're getting a
00478     // notification that the deferred connect() call completed, or
00479     // that we are no longer write blocked
00480     if (sock_pollfd_->revents & POLLOUT)
00481     {
00482         log_debug("poll returned write ready, clearing POLLOUT bit");
00483         sock_pollfd_->events &= ~POLLOUT;
00484             
00485         if (sock_->state() == oasys::IPSocket::CONNECTING) {
00486             int result = sock_->async_connect_result();
00487             if (result == 0 && sendbuf_.fullbytes() == 0) {
00488                 log_debug("delayed_connect to %s:%d succeeded",
00489                           intoa(sock_->remote_addr()), sock_->remote_port());
00490                 initiate_contact();
00491                 
00492             } else {
00493                 log_info("connection attempt to %s:%d failed... %s",
00494                          intoa(sock_->remote_addr()), sock_->remote_port(),
00495                          strerror(errno));
00496                 break_contact(ContactEvent::BROKEN);
00497             }
00498 
00499             return;
00500         }
00501         
00502         send_data();
00503     }
00504     
00505     //check that the connection was not broken during the data send
00506     if (contact_broken_)
00507     {
00508         return;
00509     }
00510     
00511     // finally, check for incoming data
00512     if (sock_pollfd_->revents & POLLIN) {
00513         recv_data();
00514         process_data();
00515 
00516         // Sanity check to make sure that there's space in the buffer
00517         // for a subsequent read_data() call
00518         if (recvbuf_.tailbytes() == 0) {
00519             log_err("process_data left no space in recvbuf!!");
00520         }
00521 
00522         if (! contact_broken_) {
00523             check_keepalive();
00524         }
00525 
00526     }
00527 
00528 }
00529 
00530 //----------------------------------------------------------------------
00531 void
00532 TCPConvergenceLayer::Connection::send_data()
00533 {
00534     // XXX/demmer this assertion is mostly for debugging to catch call
00535     // chains where the contact is broken but we're still using the
00536     // socket
00537     ASSERT(! contact_broken_);
00538     
00539     if (params_->test_write_delay_ != 0) {
00540         log_debug("send_data: sleeping for test_write_delay msecs %u",
00541                   params_->test_write_delay_);
00542         
00543         usleep(params_->test_write_delay_ * 1000);
00544     }
00545             
00546     log_debug("send_data: trying to drain %zu bytes from send buffer...",
00547               sendbuf_.fullbytes());
00548     ASSERT(sendbuf_.fullbytes() > 0);
00549     int cc = sock_->write(sendbuf_.start(), sendbuf_.fullbytes());
00550     if (cc > 0) {
00551         log_debug("send_data: wrote %d/%zu bytes from send buffer",
00552                   cc, sendbuf_.fullbytes());
00553         sendbuf_.consume(cc);
00554         
00555         if (sendbuf_.fullbytes() != 0) {
00556             log_debug("send_data: incomplete write, setting POLLOUT bit");
00557             sock_pollfd_->events |= POLLOUT;
00558 
00559         } else {
00560             if (sock_pollfd_->events & POLLOUT) {
00561                 log_debug("send_data: drained buffer, clearing POLLOUT bit");
00562                 sock_pollfd_->events &= ~POLLOUT;
00563             }
00564         }
00565     } else if (errno == EWOULDBLOCK) {
00566         log_debug("send_data: write returned EWOULDBLOCK, setting POLLOUT bit");
00567         sock_pollfd_->events |= POLLOUT;
00568         
00569     } else {
00570         log_info("send_data: remote connection unexpectedly closed: %s",
00571                  strerror(errno));
00572         break_contact(ContactEvent::BROKEN);
00573     }
00574 }
00575 
00576 //----------------------------------------------------------------------
00577 void
00578 TCPConvergenceLayer::Connection::recv_data()
00579 {
00580     // XXX/demmer this assertion is mostly for debugging to catch call
00581     // chains where the contact is broken but we're still using the
00582     // socket
00583     ASSERT(! contact_broken_);
00584     
00585     // this shouldn't ever happen
00586     if (recvbuf_.tailbytes() == 0) {
00587         log_err("no space in receive buffer to accept data!!!");
00588         return;
00589     }
00590     
00591     if (params_->test_read_delay_ != 0) {
00592         log_debug("recv_data: sleeping for test_read_delay msecs %u",
00593                   params_->test_read_delay_);
00594         
00595         usleep(params_->test_read_delay_ * 1000);
00596     }
00597             
00598     log_debug("recv_data: draining up to %zu bytes into recv buffer...",
00599               recvbuf_.tailbytes());
00600     int cc = sock_->read(recvbuf_.end(), recvbuf_.tailbytes());
00601     if (cc < 1) {
00602         log_info("remote connection unexpectedly closed");
00603         break_contact(ContactEvent::BROKEN);
00604         return;
00605     }
00606 
00607     log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
00608               cc, recvbuf_.fullbytes());
00609     recvbuf_.fill(cc);
00610 }
00611 
00612 } // namespace dtn

Generated on Thu Jun 7 12:54:29 2007 for DTN Reference Implementation by  doxygen 1.5.1