TCPTunnel.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2006 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <oasys/io/NetUtils.h>
00040 #include "DTNTunnel.h"
00041 #include "TCPTunnel.h"
00042 
00043 namespace dtntunnel {
00044 
00045 //----------------------------------------------------------------------
00046 TCPTunnel::TCPTunnel()
00047     : IPTunnel("TCPTunnel", "/dtntunnel/tcp")
00048 {
00049 }
00050 
00051 //----------------------------------------------------------------------
00052 void
00053 TCPTunnel::add_listener(in_addr_t listen_addr, u_int16_t listen_port,
00054                         in_addr_t remote_addr, u_int16_t remote_port)
00055 {
00056     new Listener(this, listen_addr, listen_port,
00057                  remote_addr, remote_port);
00058 }
00059 
00060 //----------------------------------------------------------------------
00061 void
00062 TCPTunnel::new_connection(Connection* c)
00063 {
00064     oasys::ScopeLock l(&lock_, "TCPTunnel::new_connection");
00065     
00066     ConnTable::iterator i;
00067     ConnKey key(c->client_addr_,
00068                 c->client_port_,
00069                 c->remote_addr_,
00070                 c->remote_port_);
00071     
00072     i = connections_.find(key);
00073     
00074     if (i != connections_.end()) {
00075         log_err("got duplicate connection %s:%d -> %s:%d",
00076                 intoa(c->client_addr_),
00077                 c->client_port_,
00078                 intoa(c->remote_addr_),
00079                 c->remote_port_);
00080         return;
00081     }
00082 
00083     connections_.insert(ConnTable::value_type(key, c));
00084 }
00085 
00086 //----------------------------------------------------------------------
00087 void
00088 TCPTunnel::kill_connection(Connection* c)
00089 {
00090     oasys::ScopeLock l(&lock_, "TCPTunnel::kill_connection");
00091     
00092     ConnTable::iterator i;
00093     ConnKey key(c->client_addr_,
00094                 c->client_port_,
00095                 c->remote_addr_,
00096                 c->remote_port_);
00097     
00098     i = connections_.find(key);
00099 
00100     if (i == connections_.end()) {
00101         log_err("can't find connection to kill %s:%d -> %s:%d",
00102                 intoa(c->client_addr_),
00103                 c->client_port_,
00104                 intoa(c->remote_addr_),
00105                 c->remote_port_);
00106         return;
00107     }
00108 
00109     connections_.erase(i);
00110 }
00111 
00112 //----------------------------------------------------------------------
00113 void
00114 TCPTunnel::handle_bundle(dtn::APIBundle* bundle)
00115 {
00116     oasys::ScopeLock l(&lock_, "TCPTunnel::handle_bundle");
00117 
00118     log_debug("handle_bundle got %zu byte bundle", bundle->payload_.len());
00119     
00120     DTNTunnel::BundleHeader hdr;
00121     memcpy(&hdr, bundle->payload_.buf(), sizeof(hdr));
00122     hdr.client_port_ = htons(hdr.client_port_);
00123     hdr.remote_port_ = htons(hdr.remote_port_);
00124 
00125     Connection* conn;
00126     ConnTable::iterator i;
00127     ConnKey key(hdr.client_addr_,
00128                 hdr.client_port_,
00129                 hdr.remote_addr_,
00130                 hdr.remote_port_);
00131     
00132     i = connections_.find(key);
00133 
00134     if (i == connections_.end()) {
00135         log_info("new connection %s:%d -> %s:%d",
00136                  intoa(hdr.client_addr_),
00137                  hdr.client_port_,
00138                  intoa(hdr.remote_addr_),
00139                  hdr.remote_port_);
00140         
00141         conn = new Connection(this, &bundle->spec_.source,
00142                               hdr.client_addr_, hdr.client_port_,
00143                               hdr.remote_addr_, hdr.remote_port_);
00144         conn->start();
00145         connections_.insert(ConnTable::value_type(key, conn));
00146 
00147     } else {
00148         conn = i->second;
00149         ASSERT(conn != NULL);
00150     }
00151 
00152     conn->handle_bundle(bundle);
00153     return;
00154 }
00155 
00156 //----------------------------------------------------------------------
00157 TCPTunnel::Listener::Listener(TCPTunnel* t,
00158                               in_addr_t listen_addr, u_int16_t listen_port,
00159                               in_addr_t remote_addr, u_int16_t remote_port)
00160     : TCPServerThread("TCPTunnel::Listener",
00161                       "/dtntunnel/tcp/listener",
00162                       Thread::DELETE_ON_EXIT),
00163       tcptun_(t),
00164       listen_addr_(listen_addr),
00165       listen_port_(listen_port),
00166       remote_addr_(remote_addr),
00167       remote_port_(remote_port)
00168 {
00169     bind_listen_start(listen_addr, listen_port);
00170 }
00171 
00172 //----------------------------------------------------------------------
00173 void
00174 TCPTunnel::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00175 {
00176     Connection* c = new Connection(tcptun_, DTNTunnel::instance()->dest_eid(),
00177                                    fd, addr, port, remote_addr_, remote_port_);
00178     tcptun_->new_connection(c);
00179     c->start();
00180 }
00181 
00182 //----------------------------------------------------------------------
00183 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00184                                   in_addr_t client_addr, u_int16_t client_port,
00185                                   in_addr_t remote_addr, u_int16_t remote_port)
00186     : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00187       Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00188       tcptun_(t),
00189       sock_(logpath_),
00190       queue_(logpath_),
00191       next_seqno_(0),
00192       client_addr_(client_addr),
00193       client_port_(client_port),
00194       remote_addr_(remote_addr),
00195       remote_port_(remote_port)
00196 {
00197     dtn_copy_eid(&dest_eid_, dest_eid);
00198 }
00199 
00200 //----------------------------------------------------------------------
00201 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00202                                   int fd,
00203                                   in_addr_t client_addr, u_int16_t client_port,
00204                                   in_addr_t remote_addr, u_int16_t remote_port)
00205     : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00206       Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00207       tcptun_(t),
00208       sock_(fd, client_addr, client_port, logpath_),
00209       queue_(logpath_),
00210       next_seqno_(0),
00211       client_addr_(client_addr),
00212       client_port_(client_port),
00213       remote_addr_(remote_addr),
00214       remote_port_(remote_port)
00215 {
00216     dtn_copy_eid(&dest_eid_, dest_eid);
00217 }
00218 
00219 //----------------------------------------------------------------------
00220 TCPTunnel::Connection::~Connection()
00221 {
00222     dtn::APIBundle* b;
00223     while(queue_.try_pop(&b)) {
00224         delete b;
00225     }
00226 }
00227 
00228 //----------------------------------------------------------------------
00229 void
00230 TCPTunnel::Connection::run()
00231 {
00232     DTNTunnel* tunnel = DTNTunnel::instance();
00233     u_int32_t send_seqno = 0;
00234     u_int32_t next_recv_seqno = 0;
00235     bool sock_eof = false;
00236 
00237     // header for outgoing bundles
00238     DTNTunnel::BundleHeader hdr;
00239     hdr.protocol_    = IPPROTO_TCP;
00240     hdr.seqno_       = 0;
00241     hdr.client_addr_ = client_addr_;
00242     hdr.client_port_ = htons(client_port_);
00243     hdr.remote_addr_ = remote_addr_;
00244     hdr.remote_port_ = htons(remote_port_);
00245     
00246     if (sock_.state() != oasys::IPSocket::ESTABLISHED) {
00247         int err = sock_.connect(remote_addr_, remote_port_);
00248         if (err != 0) {
00249             log_err("error connecting to %s:%d",
00250                     intoa(remote_addr_), remote_port_);
00251 
00252             // send an empty bundle back
00253             dtn::APIBundle* b = new dtn::APIBundle();
00254             memcpy(b->payload_.buf(sizeof(hdr)), &hdr, sizeof(hdr));
00255             b->payload_.set_len(sizeof(hdr));
00256             if (tunnel->send_bundle(b, &dest_eid_) != DTN_SUCCESS) {
00257                 tcptun_->kill_connection(this);
00258                 exit(1);
00259             }
00260             goto done;
00261         }
00262     }
00263 
00264     while (1) {
00265         struct pollfd pollfds[2];
00266 
00267         struct pollfd* msg_poll  = &pollfds[0];
00268         msg_poll->fd             = queue_.read_fd();
00269         msg_poll->events         = POLLIN;
00270         msg_poll->revents        = 0;
00271 
00272         struct pollfd* sock_poll = &pollfds[1];
00273         sock_poll->fd            = sock_.fd();
00274         sock_poll->events        = POLLIN | POLLERR;
00275         sock_poll->revents       = 0;
00276 
00277         // if the socket already had an eof, we just poll for activity
00278         // on the message queue
00279         log_debug("blocking in poll...");
00280         int nfds = sock_eof ? 1 : 2;
00281         int nready = oasys::IO::poll_multiple(pollfds, nfds, -1, NULL, logpath_);
00282         if (nready <= 0) {
00283             log_err("unexpected error in poll: %s", strerror(errno));
00284             goto done;
00285         }
00286 
00287         // check first for activity on the socket
00288         if (sock_poll->revents != 0) {
00289             dtn::APIBundle* b = new dtn::APIBundle();
00290 
00291             u_int payload_len = tunnel->max_size();
00292             char* bp = b->payload_.buf(sizeof(hdr) + payload_len);
00293             int ret = sock_.read(bp + sizeof(hdr), payload_len);
00294             if (ret < 0) {
00295                 log_err("error reading from socket: %s", strerror(errno));
00296                 delete b;
00297                 goto done;
00298             }
00299 
00300             hdr.seqno_ = ntohl(send_seqno++);
00301             b->payload_.set_len(sizeof(hdr) + ret);
00302             memcpy(b->payload_.buf(), &hdr, sizeof(hdr));
00303             if (tunnel->send_bundle(b, &dest_eid_) != DTN_SUCCESS)
00304                 exit(1);
00305             else
00306                 log_info("sent %d byte payload to dtn", ret);
00307 
00308             if (ret == 0) {
00309                 sock_eof = true;
00310             }
00311         }
00312         
00313         // now check for activity on the incoming bundle queue
00314         if (msg_poll->revents != 0) {
00315             dtn::APIBundle* b = queue_.pop_blocking();
00316             ASSERT(b);
00317 
00318             DTNTunnel::BundleHeader* recv_hdr =
00319                 (DTNTunnel::BundleHeader*)b->payload_.buf();
00320 
00321             u_int32_t recv_seqno = ntohl(recv_hdr->seqno_);
00322 
00323             // check the seqno match -- reordering should have been
00324             // handled before the bundle was put on the blocking
00325             // message queue
00326             if (recv_seqno != next_recv_seqno) {
00327                 log_err("got out of order bundle: seqno %d, expected %d",
00328                         recv_seqno, next_recv_seqno);
00329                 delete b;
00330                 goto done;
00331             }
00332             ++next_recv_seqno;
00333 
00334             u_int len = b->payload_.len() - sizeof(hdr);
00335 
00336             if (len == 0) {
00337                 log_info("got zero byte payload... closing connection");
00338                 sock_.close();
00339                 delete b;
00340                 goto done;
00341             }
00342             
00343             int cc = sock_.writeall(b->payload_.buf() + sizeof(hdr), len);
00344             if (cc != (int)len) {
00345                 log_err("error writing payload to socket: %s", strerror(errno));
00346                 delete b;
00347                 goto done;
00348             }
00349 
00350             log_info("sent %d byte payload to client", len);
00351             
00352             delete b;
00353         }
00354     }
00355 
00356  done:
00357     tcptun_->kill_connection(this);
00358 }
00359 
00360 //----------------------------------------------------------------------
00361 void
00362 TCPTunnel::Connection::handle_bundle(dtn::APIBundle* bundle)
00363 {
00364     DTNTunnel::BundleHeader* hdr =
00365         (DTNTunnel::BundleHeader*)bundle->payload_.buf();
00366     
00367     u_int32_t recv_seqno = ntohl(hdr->seqno_);
00368     
00369     // if it's out of order, stick it in the reorder table and return
00370     if (recv_seqno != next_seqno_)
00371     {
00372         log_info("got out of order bundle: expected seqno %d, got %d",
00373                  next_seqno_, recv_seqno);
00374         
00375         reorder_table_[recv_seqno] = bundle;
00376         return;
00377     }
00378 
00379     // deliver the one that just arrived
00380     log_info("delivering %zu byte bundle with seqno %d",
00381              bundle->payload_.len(), recv_seqno);
00382     queue_.push_back(bundle);
00383     next_seqno_++;
00384     
00385     // once we get one that's in order, that might let us transfer
00386     // more bundles out of the reorder table and into the queue
00387     ReorderTable::iterator iter;
00388     while (1) {
00389         iter = reorder_table_.find(next_seqno_);
00390         if (iter == reorder_table_.end()) {
00391             break;
00392         }
00393 
00394         bundle = iter->second;
00395         log_info("delivering %zu byte bundle with seqno %d (from reorder table)",
00396                  bundle->payload_.len(), next_seqno_);
00397         
00398         reorder_table_.erase(iter);
00399         next_seqno_++;
00400         
00401         queue_.push_back(bundle);
00402     }
00403 }
00404 
00405 } // namespace dtntunnel
00406 

Generated on Fri Dec 22 14:48:01 2006 for DTN Reference Implementation by  doxygen 1.5.1